Change to use file to store offset information instead of table #40
Conversation
albertshau
left a comment
minor comments, thanks for removing all the code duplication
byte[] keyBytes = consumerRecord.key();
byte[] value = consumerRecord.value();
if (value == null) {
  LOG.warn("Received message with null message.payload with topic {} and partition {}",
was in existing code, but this is better as:
Received message with null payload from partition {} of topic {}.
This also seems like an easy way to flood the log with thousands of warnings. Do you know if the value is ever expected to be null?
Probably not logging at all.
Apparently it could be null based on the Kafka source.
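Since null-valued records can legitimately arrive (in the Kafka source they appear, for example, as tombstones on log-compacted topics), one option consistent with the comments above is to silently skip them and emit at most a single summary log line rather than one warning per record. The sketch below is a hypothetical illustration of that idea; `SimpleRecord` is a stand-in for Kafka's `ConsumerRecord<byte[], byte[]>` so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: skip records with null payloads instead of warning per record.
// SimpleRecord stands in for Kafka's ConsumerRecord; real code would
// iterate over the records returned by poll().
public class NullPayloadFilter {

    // Hypothetical stand-in for ConsumerRecord<byte[], byte[]>
    static final class SimpleRecord {
        final byte[] key;
        final byte[] value;
        SimpleRecord(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Keeps only records that carry a payload; counts the skipped ones so
    // the caller can log one summary line instead of flooding the log.
    static List<SimpleRecord> filterNullPayloads(List<SimpleRecord> records) {
        List<SimpleRecord> kept = new ArrayList<>();
        int skipped = 0;
        for (SimpleRecord r : records) {
            if (r.value == null) {
                skipped++;       // tombstone-style record, nothing to process
                continue;
            }
            kept.add(r);
        }
        if (skipped > 0) {
            System.out.println("Skipped " + skipped + " records with null payload");
        }
        return kept;
    }

    public static void main(String[] args) {
        List<SimpleRecord> records = Arrays.asList(
            new SimpleRecord("k1".getBytes(), "v1".getBytes()),
            new SimpleRecord("k2".getBytes(), null),
            new SimpleRecord("k3".getBytes(), "v3".getBytes()));
        System.out.println("Kept: " + filterNullPayloads(records).size());
    }
}
```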
 * @return a {@link KafkaPartitionOffsets} object decoded from the file
 * @throws IOException if failed to read the file
 */
public static KafkaPartitionOffsets load(FileContext fc, Path offsetFile) throws IOException {
annotate offsetFile as nullable
Actually it is never expected to be null, and the caller makes sure it is not null (if it were null, the caller shouldn't be able to construct the FileContext).
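For context, the file-backed offset storage this PR introduces can be sketched with plain JDK file IO: a partition-to-offset map serialized as a properties file. The class and method names below (`FileBackedOffsets`, `save`, `load`) are hypothetical illustrations using `java.nio.file`, not the PR's actual `KafkaPartitionOffsets` API, which reads through a Hadoop `FileContext`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of persisting Kafka partition offsets in a file.
public class FileBackedOffsets {

    // Write each partition's offset as a "partition=offset" property.
    static void save(Map<Integer, Long> offsets, Path file) throws IOException {
        Properties props = new Properties();
        offsets.forEach((partition, offset) ->
            props.setProperty(String.valueOf(partition), String.valueOf(offset)));
        try (OutputStream out = Files.newOutputStream(file)) {
            props.store(out, "kafka partition offsets");
        }
    }

    // Read the offsets back; a missing file simply means "no offsets yet".
    static Map<Integer, Long> load(Path file) throws IOException {
        Map<Integer, Long> offsets = new HashMap<>();
        if (!Files.exists(file)) {
            return offsets;
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        for (String name : props.stringPropertyNames()) {
            offsets.put(Integer.valueOf(name), Long.valueOf(props.getProperty(name)));
        }
        return offsets;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("offsets", ".properties");
        save(Map.of(0, 42L, 1, 7L), file);
        System.out.println(load(file));  // round-trips the partition -> offset map
    }
}
```

A properties file keeps the format human-readable and avoids any dependency on a table dataset, which matches the PR's stated goal of using a file instead of a table.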
ETLStage sink = new ETLStage("sink", MockSink.getPlugin(outputName));

ETLBatchConfig pipelineConfig = ETLBatchConfig.builder()
  .setTimeSchedule("* * * * *")
nit: this is not needed
WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
workflowManager.start(Collections.singletonMap("maxNumberRecords", "-1"));
workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES);
can use startAndWaitForRun() to simplify this a little and to protect against future changes in case somebody runs the program before this line.
Oh, didn't know we have that.
"A file named with the pipeline name will be created under the given directory.")
@Macro
@Nullable
private String offsetDir;
The widget files and reference docs need to be updated to include this new property.
albertshau
left a comment
lgtm
…f table
- Big refactoring to consolidate highly duplicated code between Kafka 0.8 and Kafka 0.10 implementations
- Introduced a common module for the common implementation
2987d5a to 59404de
Squashed and merging.