[BEAM-3079] Update samza-runner with more features and improvements#5517
[BEAM-3079] Update samza-runner with more features and improvements#5517kennknowles merged 2 commits intoapache:samza-runnerfrom
Conversation
|
The reason all three precommits failed is |
|
Fixed the headers. Thanks! |
|
@kennknowles : could you please help review it when you get a chance? Thanks! |
|
Yes, sorry for the delay! Reviewing today. |
|
There's a merge commit I see from |
|
Sorry about it. Let me take a look. |
dac17b7 to
c67fc4a
Compare
|
@kennknowles : I squashed all my commits into one so the changes are separated from the upstream merge. I think UsesImpulses tests are added in the master and Samza doesn't support it right now. |
|
hmm, seems the headers are missing again in this patch. Let me quickly add them. |
kennknowles
left a comment
There was a problem hiding this comment.
It is a lot to review but it pretty much LGTM. My comments are not worth blocking things on.
| final TimerInternals timerInternals; | ||
| final StateInternals stateInternals; | ||
|
|
||
| if (signature.usesState()) { |
There was a problem hiding this comment.
If it doesn't use state, you don't need to create a DoFnRunnerWithKeyedInternals, do you? I think it would clean this up to refactor the constructor without the conditionals here and below.
| final DoFnRunner<InputT, OutputT> doFnRunnerWithMetrics = DoFnRunnerWithMetrics | ||
| .wrap(doFnRunner, metricsContainer, stepName); | ||
|
|
||
| if (keyedInternals != null) { |
There was a problem hiding this comment.
This conditional goes right along with the one above to make just two code paths: the one where this method actually does something, and the one that is basically pass through.
| outputManager, | ||
| mainOutputTag, | ||
| additionalOutputTags, | ||
| createStepContext(stateInternals, timerInternals), |
There was a problem hiding this comment.
Ah, is this why it is organized this way? OK.
Add the following feature support:
Improvements: