
Many simplifications to WriteFiles #4145

Closed · wants to merge 12 commits

Conversation


@jkff jkff commented Nov 17, 2017

Commits explain what's going on. I recommend reviewing each commit individually. Most importantly, this unifies windowed and unwindowed finalize (this is the only meaningful change - everything else is just restructuring), and refactors the transform into sub-transforms for better readability.

Many more simplifications are possible in WriteOperation/FileBasedSink themselves, but I'll defer that to post-FileIO #3817 (this PR can be reviewed in parallel with that one).

R: @reuvenlax or feel free to reassign to anybody else.

@asfgit commented Nov 17, 2017

SUCCESS

@asfgit commented Nov 17, 2017

FAILURE

@asfgit commented Nov 17, 2017

SUCCESS

@jkff (Contributor Author) commented Nov 20, 2017

@chamikaramj agreed to take a look.

@reuvenlax (Contributor) commented Nov 21, 2017 via email

? new FileResult<>(
writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination)
: new FileResult<>(
writer.getOutputFile(), UNKNOWN_SHARDNUM, null, null, key.destination);
Contributor:

Don't we lose the shard number, where before we had the shard number in the output?

Contributor Author:

Not sure what you mean here: this is the unsharded WriteBundles, which emits normally-written bundles with UNKNOWN_SHARDNUM and emits spilled data with a shard number (that is later discarded). This was the case before this change, and is still the case after this change.
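The behavior described here — normally-written bundles carrying UNKNOWN_SHARDNUM while spilled data gets a provisional shard key — can be illustrated with a toy, Beam-free sketch. All names and constants below (SpillingBundleSketch, maxWritersPerBundle, spillFactor) are invented for illustration; this is not WriteFiles' actual code.

```java
import java.util.*;

// Toy model of unsharded WriteBundles: records for destinations with an
// open writer are written normally with UNKNOWN_SHARDNUM; once the
// per-bundle writer cap is reached, records for new destinations are
// "spilled" under a provisional shard key that a later stage discards.
class SpillingBundleSketch {
    static final int UNKNOWN_SHARDNUM = -1;
    final int maxWritersPerBundle;
    final int spillFactor;
    final Set<String> openWriters = new HashSet<>();
    final List<String> written = new ArrayList<>();
    final List<String> spilled = new ArrayList<>();

    SpillingBundleSketch(int maxWritersPerBundle, int spillFactor) {
        this.maxWritersPerBundle = maxWritersPerBundle;
        this.spillFactor = spillFactor;
    }

    void process(String destination, String record) {
        if (openWriters.contains(destination)
                || openWriters.size() < maxWritersPerBundle) {
            openWriters.add(destination);
            // Normally-written output carries no shard number yet.
            written.add(destination + "@" + UNKNOWN_SHARDNUM + ":" + record);
        } else {
            // Provisional shard key, only used to spread the spilled data.
            int shard = Math.floorMod(destination.hashCode(), spillFactor);
            spilled.add(destination + "@" + shard + ":" + record);
        }
    }
}
```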

shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
? c.element().getKey().getShardNumber()
: UNKNOWN_SHARDNUM;
if (windowedWrites) {
Contributor:

BTW, we simplified the FileBasedSink API at the expense of extra complexity in multiple places in WriteFiles. Is this really a simplification?

Contributor Author:

This is an intermediate step to more simplifications that will come after FileIO - in this case, I removed result from Writer in order to simplify removing DestinationT from Writer (and later also removing UserT), to end up with having Writer be a bare-bones consumer for format-specific records that are directly written into it, e.g. for text files it is strings, without any user or destination types involved.

I think this is also desirable from a readability point of view: shard number, destination etc. are bookkeeping information private to WriteFiles, and it's better if they are managed by WriteFiles rather than scattered across two classes.

@@ -854,7 +854,8 @@ public void processElement(ProcessContext c) {
} else if (numShardsProvider != null) {
fixedNumShards = numShardsProvider.get();
} else {
fixedNumShards = null;
Contributor:

Enforcing something like this at runtime is unfortunate. Any way to enforce at graph construction time?

TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittedRecordsTag =
new TupleTag<>("unwrittenRecordsTag");
TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag =
new TupleTag<>("spilledRecordsTag");
Contributor:

Changing the TupleTag name breaks update compatibility. Update compatibility is the whole reason we used an explicit name here in the first place.

Contributor:

Let's add comments to here and other places so that update compatibility is not accidentally broken by future updates.
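The update-compatibility concern can be modeled in a few lines of plain Java (a toy model, not Beam's actual TupleTag implementation): an auto-generated tag id depends on construction order, so any refactoring silently changes it, while an explicit name stays stable across releases.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of output tags (not Beam's TupleTag). An auto-generated id
// encodes construction order, which a refactor can change; an explicit
// name is stable across pipeline versions, which update requires.
class Tag {
    static final AtomicInteger AUTO = new AtomicInteger();
    final String id;

    Tag() { this.id = "out" + AUTO.getAndIncrement(); } // order-dependent
    Tag(String id) { this.id = id; }                    // stable
}
```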

// number assigned at all. Drop the shard number on the spilled records so that
// shard numbers are assigned together to both the spilled and non-spilled files in
// finalize.
.apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
Contributor:

Again, we can't change the transform name; it is meant to be stable.

.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
"WriteSpilled", ParDo.of(new WriteShardedBundles()).withSideInputs(sideInputs))
.setCoder(fileResultCoder)
.apply("DropShardNum", ParDo.of(
Contributor:

why drop shard numbers here?

ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))
.withSideInputs(sideInputs))
.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
"WriteSpilled", ParDo.of(new WriteShardedBundles()).withSideInputs(sideInputs))
Contributor:

so you're now going to assign shard #s based on the key? This is potentially broken for batch, as some of the files (the ones written by WriteWindowedBundles) are assigned in finalize, but the spilled files are assigned here. I worry that could lead to collisions.

@jkff (Contributor Author) left a comment

GitHub isn't letting me respond inline:

  • I reverted the rename of tuple tags (spilled -> unwritten).
  • Regarding shard number assignment for spilled records: in the old code, it was already the case that written records have UNKNOWN_SHARDNUM and unwritten (spilled) have a shard number assigned according to spill factor, and then in finalize with ASSIGN_IN_FINALIZE both of these shard numbers are actually discarded and recomputed. I wanted the finalize operation to not receive a mix of known and unknown shard numbers, but we still need to shard the spilled records - so I moved the logic of discarding shard numbers into an explicit DoFn DropShardNum.
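The DropShardNum step described above can be sketched as a pure per-element function (ShardedKeySketch here is a minimal stand-in, not Beam's ShardedKey): it replaces the spill-only shard number with UNKNOWN_SHARDNUM so that finalize sees a uniform input and assigns all shard numbers itself.

```java
// Minimal stand-in for Beam's ShardedKey; illustrative only.
class ShardedKeySketch {
    static final int UNKNOWN_SHARDNUM = -1;
    final String key;
    final int shardNumber;

    ShardedKeySketch(String key, int shardNumber) {
        this.key = key;
        this.shardNumber = shardNumber;
    }

    // What a DropShardNum DoFn would do per element: keep the key, discard
    // the provisional shard number so finalize reassigns all shards together.
    ShardedKeySketch dropShardNum() {
        return new ShardedKeySketch(key, UNKNOWN_SHARDNUM);
    }
}
```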

For this comment:

Enforcing something like this at runtime is unfortunate. Any way to enforce at graph construction time?

the original context is lost. Is it still relevant?


@chamikaramj (Contributor) left a comment

Thanks.

@@ -868,53 +934,10 @@ protected void finishWrite() throws Exception {}
* id populated for the case of static sharding. In cases where the runner is dynamically
Contributor:

Top part of this comment ("Performs bundle initialization. For example..") seems to be more generic than it should be. This is probably from Sink.java days. Shall we simplify/update this (and possibly other) doc comments here?

writer.getOutputFile(),
shard,
GlobalWindow.INSTANCE,
PaneInfo.ON_TIME_AND_ONLY_FIRING,
Contributor:

Are we changing the default values chosen for PaneInfo here (could not easily find out by following the code)?

Contributor Author:

Previously the code was structured differently, and the values passed in this particular codepath ended up being ignored. I consolidated things somewhat to handle much of windowed and unwindowed case the same way, and made the requirements more strict, in particular that window and pane have to be always set.

c.output(
new FileResult<>(
writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
int shard = c.element().getKey().getShardNumber();
Contributor:

Validate?

Contributor Author:

Done.

final PCollectionView<Integer> numShardsView;
PCollectionView<Integer> numShardsView =
(computeNumShards == null) ? null : input.apply(computeNumShards);
List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
Contributor:

(numShardsView == null) for readability.

Contributor Author:

Done.


LOG.info("No output files to write.");
LOG.debug("Copying {} files.", numFiles);
List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
Contributor:

Assert that list sizes are equal.

Contributor Author:

Seems overkill - they are created right next to each other in code, and FileSystems.copy() already does that verification. I removed the size hints to make it a little simpler (preallocation probably doesn't matter here).

"When finalizing a windowed write, should have set fixed sharding");
}
fixedNumShards = getFixedNumShards.apply(c);
checkState(fixedNumShards != null, "Windowed write should have set fixed sharding");
Contributor:

Windowed (non-triggered) writes in batch do not need fixed sharding.

Contributor Author:

After #4124 they do - see #4137 for explanation.

PCollection<FileResult<DestinationT>> tempFileResults =
(computeNumShards == null && numShardsProvider == null)
? input.apply(
"WriteUnshardedBundlesToTempFiles",
Contributor:

Unfortunately, refactoring into new PTransforms changes the name of every single sub step (since step names are hierarchical).

@jkff (Contributor Author) left a comment

Thanks, addressed all comments; PTAL.

I replied inline where I could. To address Reuven's comment about refactoring into new PTransforms: per offline discussion, this can be handled by publishing instructions for applying a transform mapping in release notes; I would not like this issue to block simplifications of this code, whose complexity has unfortunately already bitten us many times in more painful ways than update incompatibility.
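Reuven's point about step names being hierarchical can be illustrated with a toy sketch (not Beam's actual naming code): a step's full name is the path of its enclosing composite transforms joined by "/", so wrapping existing steps in a new composite changes every descendant's full name, which is exactly what breaks update compatibility.

```java
// Toy illustration (not Beam's naming code) of hierarchical step names:
// a full name is the path of enclosing composite names joined by '/'.
// Introducing a new composite like "WriteUnshardedBundlesToTempFiles"
// changes the full name of every step nested under it.
class StepNameSketch {
    static String fullName(String... path) {
        return String.join("/", path);
    }
}
```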


writer = writeOperation.createWriter();
int shardNumber =
shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
Contributor:

jkff wrote:
After #4124 they do - see #4137 for explanation.

We went through some trouble to ensure that windowed writes in batch do not require fixed sharding. 4124 fixes a bug that only affects triggered (generally streaming) writes, so it's unfortunate if it makes the batch case less usable. Is there any way to make windowed writes work in batch without requiring fixed sharding?

Contributor Author:

In #4137 I argued that this was never guaranteed to work. Or rather, it would only work if

.apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult<DestinationT>>create())
fires exactly once and deterministically contains all the data. The code used "collection is bounded" as a proxy for that, but that is not sufficient: the Beam model does not prevent a bounded collection from triggering multiple times.

However, it seems that the only reason it doesn't work is that the order of things retrieved from a GBK is unstable - well, let's stabilize it then, with another reshuffle! I did that, and now this is supported - thanks for pushing :) Done.
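The fix described here is about making shard assignment independent of the order in which a GBK emits its results. The PR stabilizes that order with a reshuffle; the sketch below (invented names, not Beam's code) shows the same determinism idea by sorting on a stable key before numbering, so the same set of temp files always yields the same shard assignment regardless of input order.

```java
import java.util.*;

// Sketch of order-independent shard assignment in finalize (illustrative
// only): numbering after a sort on a stable key makes the assignment
// deterministic no matter what order the grouped results arrive in.
class AssignShardsSketch {
    static Map<String, Integer> assignShards(Collection<String> tempFiles) {
        List<String> sorted = new ArrayList<>(tempFiles);
        Collections.sort(sorted); // stable order, independent of arrival order
        Map<String, Integer> shards = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            shards.put(sorted.get(i), i);
        }
        return shards;
    }
}
```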

@jkff (Contributor Author) commented Dec 1, 2017

Run Java PostCommit

@jkff (Contributor Author) commented Dec 4, 2017

Run Java PostCommit

@jkff (Contributor Author) commented Dec 5, 2017

PTAL?

@chamikaramj (Contributor) left a comment

LGTM. Thanks.

@asfgit asfgit closed this in 761ec1a Dec 6, 2017
@jkff jkff deleted the simplify-write-files branch December 6, 2017 00:30