Skip to content

Commit

Permalink
Merge pull request #1221 from RumbleDB/FixJoinProjections
Browse files Browse the repository at this point in the history
Fix bug with join projections.
  • Loading branch information
ghislainfourny committed Mar 27, 2023
2 parents 06875c7 + dd3d514 commit f754387
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
11 changes: 11 additions & 0 deletions src/main/java/org/rumbledb/context/DynamicContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.rumbledb.items.structured.JSoundDataFrame;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -158,6 +159,16 @@ public static void mergeVariableDependencies(
}
}

public static Map<Name, DynamicContext.VariableDependency> copyVariableDependencies(
Map<Name, DynamicContext.VariableDependency> from
) {
Map<Name, DynamicContext.VariableDependency> result = new HashMap<>();
for (Name v : from.keySet()) {
result.put(v, from.get(v));
}
return result;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,14 @@ public FlworDataFrame getDataFrameAsJoin(
StructType inputSchema = inputDF.schema();
List<Name> variableNamesToExclude = new ArrayList<>();
variableNamesToExclude.add(this.variableName);
inputSchema.printTreeString();
Map<Name, VariableDependency> prefilterProjection = DynamicContext.copyVariableDependencies(parentProjection);
DynamicContext.mergeVariableDependencies(prefilterProjection, predicateDependencies);
prefilterProjection.put(this.variableName, prefilterProjection.get(Name.CONTEXT_ITEM));
prefilterProjection.remove(Name.CONTEXT_ITEM);
List<FlworDataFrameColumn> columnsToSelect = FlworDataFrameUtils.getColumns(
inputSchema,
parentProjection,
prefilterProjection,
null,
variableNamesToExclude
);
Expand All @@ -434,7 +439,6 @@ public FlworDataFrame getDataFrameAsJoin(
SparkSessionManager.leftHandSideHashColumnName
)
);

// We now post-filter on the predicate, by hash group.
RuntimeIterator filteringPredicateIterator = new PredicateIterator(
new VariableReferenceIterator(
Expand Down

0 comments on commit f754387

Please sign in to comment.