CDAP-16709 batch spark auto-join implementation #12201
Conversation
Force-pushed from cb8e631 to 57d102b
@@ -89,7 +90,7 @@ public PipelinePhasePreparer(PluginContext pluginContext, Metrics metrics, Macro
     boolean isConnectorSink =
       Constants.Connector.PLUGIN_TYPE.equals(pluginType) && phase.getSinks().contains(stageName);

-    SubmitterPlugin submitterPlugin;
+    SubmitterPlugin submitterPlugin = null;
So the submitterPlugin can be null after the following if checks? Is it being handled already? I don't think the logic can handle this value being null. It is probably better to have an else that throws if the joiner plugin is not one of the supported classes.
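A minimal sketch of the suggested guard; the instanceof checks and helper names here are placeholders, not the PR's actual code:

SubmitterPlugin submitterPlugin;
if (plugin instanceof BatchJoiner) {
  submitterPlugin = createJoinerSubmitter(stageName, plugin);    // hypothetical helper
} else if (plugin instanceof AutoJoiner) {
  submitterPlugin = createAutoJoinerSubmitter(stageName, plugin); // hypothetical helper
} else {
  // fail fast instead of letting a null submitterPlugin flow downstream
  throw new IllegalStateException(String.format(
    "Stage '%s' uses unsupported joiner plugin type %s", stageName, pluginType));
}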
// join required stages first to cut down the data as much as possible
List<JoinStage> requiredStages = joinDefinition.getStages().stream()
  .filter(s -> s.isRequired())
Use a method reference: .filter(JoinStage::isRequired)
  .collect(Collectors.toList());
List<JoinStage> orderedStages = new ArrayList<>(requiredStages.size() + optionalStages.size());
orderedStages.addAll(requiredStages);
orderedStages.addAll(optionalStages);
This list can be obtained by sorting the list directly:

List<JoinStage> orderedStages = new ArrayList<>(joinDefinition.getStages());
orderedStages.sort((s1, s2) -> s1.isRequired() ? (s2.isRequired() ? 0 : -1) : (s2.isRequired() ? 1 : 0));
}
JoinCondition.OnKeys onKeys = (JoinCondition.OnKeys) condition;
Map<String, List<String>> stageKeys = onKeys.getKeys().stream()
  .collect(Collectors.toMap(j -> j.getStageName(), j -> j.getFields()));
Use method references instead of lambdas: toMap(JoinKey::getStageName, JoinKey::getFields)
JoinCondition.OnKeys onKeys = (JoinCondition.OnKeys) condition;
Map<String, List<String>> stageKeys = onKeys.getKeys().stream()
  .collect(Collectors.toMap(j -> j.getStageName(), j -> j.getFields()));

nit - extra line
@@ -378,6 +356,123 @@ public void runPipeline(PipelinePhase pipelinePhase, String sourcePluginType,
    }
  }

+  private SparkCollection<Object> handleAutoJoin(AutoJoiner autoJoiner, AutoJoinerContext autoJoinerContext,
This method and the handleJoin method contain non-trivial logic; it would be better to add some javadoc describing it.
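For example, a javadoc along these lines could capture the flow (wording is a sketch, not taken from the PR):

/**
 * Joins the inputs of an auto-joiner stage. Fetches the JoinDefinition from the
 * plugin, orders the stages so that required (inner) joins happen before optional
 * (outer) ones, then joins the corresponding SparkCollections pairwise on the
 * keys given by the JoinCondition.
 */
private SparkCollection<Object> handleAutoJoin(AutoJoiner autoJoiner, AutoJoinerContext autoJoinerContext,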
List<JoinStage> optionalStages = joinDefinition.getStages().stream()
  .filter(s -> !s.isRequired())
  .collect(Collectors.toList());
List<JoinStage> orderedStages = new ArrayList<>(requiredStages.size() + optionalStages.size());
Can you explain a bit why we want ordered stages? From the logic below, we basically go through the list one by one and determine the join type based on whether the left or right side is required. Why do we want to loop through the required stages first?
there's a comment above but I can expand on it. If there is an outer join and an inner join, it is more performant to do the inner join first because it will generate less intermediate data.
though now that I think more about it, it may be better not to do this, and just follow the order provided by the plugin. Can add this back in if it helps once we experiment a bit more on uneven joins.
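To illustrate the point with a toy example (table names and sizes are hypothetical), inner-joining the required input first shrinks the data before the outer join runs:

// purchases: 1M rows; users: required, so the join is inner; emails: optional,
// so the join is left outer. Doing the inner join first drops purchases with no
// matching user, so fewer intermediate rows reach the outer join:
Dataset<Row> joined = purchases
  .join(users, purchases.col("user_id").equalTo(users.col("id")))
  .join(emails, purchases.col("user_id").equalTo(emails.col("user_id")), "leftouter");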
private final SparkCollection<?> data;
private final Schema schema;
private final List<String> key;
private final boolean broadcast;
Is this used anywhere? The getter doesn't seem to be called. It would also be good to add a comment explaining what this flag means.
not currently, was going to implement it later.
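For context, the flag would presumably end up as Spark's broadcast hint, roughly like this (a sketch of a future use, not code from the PR; isBroadcast is a placeholder getter):

import static org.apache.spark.sql.functions.broadcast;

// Hint Spark to replicate a small input to every executor, turning the join
// into a broadcast join and avoiding a shuffle of the large side:
Dataset<Row> right = joinStage.isBroadcast() ? broadcast(stageData) : stageData;
Dataset<Row> joined = left.join(right, joinCondition);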
 *
 * @param <T> type of object in the collection
 */
public class RDDCollection<T> extends BaseRDDCollection<T> {
I feel it is good to add a comment on why we need different implementations for Spark 1 and Spark 2; it sometimes takes me a while to understand the difference. Some javadoc would help explain the compatibility and implementation differences between the two versions.
One minor comment, LGTM
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

import javax.annotation.Nullable;

 /**
- * Implementation of {@link SparkCollection} that is backed by a JavaRDD.
+ * Implementation of {@link SparkCollection} that is backed by a JavaRDD. Spark1 and Spark2 implementations need to be
+ * separate because DataFrames are not compatible between Spark1 and Spark2. In Spark2, DataFrame is a just a
"is a just" -> "is just"
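For anyone reading along, the incompatibility the new javadoc describes is roughly the following (the join call is illustrative):

// Spark 1.x has a concrete class org.apache.spark.sql.DataFrame:
//   DataFrame joined = left.join(right, "user_id");
// Spark 2.x removed it from the Java API; DataFrame survives only as a Scala
// type alias for Dataset<Row>, so the equivalent Java code is written as:
Dataset<Row> joined = left.join(right, "user_id");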
Force-pushed from b7f16f4 to e31279c
Implemented auto join for batch spark pipelines. Added a join method to SparkCollection that takes in the list of other SparkCollections that it should be joined to. RDDCollection converts RDDs into Datasets and uses the Dataset join method to implement the join. This allows Spark to broadcast small datasets automatically, and to use sort merge join instead of shuffle hash join, which has better memory characteristics. As part of this, added a separate RDDCollection implementation for Spark1 and Spark2, since the Spark API for joins is not compatible.
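A condensed sketch of that approach (helper and schema names here are illustrative; the real code derives the Spark schema from the CDAP stage schema):

// Convert each input JavaRDD into a Dataset so Spark's planner can choose the
// join strategy (automatic broadcast for small inputs, sort merge join rather
// than shuffle hash join for large ones):
Dataset<Row> left = sqlContext.createDataFrame(leftRdd, leftSparkSchema);
Dataset<Row> right = sqlContext.createDataFrame(rightRdd, rightSparkSchema);

// Inner join when the right stage is required, left outer join when optional:
Dataset<Row> joined = left.join(right,
  left.col("id").equalTo(right.col("id")),
  rightStage.isRequired() ? "inner" : "leftouter");

// Back to a JavaRDD so the rest of the pipeline is unchanged:
JavaRDD<Row> result = joined.javaRDD();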
Force-pushed from e31279c to d50f0ea