New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-1432] Inject number of Shards as a Side Input to Write #1941
Conversation
This permits users to pass a PTransform from the input elements to the number of shards instead of requiring a constant amount. This enables sharding to be determined based on the input data rather than a constant value. Tests still should be written. This can also convert the DirectRunner to inject a custom sharding strategy in its override, rather than relying on custom bundling.
Tests still need to be written. @dhalperi @reuvenlax you two are likely interested. |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be easy to extend to support ValueProvider for numShards.
return numShards; | ||
.addIfNotNull( | ||
DisplayData.item("sharding", getSharding() == null ? null : getSharding().getClass())) | ||
.include("sharding", getSharding()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indent seems off, add a label.
Doesn't .include
do L122 already?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleaned this up. include does indeed do that, but also doesn't permit null (and getSharding can of course be null)
this.sink = sink; | ||
this.numShards = numShards; | ||
this.computeNumShards = computeNumShards; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indent off?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@@ -96,11 +95,11 @@ | |||
*/ | |||
public static class Bound<T> extends PTransform<PCollection<T>, PDone> { | |||
private final Sink<T> sink; | |||
private int numShards; | |||
private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Nullable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -140,6 +130,10 @@ public int getNumShards() { | |||
return sink; | |||
} | |||
|
|||
public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Nullable
, javadoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
@Override | ||
public void populateDisplayData(DisplayData.Builder builder) { | ||
builder.add(DisplayData.item("numShards", numShards)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with label
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
@@ -254,14 +254,11 @@ public void testWriteWithSessions() { | |||
public void testBuildWrite() { | |||
Sink<String> sink = new TestSink() {}; | |||
Write.Bound<String> write = Write.to(sink).withNumShards(3); | |||
assertEquals(3, write.getNumShards()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
* Returns a new {@link Write.Bound} that will write to the current {@link Sink} with | ||
* runner-determined sharding. | ||
*/ | ||
public Bound<T> withoutSharding() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
withDynamicSharding?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
withRunnerDeterminedSharding
is more precise (as DynamicSharding can be user-determined)
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
cdc1840
to
41071dd
Compare
Refer to this link for build results (access rights to CI server needed): |
R: @dhalperi I have also added the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
essentially LGTM, minor comments.
If you're happy with the validation improvements, go ahead and self-merge. If you want me to TAL, let me know.
@@ -151,7 +160,42 @@ public int getNumShards() { | |||
* runner-controlled sharding. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consistent language? (runner-determined/controlled)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like determined. Done.
if (numShards == null) { | ||
minShardsNeeded = 1; | ||
} else { | ||
minShardsNeeded = c.sideInput(numShards); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validate this value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
@Override | ||
public void populateDisplayData(DisplayData.Builder builder) { | ||
builder.add(DisplayData.item("numShards", numShards).withLabel("ConstantShards")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Fixed Number of Shards" to match the existing label?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
public Integer apply(T input) { | ||
@ProcessElement | ||
public void processElement(ProcessContext context) { | ||
Integer shardCount = context.sideInput(numShards); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
validate this number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Refer to this link for build results (access rights to CI server needed): Failed Tests: 0--none-- |
(please self-merge at will) |
Refer to this link for build results (access rights to CI server needed): |
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull request
mvn clean verify
. (Even better, enableTravis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #>
in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.
This permits users to pass a PTransform from the input elements to the
number of shards instead of requiring a constant amount. This enables
sharding to be determined based on the input data rather than a constant
value.
This can also convert the DirectRunner to inject a custom sharding
strategy in its override, rather than relying on bundling behavior.