[BEAM-2134] expose AUTO_COMMIT to KafkaIO.read()#2837
mingmxu wants to merge 3 commits into apache:master
R: @rangadi
I agree with updating JavaDoc. I am not so sure about adding another method. e.g. auto_commit alone is not enough, user also needs to set a |
|
I see, without |
| * also set the initial offset of next run as last committed when checkpoint is not | ||
| * provided by runners. | ||
| */ | ||
| public Read<K, V> withOffsetAutoCommit(boolean autoCommit) { |
`return withUpdatedConfig(...);` would be simpler.
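A minimal sketch of the one-line delegation suggested in this comment, written against a stand-in class rather than the real `KafkaIO.Read`. `ReadSketch`, `withUpdatedConfig`, and `config()` are hypothetical names used only for illustration; the only real identifier is the Kafka consumer property key `enable.auto.commit`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KafkaIO.Read; this is NOT the Beam class.
final class ReadSketch {
  private final Map<String, Object> consumerConfig;

  ReadSketch(Map<String, Object> consumerConfig) {
    // Defensive copy so each instance is effectively immutable.
    this.consumerConfig = Collections.unmodifiableMap(new HashMap<>(consumerConfig));
  }

  // Hypothetical helper: returns a new instance with the updates merged
  // over the existing consumer properties.
  ReadSketch withUpdatedConfig(Map<String, Object> updates) {
    Map<String, Object> merged = new HashMap<>(consumerConfig);
    merged.putAll(updates);
    return new ReadSketch(merged);
  }

  // The setter under review, reduced to a single delegating return.
  ReadSketch withOffsetAutoCommit(boolean autoCommit) {
    return withUpdatedConfig(
        Collections.<String, Object>singletonMap("enable.auto.commit", autoCommit));
  }

  Map<String, Object> config() {
    return consumerConfig;
  }
}
```

Calling `new ReadSketch(new HashMap<>()).withOffsetAutoCommit(true)` yields a new instance whose config contains `enable.auto.commit`, while the original instance is left unchanged.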
| * consuming from the <em>latest</em> offsets. You can override this behavior to consume from the | ||
| * beginning by setting appropriate appropriate properties in {@link ConsumerConfig}, through | ||
| * {@link Read#updateConsumerProperties(Map)}. | ||
| * In this case, you can also enable offset auto_commit in Kafka to resume from last commited, |
| * <p>In summary, KafkaIO.read follows below sequence to set initial offset:<br> | ||
| * 1. {@link KafkaCheckpointMark} provided by runner;<br> | ||
| * 2. Consumer offset stored in Kafka when {@code withOffsetAutoCommit(true)};<br> | ||
| * 3. Start from <em>latest</em> offset by default; |
Note that the last two are not explicitly enforced by KafkaIO. It is the KafkaConsumer that behaves this way. I think we should make it explicit. Could be something like:
Starts from KafkaCheckpointMark if it is provided by the runner (most users don't know when a runner would provide this, so a link about this would be useful). Otherwise, the initial offset is dictated by the KafkaConsumer configuration (link to the KafkaConsumer description of offset initialization). Users have complete control over this configuration, including enabling `auto_commit` of offsets. "auto.offset.reset" is set to "latest" by default, and can be overridden by the user (see withUpdatedConfigProperties()).
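The precedence described in this comment can be sketched as a small pure function. This is a simplified model under stated assumptions, not KafkaIO or KafkaConsumer code: `InitialOffsetSketch` and `resolve` are hypothetical names, offsets are passed in as plain numbers, and only the `auto.offset.reset` values `earliest`, `latest`, and `none` are real Kafka settings.

```java
import java.util.OptionalLong;

// Hypothetical model of how the starting offset for one partition is chosen.
final class InitialOffsetSketch {

  static long resolve(
      OptionalLong checkpointOffset,  // from the runner's checkpoint, if any
      OptionalLong committedOffset,   // stored in Kafka for the consumer group, if any
      String autoOffsetReset,         // value of "auto.offset.reset"
      long earliest,
      long latest) {
    // 1. A checkpoint supplied by the runner always wins.
    if (checkpointOffset.isPresent()) {
      return checkpointOffset.getAsLong();
    }
    // 2. Otherwise the consumer resumes from the group's committed offset.
    if (committedOffset.isPresent()) {
      return committedOffset.getAsLong();
    }
    // 3. With nothing stored, the reset policy decides.
    switch (autoOffsetReset) {
      case "earliest":
        return earliest;
      case "latest":
        return latest;
      case "none":
        throw new IllegalStateException("no committed offset and auto.offset.reset=none");
      default:
        throw new IllegalArgumentException("unknown auto.offset.reset: " + autoOffsetReset);
    }
  }
}
```

Only step 1 would belong to KafkaIO itself; steps 2 and 3 model consumer behavior, which is the distinction the comment asks the JavaDoc to make explicit.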
| * committed in the background. This information can be used by external monitoring system, | ||
| * also set the initial offset of next run as last committed when checkpoint is not | ||
| * provided by runners. | ||
| */ |
This config also requires users to set group.id. Should we enforce it here? We should certainly mention it in the documentation.
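One way such enforcement could look, as a rough sketch over a plain property map rather than the actual KafkaIO code. `AutoCommitConfigCheck` and `validate` are hypothetical names; `enable.auto.commit` and `group.id` are the real Kafka consumer property keys. Treating a missing `enable.auto.commit` as disabled is an assumption of this sketch.

```java
import java.util.Map;

// Hypothetical validation helper; not part of KafkaIO.
final class AutoCommitConfigCheck {

  // Rejects a config that enables offset auto commit without a consumer group.
  static void validate(Map<String, Object> consumerConfig) {
    Object autoCommit = consumerConfig.get("enable.auto.commit");
    // Assumption: an absent key means auto commit is off.
    boolean enabled =
        autoCommit != null && Boolean.parseBoolean(String.valueOf(autoCommit));

    Object groupId = consumerConfig.get("group.id");
    boolean hasGroupId =
        groupId != null && !String.valueOf(groupId).trim().isEmpty();

    if (enabled && !hasGroupId) {
      throw new IllegalArgumentException(
          "enable.auto.commit=true requires group.id to be set");
    }
  }
}
```

Whether to fail fast like this or only document the requirement is the open question in the comment; the sketch shows the fail-fast option.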
| * {@link UnboundedKafkaSource#split(int, PipelineOptions)} for more details on | ||
| * | ||
| * <p>Checkpointing is fully supported and each split can resume from previous checkpoint, | ||
| * if it's supported and enabled in runner configuration. |
This could be rephrased [Optional]:
"Checkpointing is fully supported and each split can resume from previous checkpoint (to the extent supported a runner)"...
to the extent supported by runner?
rangadi left a comment
Overall LGTM. Two minor suggestions above.
| * consuming from the <em>latest</em> offsets. You can override this behavior to consume from the | ||
| * beginning by setting appropriate appropriate properties in {@link ConsumerConfig}, through | ||
| * {@link Read#updateConsumerProperties(Map)}. | ||
| * In this case, you can also enable offset auto_commit in Kafka to resume from last committed. |
Can remove 'In this case,'.
Thanks @xumingmin! LGTM.

Thank you @rangadi!

retest this please
…e visible.