Support Kafka's ConsumerConfig.MAX_POLL_RECORDS_CONFIG to configure the maximum number of messages returned in a single poll #3887

Merged 1 commit on Mar 26, 2019

MarvinCai
Contributor

Motivation
Support Kafka's ConsumerConfig.MAX_POLL_RECORDS_CONFIG to configure the maximum number of messages returned in a single poll. #1090
Also update the doc to reflect that we already support the earliest and latest strategies for ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.

Kafka's config parser already checks that the 'max.poll.records' value is at least 1 and throws an exception if it is less than 1, so I didn't add an extra check.

@@ -304,8 +313,6 @@ public void unsubscribe() {
});
}

private static final int MAX_RECORDS_IN_SINGLE_POLL = 1000;

We should still keep a constant for 1000, perhaps renamed to DEFAULT_MAX_RECORDS_IN_SINGLE_POLL.
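
A minimal sketch of this suggestion, not the code merged in this PR: the constant is kept as the fallback when `max.poll.records` is not set. The class `MaxPollRecords` and the method `resolve` are hypothetical names. The property key is written as a string literal so the example runs without the Kafka client on the classpath, and the explicit range check only stands in for the validation that Kafka's config parser performs.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of PulsarKafkaConsumer.
final class MaxPollRecords {
    // Same value as the removed constant, renamed as the reviewer suggests.
    static final int DEFAULT_MAX_RECORDS_IN_SINGLE_POLL = 1000;

    // Same key string as ConsumerConfig.MAX_POLL_RECORDS_CONFIG.
    static final String MAX_POLL_RECORDS = "max.poll.records";

    // Returns the configured limit, or the default when the key is absent.
    static int resolve(Properties props) {
        String raw = props.getProperty(MAX_POLL_RECORDS);
        if (raw == null) {
            return DEFAULT_MAX_RECORDS_IN_SINGLE_POLL;
        }
        int value = Integer.parseInt(raw.trim());
        if (value < 1) {
            // Stand-in for the "at least 1" check done by Kafka's config parser.
            throw new IllegalArgumentException(
                    MAX_POLL_RECORDS + " must be at least 1, got " + value);
        }
        return value;
    }
}
```

With an empty `Properties` object, `MaxPollRecords.resolve` falls back to 1000; with `max.poll.records=50` it returns 50.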

| `max.poll.interval.ms` | Ignored | Messages are "pushed" from broker |
| `session.timeout.ms` | Ignored | |
| `heartbeat.interval.ms` | Ignored | |
| `bootstrap.servers` | Yes | Needs to point to a single Pulsar service URL |
| `enable.auto.commit` | Yes | |
| `auto.commit.interval.ms` | Ignored | With auto-commit, acks are sent immediately to broker |
| `partition.assignment.strategy` | Ignored | |
- | `auto.offset.reset` | Ignored | |
+ | `auto.offset.reset` | Yes | Only support earliest and latest. |

That would be the same behavior as Kafka, no?

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

earliest: automatically reset the offset to the earliest offset
latest: automatically reset the offset to the latest offset
none: throw exception to the consumer if no previous offset is found for the consumer's group
anything else: throw exception to the consumer.
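
The four cases quoted above can be sketched as a small lookup. This illustrates the Kafka semantics only, not how the Pulsar wrapper behaves. `OffsetResetPolicy`, `Position`, and `resolve` are hypothetical names, and the exception types are stand-ins chosen for the example.

```java
// Hypothetical illustration of the quoted auto.offset.reset semantics.
final class OffsetResetPolicy {
    enum Position { EARLIEST, LATEST }

    // Called only when no valid committed offset exists for the group.
    static Position resolve(String strategy) {
        if (strategy == null) {
            throw new IllegalArgumentException("auto.offset.reset is not set");
        }
        switch (strategy) {
            case "earliest":
                return Position.EARLIEST;
            case "latest":
                return Position.LATEST;
            case "none":
                // No previous offset and no reset policy: fail the consumer.
                throw new IllegalStateException(
                        "No previous offset found for the consumer's group");
            default:
                throw new IllegalArgumentException(
                        "Unsupported auto.offset.reset value: " + strategy);
        }
    }
}
```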

@MarvinCai
Contributor Author

Hi @merlimat, so for the auto.offset.reset setting, Pulsar only uses it when the offset info is somehow not found in the offset map, so it's not used in exactly the same way as in Kafka, right?

@merlimat
Contributor

@MarvinCai Isn't that the same behavior as Kafka?

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

@MarvinCai
Contributor Author

@merlimat Yes, for earliest and latest we have the same behavior, but we're not handling the none or "anything else" cases. I think we should add that.
Also, PulsarKafkaConsumer.java uses ConsumerConfig.GROUP_ID_CONFIG, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, and ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, but in the doc we only say that we support them, which makes them sound optional. So I think we should either mark them as required in the doc, or provide default values for them and state those defaults in the doc. Which one do you think would be better?
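
To make the two options concrete, here is a rough sketch that combines them. It assumes, purely for illustration, that `group.id` is treated as required while the other two keys fall back to defaults. `WrapperConfigDefaults`, `require`, and `withDefaults` are hypothetical names. The fallback values `true` and `latest` mirror Kafka's own defaults and are not something this PR decides.

```java
import java.util.Properties;

// Hypothetical sketch; not the behavior of PulsarKafkaConsumer.
final class WrapperConfigDefaults {
    // Option 1: treat a key as required and fail fast when it is missing.
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required config: " + key);
        }
        return value;
    }

    // Option 2: fill in documented defaults for keys the caller left out.
    static Properties withDefaults(Properties props) {
        Properties resolved = new Properties();
        resolved.putAll(props);
        require(resolved, "group.id");
        // Assumed defaults, chosen to match Kafka's own.
        resolved.putIfAbsent("enable.auto.commit", "true");
        resolved.putIfAbsent("auto.offset.reset", "latest");
        return resolved;
    }
}
```

Either way, the doc table would need to state which keys are required and which defaults apply.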

@merlimat
Contributor

but we're not handling none or anything else case, I think we should add it.

Oh I see. Very good point. The problem is that I don't think we have a direct way to do that right now (other than using the REST API to check the topic stats, which I don't think would be a good option).

@merlimat
Contributor

run java8 tests
run integration tests

Contributor

@merlimat merlimat left a comment

👍

@MarvinCai
Contributor Author

run integration tests

@merlimat merged commit b42c1e5 into apache:master on Mar 26, 2019
@merlimat added this to the 2.4.0 milestone on Mar 26, 2019