Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-3708] Adding grouping table to Precombine step. #5795

Merged
merged 2 commits into from Jul 2, 2018

Conversation

youngoli
Copy link
Contributor

Adding a grouping table to the Precombine step of a lifted Combine Per
Key. This enables performing a Partial Group by Key optimization. The
grouping table code is somewhat generic, so it can be reused in other
runners that want to perform a Partial Group by Key.

Note for any reviewers:
I wasn't entirely sure where to commit the GroupingTable code, since it's somewhat generic, so I'm starting with the most specific directory it would fit in, but I may move the GroupingTable files to a new sub-directory named "utils" or something similar, or a completely different directory if anyone has any suggestions.


Follow this checklist to help us incorporate your contribution quickly and easily:

  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch)

Lang SDK Apex Dataflow Flink Gearpump Spark
Go Build Status --- --- --- --- ---
Java Build Status Build Status Build Status Build Status Build Status Build Status
Python Build Status --- Build Status
Build Status
--- --- ---

@youngoli
Copy link
Contributor Author

R: @lukecwik

accumCoder);

// Register the appropriate handlers.
addStartFunction.accept(runner::startBundle);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You add the start function twice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

return runner;
}
}

static <KeyT, InputT, AccumT>
ThrowingFunction<KV<KeyT, InputT>, KV<KeyT, AccumT>> createPrecombineMapFunction(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is no longer used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

Adding a grouping table to the Precombine step of a lifted Combine Per
Key. This enables performing a Partial Group by Key optimization. The
grouping table code is somewhat generic, so it can be reused in other
runners that want to perform a Partial Group by Key.
Simplifying the grouping table code by making it more specific to the
precombine. The class doesn't need to be so generic when only the
Precombine runner is going to use it in the foreseeable future.
@youngoli
Copy link
Contributor Author

Run Java PreCommit

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the comments are improvements to the code. I'll merge as-is and let you work on the next iteration improving the implementation based upon the comments I left in this PR.

// Input coder may sometimes be WindowedValueCoder depending on runner, instead of the
// expected KvCoder.
Coder<?> uncastInputCoder = rehydratedComponents.getCoder(mainInput.getCoderId());
KvCoder<KeyT, InputT> inputCoder;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't use the inputCoder anywhere except to get the key coder.

Consider dropping the local variable inputCoder and setting keyCoder directly.


void processElement(WindowedValue<KV<KeyT, InputT>> elem) throws Exception {
groupingTable.put(
elem, (Object outputElem) -> output.accept((WindowedValue<KV<KeyT, AccumT>>) outputElem));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you use a cast, you should be able to pass this in as a method reference instead of using a lambda


void finishBundle() throws Exception {
groupingTable.flush(
(Object outputElem) -> output.accept((WindowedValue<KV<KeyT, AccumT>>) outputElem));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here, if you use a cast, you should be able to pass this in as a method reference instead of using a lambda

public interface GroupingTable<K, InputT, AccumT> {

/** Abstract interface of things that accept inputs one at a time via process(). */
interface Receiver {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we'll need to make this generic in this sense. Consider using FnDataReceiver directly here instead of Receiver.

}

/** Adds a pair to this table, possibly flushing some entries to output if the table is full. */
void put(Object pair, Receiver receiver) throws Exception;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use KV<InputT, AccumT>

}

/** Provides client-specific operations for combining values. */
public interface Combiner<K, InputT, AccumT, OutputT> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll only have one implementation of a Combiner. You should be able to use the CombineFn directly everywhere.


if (size >= maxSize) {
long targetSize = (long) (TARGET_LOAD * maxSize);
Iterator<GroupingTableEntry<K, InputT, AccumT>> entries = table.values().iterator();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would do a lot better if we used an LRU strategy for cache eviction.

private long size = 0;
private Map<Object, GroupingTableEntry<K, InputT, AccumT>> table;

PrecombineGroupingTable(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that this is a copy from Dataflow internally but could be replaced with a much better implementation such as caffeine.

}

@VisibleForTesting
public void setMaxSize(long maxSize) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be much better if this was a final value and configurable via construction only.

@@ -93,6 +93,7 @@ public Integer extractOutput(Integer accum) {
private RunnerApi.PTransform pTransform;
private String inputPCollectionId;
private String outputPCollectionId;
private RunnerApi.Pipeline pProto;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: pProto -> pipeline or pipelineProto

@lukecwik
Copy link
Member

lukecwik commented Jul 2, 2018

Run Java PreCommit

@lukecwik lukecwik merged commit 6e3c055 into apache:master Jul 2, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants