New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-5163] Ports the production functions to the new state abstraction. #2871
Conversation
1dde7c7
to
dde5d3a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, very good set of commits! 👍
I had some style comments (about the state descriptor), feel free to ignore these. We should have a minimal checkpoint/restore test for the StatefulSequenceSource
, though. Could call it StatefulSequenceSourceTest
. These tests in SourceFunctionTest
seem a bit weird/minimal.
|
||
private static final long serialVersionUID = 1L; | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class); | ||
|
||
private static final String FILE_MONITORING_STATE_NAME = "file-monitoring-state"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of this you could also have
private static final ListStateDescriptor<Long> STATE_DESCRIPTOR =
new ListStateDescriptor<>("file-monitoring-state", LongSerializer.INSTANCE);
and then use this descriptor later directly instead of initialising with this field.
That's just a personal style nitpick. Your version is also fine. 😃
@@ -62,6 +73,9 @@ | |||
/** Flag to make the source cancelable */ | |||
private volatile boolean isRunning = true; | |||
|
|||
private transient ListState<Integer> checkpointedState; | |||
|
|||
private static final String FROM_ELEMENT_STATE_NAME = "from-element-state"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of this you could also have
private static final ListStateDescriptor<Integer> STATE_DESCRIPTOR =
new ListStateDescriptor<>("from-elements-state", IntSerializer.INSTANCE);
and then use this descriptor later directly instead of initialising with this field.
That's just a personal style nitpick. Your version is also fine. 😃
private volatile boolean isRunning = true; | ||
|
||
private transient Deque<Long> valuesToEmit; | ||
|
||
private static final String STATEFUL_SOURCE_STATE = "stateful-source-state"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of this you could also have
private static final ListStateDescriptor<Long> STATE_DESCRIPTOR =
new ListStateDescriptor<>("stateful-source-state", LongSerializer.INSTANCE);
and then use this descriptor later directly instead of initialising with this field.
That's just a personal style nitpick. Your version is also fine. 😃
@@ -52,12 +52,4 @@ public void fromCollectionTest() throws Exception { | |||
Arrays.asList(1, 2, 3)))); | |||
assertEquals(expectedList, actualList); | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be good to have a snapshot/restore test for this source, verifying that we see all the expected elements (no matter the order).
|
||
private static final long serialVersionUID = -8689291992192955579L; | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(MessageAcknowledgingSourceBase.class); | ||
|
||
private static final String MESSAGE_ACKNOWLEDGING_SOURCE_STATE = "message-acknowledging-source-state"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comments on the other functions, I'm not writing it again ... 😉
|
||
this.interval = interval; | ||
this.watchType = watchType; | ||
this.readerParallelism = Math.max(readerParallelism, 1); | ||
this.globalModificationTime = Long.MIN_VALUE; | ||
} | ||
|
||
public long getGlobalModificationTime() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably have @VisibleForTesting
.
dde5d3a
to
5401b6c
Compare
5401b6c
to
2ac6f6f
Compare
@kl0u Looks even better now! 👍 I'm running a last time on travis and them I'm merging. |
@aljoscha Thanks a lot for the review! |
I merged this, could you please close the PR and the issue? Thanks for your work. 👍 |
@aljoscha Yes thanks a lot! |
This includes the following functions:
StatefulSequenceSource
MessageAcknowledgingSourceBase
FromElementsFunction
ContinuousFileMonitoringFunction
Each of them is a separate commit, for ease of reviewing.
Most of the functions assume parallelism of 1. The only exception is the
StatefulSequenceSource
.R @aljoscha