New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-26931][Connector/pulsar] Make the producer name and consumer name unique in Pulsar #19285
[FLINK-26931][Connector/pulsar] Make the producer name and consumer name unique in Pulsar #19285
Conversation
…ame unique for each instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good but I didn't get the %s
fully. Is this supposed to be expanded by Flink or by Pulsar Client? I'm wondering because the warnings in the OP also show it.
Map<String, String> paramsMap = configuration.get(PULSAR_AUTH_PARAM_MAP); | ||
} else { | ||
Map<String, String> paramsMap = configuration.getProperties(PULSAR_AUTH_PARAM_MAP); | ||
if (paramsMap.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this also be null
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, PulsarConfiguration#getProperties
will return empty map instead of null
} else { | ||
String producerName = configBuilder.get(PULSAR_PRODUCER_NAME); | ||
if (!producerName.contains("%s")) { | ||
configBuilder.override(PULSAR_PRODUCER_NAME, producerName + " - %s"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the %s
supposed to do? I haven't found anything in the Pulsar docs. Could you add a comment to clarify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will replace the %s as a UUID in PulsarSinkConfigUtils.java. This would make sure all the producer name unique. Same in PulsarSourceBuilder
.
configuration.useOption(
PULSAR_PRODUCER_NAME,
producerName -> String.format(producerName, UUID.randomUUID()),
builder::producerName);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see. Thank you, I missed that bit.
} else { | ||
String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME); | ||
if (!consumerName.contains("%s")) { | ||
configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + " - %s"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will replace the %s
with a UUID.
configuration.useOption(
PULSAR_CONSUMER_NAME,
consumerName -> String.format(consumerName, UUID.randomUUID()),
builder::consumerName);
One last high-level question (it's been a while since I reviewed Pulsar connector): Unique IDs are not stable after failure as far as I can see, could we lose data after recovery somehow? Or are the IDs only used to distribute the load? |
This unique id is just like a naming for different produce/consumer instances on Pulsar. The correctness of data when producing and consuming depends on the Pulsar transaction. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Do you want to backport to 1.15?
Yep, backport to 1.15 is required. |
What is the purpose of the change
Fix the bug of using same producer name and consumer name in pulsar connector. Pulsar would treat them as same instance which causes failure.
Brief change log
Verifying this change
This change is already covered by existing tests, such as
PulsarSinkITCase
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation