[BEAM-3905] Update Flink Runner to Flink 1.5.0 #5578
Run Flink ValidatesRunner
Really minor nitpicks, almost LGTM. Probably good to have a second round by @tweise in case I miss some Flink specific detail.
if (keySelector != null) {
  ListStateDescriptor<WindowedValue<InputT>> pushedBackStateDescriptor =
      new ListStateDescriptor<>(
Maybe move this above the if/else, given that it is the same in both branches.
fixing
Stream<WindowedValue<InputT>> pushedBack = pushedBackElementsHandler.getElements();
Iterator<WindowedValue<InputT>> it = pushedBack.iterator();

long min = Long.MAX_VALUE;
Move this out of the if condition to avoid the else, and then call setPushedBackWatermark on min for all cases.
I'm not sure I can because I'm iterating over the pushed back elements, which I only have in the case where I have side inputs.
if (keyedStateInternals == null) {
  throw new RuntimeException("Current watermark is still " + currentOutputWatermark + ".");

} else {
Keep just the if, and let this be the default path.
You mean flip the if?
Iterator<WindowedValue<InputT>> it = pushedBack.iterator();

long min = Long.MAX_VALUE;
while (it.hasNext()) {
Maybe 'simpler' to do min = pushedBack.map(v -> v.getTimestamp().getMillis()).reduce(Long.MAX_VALUE, (t1, t2) -> Math.min(t1, t2));
changed
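To make the suggestion above concrete, here is a minimal sketch comparing the iterator loop with the stream reduction. The `Element` class is a hypothetical stand-in for `WindowedValue<InputT>` that carries only a millisecond timestamp; it is not the Beam type, and the class and method names are invented for illustration.

```java
import java.util.stream.Stream;

class PushedBackMinSketch {
  /** Hypothetical stand-in for WindowedValue: carries only a timestamp in millis. */
  static final class Element {
    final long timestampMillis;

    Element(long timestampMillis) {
      this.timestampMillis = timestampMillis;
    }
  }

  /** Loop-style minimum, mirroring the iterator-based code under review. */
  static long minWithLoop(Iterable<Element> elements) {
    long min = Long.MAX_VALUE;
    for (Element e : elements) {
      min = Math.min(min, e.timestampMillis);
    }
    return min;
  }

  /** Stream-style minimum, mirroring the reduce(...) suggested in the review. */
  static long minWithReduce(Stream<Element> elements) {
    return elements
        .map(e -> e.timestampMillis)
        .reduce(Long.MAX_VALUE, (t1, t2) -> Math.min(t1, t2));
  }
}
```

Both variants return `Long.MAX_VALUE` when there are no pushed-back elements, so in this sketch the "no hold" case falls out of the identity value rather than needing a separate branch.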
@@ -418,33 +451,7 @@ public void close() throws Exception {
  }

  private long getPushbackWatermarkHold() {
Maybe remove the method? Since the state is now kept in the variable, you can just access it directly.
I kept them for now to avoid too many changes, but I can do that as a follow-up.
@Override
public void pushBackAll(Iterable<T> elements) throws Exception {
  for (T e : elements) {
    // TODO: use addAll() once Flink 1.5.0 has addAll(Iterable<T>)
Maybe just say "once Flink has addAll(Iterable)", because Flink 1.5.0 does not have it.
fixing
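The element-by-element fallback discussed in this thread can be sketched as follows. `SimpleListState` and `InMemoryListState` are hypothetical types invented for this illustration; they are not Flink's `ListState`, and the sketch only assumes a state handle that offers a single-element `add`.

```java
import java.util.ArrayList;
import java.util.List;

class PushBackAllSketch {
  /** Hypothetical minimal list state that exposes only a single-element add. */
  interface SimpleListState<T> {
    void add(T value);
  }

  /** In-memory implementation, used only to make the sketch runnable. */
  static final class InMemoryListState<T> implements SimpleListState<T> {
    final List<T> contents = new ArrayList<>();

    @Override
    public void add(T value) {
      contents.add(value);
    }
  }

  /** Appends every element one by one, since no bulk add over an Iterable is assumed. */
  static <T> void pushBackAll(SimpleListState<T> state, Iterable<T> elements) {
    for (T e : elements) {
      state.add(e);
    }
  }
}
```

If the state interface later gains a bulk add that accepts an `Iterable`, the loop body could presumably collapse into a single call, which is what the TODO in the diff is tracking.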
 */
class KeyedPushedBackElementsHandler<K, T> implements PushedBackElementsHandler<T> {

  static <K, T> PushedBackElementsHandler<T> usingState(
Name it create and return the concrete type KeyedPushedBackElementsHandler?
fixing
if (lastSeenTimestamp == null) {
  // we have never seen this, emit
  WindowedValue<ValueWithRecordId<T>> value = streamRecord.getValue();
  output.collect(streamRecord.replace(value.withValue(value.getValue().getValue())));
value.withValue(value.getValue().getValue()) obvious :P
yeh ... 😅
Long lastSeenTimestamp = dedupingState.value();
if (lastSeenTimestamp != null
    && lastSeenTimestamp.equals(internalTimer.getTimestamp() - MAX_RETENTION_SINCE_ACCESS)) {
Question: shouldn't this be > or something like that, or does it have to be exact?
I think it's exact because we really want to compare to the timer we set. But I think I just moved this code around.
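A small sketch of why an exact comparison can make sense here. It assumes, as the reply suggests, that each access stores a last-seen timestamp and registers a cleanup timer at that timestamp plus `MAX_RETENTION_SINCE_ACCESS`; the class, the map-based state, and the retention value are hypothetical and only mimic the idea, not the actual Flink runner code.

```java
import java.util.HashMap;
import java.util.Map;

class DedupTimerSketch {
  // Assumed retention period for this illustration only.
  static final long MAX_RETENTION_SINCE_ACCESS = 1000L;

  // Hypothetical per-id state: the timestamp of the most recent access.
  private final Map<String, Long> lastSeen = new HashMap<>();

  /** Records an access and returns the timestamp at which its cleanup timer would fire. */
  long access(String id, long now) {
    lastSeen.put(id, now);
    return now + MAX_RETENTION_SINCE_ACCESS;
  }

  /**
   * Handles a firing timer. State is cleared only when the timer matches the
   * most recent access exactly; timers from earlier accesses are ignored.
   */
  boolean onTimer(String id, long timerTimestamp) {
    Long seen = lastSeen.get(id);
    if (seen != null && seen.equals(timerTimestamp - MAX_RETENTION_SINCE_ACCESS)) {
      lastSeen.remove(id);
      return true;
    }
    return false;
  }

  boolean isTracked(String id) {
    return lastSeen.containsKey(id);
  }
}
```

Under these assumptions, a timer set by an older access fires while a newer access is still within its retention period, and the equality check leaves the state alone until the timer belonging to the latest access arrives.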
@@ -148,7 +148,9 @@ public void testRestore() throws Exception {
         emptyMap(),
         emptyList(),
         PipelineOptionsFactory.as(FlinkPipelineOptions.class),
-        VarLongCoder.of()
+        VarLongCoder.of(),
+        null /* key selector */ // TODO use proper key selector
Maybe create a JIRA for this one if you don't plan to address it in this PR.
Apparently we don't need a key selector here, so I'll remove the TODO.
While doing this, we also have to re-do how we store non-keyed state in the DoFnOperator and fix usage of some interfaces that were removed.
190808e to c1958ce
I pushed a new version. Unfortunately, I messed up and don't have the fixups as a separate commit. 😓
LGTM!