
Feature/cdap 16709 pipeline performance cp #12234

Merged
merged 11 commits into develop from feature/CDAP-16709-pipeline-performance-cp on Jun 1, 2020

Conversation

albertshau (Contributor)

No description provided.

albertshau and others added 11 commits June 1, 2020 09:04
Add a new set of classes for AutoJoiner, which can be used instead
of the current Joiner interface. This new API leaves all of the
implementation details up to the application, which allows the
app to perform the join in better ways. For example, the Spark
program can use broadcast joins.

Plugin developers are responsible for returning a JoinDefinition
based on information about the incoming stages.

This change includes the JoinDefinition as well as all the classes
required to create a definition. It also includes validation logic
to ensure the plugin cannot create a definition that joins on a
field that doesn't exist, joins on fields with mismatched types,
or contains any other such error.
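The kind of validation described above can be sketched in plain Java. This is a simplified illustration, not the actual CDAP classes: the schema representation, method names, and error messages here are all made up.

```java
import java.util.Map;

// Hypothetical, simplified sketch of join-key validation: reject a key that
// references a missing field or a field whose type does not match.
class JoinKeyValidator {

    // Schemas of incoming stages: stage name -> (field name -> field type).
    static void validateKey(Map<String, Map<String, String>> stageSchemas,
                            String stage, String keyField, String expectedType) {
        Map<String, String> schema = stageSchemas.get(stage);
        if (schema == null) {
            throw new IllegalArgumentException("Unknown stage: " + stage);
        }
        String actualType = schema.get(keyField);
        if (actualType == null) {
            throw new IllegalArgumentException(
                "Join key '" + keyField + "' does not exist in stage '" + stage + "'");
        }
        if (!actualType.equals(expectedType)) {
            throw new IllegalArgumentException(
                "Join key '" + keyField + "' in stage '" + stage + "' has type "
                + actualType + " but " + expectedType + " is required");
        }
    }
}
```

Failing fast at definition time, as this sketch does, surfaces misconfigured joins during pipeline validation instead of at run time.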
Implemented auto-join for batch Spark pipelines.

Added a join method to SparkCollection that takes in the list of
other SparkCollections it should be joined with.
RDDCollection converts RDDs into Datasets and uses the Dataset
join method to implement the join. This allows Spark to broadcast
small datasets automatically and to use sort-merge join instead
of shuffle hash join, which has better memory characteristics.

As part of this, a separate RDDCollection implementation was added
for Spark1 and Spark2, since the Spark join APIs are not compatible
across the two versions.
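As context for why broadcasting the small side helps, a broadcast hash join can be sketched in plain Java (no Spark here; the data layout is invented for illustration): the small dataset becomes an in-memory hash table that every task can probe locally, so the large dataset never needs to be shuffled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of a broadcast hash join. Rows are String arrays with the
// join key in column 0 and a value in column 1; inner-join semantics.
class BroadcastJoin {

    static List<String> join(List<String[]> large, List<String[]> small) {
        // "Broadcast" step: build a hash table on the small side.
        Map<String, String> smallByKey = new HashMap<>();
        for (String[] row : small) {
            smallByKey.put(row[0], row[1]);
        }
        // Stream the large side and probe the hash table; no shuffle needed.
        List<String> out = new ArrayList<>();
        for (String[] row : large) {
            String match = smallByKey.get(row[0]);
            if (match != null) {
                out.add(row[0] + "," + row[1] + "," + match);
            }
        }
        return out;
    }
}
```

Spark applies the same idea per executor: when one side of a Dataset join is small enough, it ships that side to every executor and avoids shuffling the large side entirely.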
Implemented AutoJoiner for MapReduce by creating a bridge that
implements the old Joiner API using the JoinDefinition from the
new AutoJoiner API.
Changed the 'dropNullKeys' property to 'nullSafe', since that more
accurately describes what is happening. Null keys are not dropped
in outer joins; they just do not count as being equal to a null
key on the other side.

Implemented by using Spark's null-safe equality when configured
to do so and normal equality otherwise.
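The difference between the two equality modes can be shown in plain Java. Under SQL semantics, `NULL = NULL` is not true, so two rows with null join keys never match; Spark's null-safe operator (`<=>`) instead treats two nulls as equal. The method names below are made up for illustration.

```java
import java.util.Objects;

// Contrast of normal SQL equality with null-safe equality as used for the
// 'nullSafe' flag described above.
class NullSafeEquality {

    // Plain SQL-style equality: any null operand means "no match".
    static boolean sqlEquals(Object a, Object b) {
        if (a == null || b == null) {
            return false;
        }
        return a.equals(b);
    }

    // Null-safe equality: null matches null, mirroring Spark's <=> operator.
    static boolean nullSafeEquals(Object a, Object b) {
        return Objects.equals(a, b);
    }
}
```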
Honor the broadcast flag set in the JoinDefinition when joining
multiple DataFrames. Added a small tweak to the join logic to
join all non-broadcast datasets first, in order to ensure that
both sides of a join are never broadcast, and to reduce the amount
of data that is shuffled in non-broadcast intermediate joins.
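The ordering tweak amounts to partitioning the inputs by their broadcast flag, which can be sketched as follows (stage names and the method here are invented; this is not the actual CDAP join logic):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the join-ordering tweak: process all non-broadcast datasets
// first, then fold in the broadcast ones. The intermediate result of the
// first phase is large, so each broadcast join pairs one large side with
// one small side, and no join ends up with both sides broadcast.
class JoinOrdering {

    static List<String> orderForJoin(Map<String, Boolean> broadcastFlags,
                                     List<String> stages) {
        List<String> ordered = new ArrayList<>();
        // Non-broadcast (large) stages first...
        for (String stage : stages) {
            if (!broadcastFlags.getOrDefault(stage, false)) {
                ordered.add(stage);
            }
        }
        // ...then broadcast (small) stages.
        for (String stage : stages) {
            if (broadcastFlags.getOrDefault(stage, false)) {
                ordered.add(stage);
            }
        }
        return ordered;
    }
}
```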
Implemented the nullSafe flag for MapReduce auto-join.
This was done by filtering out records on the map side if they
come from an optional stage and have a null key, or a field in
the key that is null.
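The map-side filter can be sketched in plain Java. This is an illustrative simplification, not the actual CDAP mapper code, and it assumes the filter applies when null-safe equality is disabled, so a null-keyed record from an optional stage can never match and is safe to drop before the shuffle.

```java
// Sketch of the map-side filter described above. Keys are represented as a
// made-up String[] of key field values.
class NullKeyFilter {

    static boolean shouldDrop(boolean fromOptionalStage, String[] key) {
        if (!fromOptionalStage) {
            return false; // records from required stages always flow through
        }
        if (key == null) {
            return true; // whole key missing: can never join
        }
        for (String field : key) {
            if (field == null) {
                return true; // any null key field means no possible match
            }
        }
        return false;
    }
}
```

Dropping these records in the mapper shrinks the shuffle without changing the join result.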
Implemented auto-join for Spark streaming by using the same
JoinerBridge that is used for MapReduce. This means auto-joins
in streaming pipelines will have the same characteristics as normal
joins: they will be executed as shuffle hash joins.

This is probably OK, since only the data within a micro-batch is
joined, which makes it unlikely to run out of memory as long as
there is enough executor memory.
@albertshau (Contributor, Author)

cherry-pick of #12230, #12228, #12227, #12220, #12215, #12213, #12210, #12204, #12203, #12201, #12185

@albertshau albertshau merged commit 3380f11 into develop Jun 1, 2020
@albertshau albertshau deleted the feature/CDAP-16709-pipeline-performance-cp branch June 1, 2020 18:33
@albertshau albertshau added the 6.3 label Aug 21, 2020
3 participants