New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-1074] Set default-partitioner in SourceRDD.Unbounded #2288
[BEAM-1074] Set default-partitioner in SourceRDD.Unbounded #2288
Conversation
Run Spark RunnableOnService |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
R: @amitsela |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My only comments are, well, on comments 😄
The source implementations are tricky and rely on some members to be persisted in checkpoint so it is important to note this.
Besides that, LGTM.
Feel free to merge after taking care of the notes in the code.
@@ -60,47 +59,64 @@ | |||
private final UnboundedSource<T, CheckpointMarkT> unboundedSource; | |||
private final SparkRuntimeContext runtimeContext; | |||
private final Duration boundReadDuration; | |||
private final int numPartitions; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is a member of the DStream it is initialized once, and would be recovered on checkpoint recovery, right ?
So it should be noted here that this is a "one time" set of partitions/splits, and cannot change throughout the entire life of the application (+ recovery/resume).
this.boundMaxRecords = boundMaxRecords > 0 ? boundMaxRecords : rateControlledMaxRecords(); | ||
|
||
try { | ||
this.numPartitions = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit more notes here too - it's an init. of splits to figure out source parallelism...
@@ -112,6 +128,10 @@ public String name() { | |||
return "Beam UnboundedSource [" + id() + "]"; | |||
} | |||
|
|||
public int getNumPartitions() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Notes: "exposing the number of partitions for this SourceDStream so we can set the appropriate partitioner on the read via mapWithState
..." or something.
@@ -247,6 +253,13 @@ public Unbounded(SparkContext sc, | |||
} | |||
|
|||
@Override | |||
public Option<Partitioner> partitioner() { | |||
// setting the partitioner helps to "keep" the same partitioner in the following |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
JavaDStream<WindowedValue<T>> readUnboundedStream = mapWithStateDStream.flatMap( | ||
new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() { | ||
@Override | ||
public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception { | ||
return t2._1(); | ||
} | ||
}).map(CoderHelpers.fromByteFunction(coder)); | ||
|
||
if (sourceDStream.getNumPartitions() < defaultParallelism) { | ||
// Repartition up to default parallelism if there are too few partitions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimizing parallelism instead of Repartitioning-up ?
581233f
to
0fa4d29
Compare
Moved BEAM-1075 commit to a different branch for now, we'll sit on it and see when and if to integrate it. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull request
mvn clean verify
. (Even better, enableTravis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #>
in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.