-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-2049] Remove KeyedCombineFn #2636
Conversation
b04586d
to
b076a0f
Compare
349ccb8
to
07f69b0
Compare
R: @tgroh (also any more eyes are helpful) I'll be running all sorts of test suites offline until Jenkins is back in working order. Most are pretty easy and quick. |
Confirmed the following offline, with some unrelated metrics flakes.
|
CombineFnUtil.toFnWithContext(Sum.ofIntegers().<String>asKeyedFn()); | ||
String key = "key"; | ||
accum = keyedFnWithContext.createAccumulator(key, nullContext); | ||
CombineFnWithContext<Integer, int[], Integer> keyedFnWithContext = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not keyed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -73,8 +72,8 @@ public void testToFnWithContextIdempotent() throws Exception { | |||
CombineFnUtil.toFnWithContext(Sum.ofIntegers()); | |||
assertTrue(fnWithContext == CombineFnUtil.toFnWithContext(fnWithContext)); | |||
|
|||
KeyedCombineFnWithContext<Object, Integer, int[], Integer> keyedFnWithContext = | |||
CombineFnUtil.toFnWithContext(Sum.ofIntegers().asKeyedFn()); | |||
CombineFnWithContext<Integer, int[], Integer> keyedFnWithContext = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not keyed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -40,14 +40,14 @@ | |||
|
|||
|
|||
/** | |||
* A {@link org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn} | |||
* A {@link org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn} | |||
* with a {@link org.apache.beam.sdk.transforms.CombineWithContext.Context} for the SparkRunner. | |||
*/ | |||
public class SparkKeyedCombineFn<K, InputT, AccumT, OutputT> extends SparkAbstractCombineFn { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be renamed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact the code paths are different enough that we should address this separately. It is easy, but not "just a rename", mostly due to keeping the WindowedValue wrappers in place everywhere. It is definitely a runner implementation detail that it chooses to specialize.
|
||
/** | ||
* Static utility methods that provide {@link GlobalCombineFnRunner} implementations for different | ||
* keyed combine functions. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not keyed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private final FlinkBroadcastStateInternals<K> flinkStateInternals; | ||
|
||
FlinkKeyedCombiningState( | ||
DefaultOperatorStateBackend flinkStateBackend, | ||
StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address, | ||
Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn, | ||
Combine.CombineFn<InputT, AccumT, OutputT> combineFn, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto here FWIW. Leaving runner-specific constructs in place.
OK. I hit a few more straggling names and tests. |
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM if tests pass.
a168c88
to
a33eb56
Compare
15cd836
to
e629d49
Compare
retest this please |
run dataflow validatesrunner |
run dataflow validatesrunner |
1717916
to
07ca542
Compare
Dataflow ValidatesRunner is stalling reliably at HEAD; nothing to do with this PR. It may be infrastructure or pom or another issue. Certainly it is urgent, but I don't think this PR should wait on it. We have lots of assurances:
Given the "LGTM if tests pass" I am going to merge this. Incidentally, all of this testing I've done against the merger of HEAD and this PR also gives some assurance that our code at HEAD is OK. |
Update Dataflow worker version to beam-master-20170430 Remove KeyedCombineFn
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
[BEAM-<Jira issue #>] Description of pull request
mvn clean verify
. (Even better, enableTravis-CI on your fork and ensure the whole test matrix passes).
<Jira issue #>
in the title with the actual Jira issuenumber, if there is one.
Individual Contributor License Agreement.
I think it is probably best to leave tweaking
StateTag
andStateSpec
until a later PR...