Add epoch for connection handler to handle create producer timeout. #5571
@wolfstudy if this PR can be completed before 2.4.2 is cut, please consider including it, thanks.
@wolfstudy @codelipenghui This issue has been reported by many users, so let's include it in 2.4.2.
run java8 tests |
@codelipenghui the idea looks good to me. left a few comments. PTAL
/**
 * @return the name of producer is generated or user specified
 */
boolean isGeneratedName();
do we need to expose this to producer api? I think this is an implementation detail, which should be hidden in the implementation.
ok, currently no need to expose to the interface
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.HandlerState.State;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConnectionHandler {
public class ConnectionHandler {
nit: add @VisibleForTesting?
Ok, let me try
@@ -57,6 +57,7 @@

private String topicName = null;
private String producerName = null;
private boolean isGeneratedName = true;
do we really need to add another field in ProducerConfigurationData?

// Indicate the name of the producer is generated or not(user specified)
// Use default false here is in order to be forward compatible with the client
optional bool is_generated_name = 9 [default = false];
is_generated_name => user_provided_producer_name?
throw new NamingException(
    "Producer with name '" + producer.getProducerName() + "' is already connected to topic");
boolean canOverwrite = false;
for (Producer existProducer : producers.values()) {
Can we avoid iterating over the producers set? I.e. can you change the hash set to a hash map?
@sijie I have addressed your comments, please take a look again. |
@@ -139,7 +140,9 @@ public String toString() {

@Override
public ConcurrentOpenHashSet<Producer> getProducers() {
    return producers;
    ConcurrentOpenHashSet<Producer> result = new ConcurrentOpenHashSet<>(16, 1);
oops. I didn't realize that there is a getProducers here. It is used in a lot of places, hence it might be a performance problem if we changed to a hash map.
@sijie I will try changing the getProducers method to return a map.
run java8 tests |
    canOverwrite = true;
}
if (canOverwrite) {
    producers.put(producer.getProducerName(), producer);
use producers.replace(producer.getProducerName(), existingProducer, producer) to make sure one can successfully add the producer.
    canOverwrite = true;
}
if (canOverwrite) {
    producers.put(producer.getProducerName(), producer);
use replace and check the return result
    oldProducer.close();
    canOverwrite = true;
}
if (canOverwrite) {
nit: this can be simplified with
if (canOverwrite && !producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
    throw new BrokerServiceException.NamingException(
        "Producer with name '" + newProducer.getProducerName() + "' is already connected");
}
Maybe it can be simplified with
if (!canOverwrite || !producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
    throw new BrokerServiceException.NamingException(
        "Producer with name '" + newProducer.getProducerName() + "' is already connected");
}
Right?
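For reference, a minimal sketch of the behavior the review relies on, assuming the producers map follows the java.util.concurrent.ConcurrentMap contract. The class name ReplaceSketch, the helper swap, and the string values are hypothetical and stand in for real producer objects. The three-argument replace swaps the entry only when the key is still mapped to the expected old value, so a lost race shows up in the return value instead of being silently overwritten as it would be with put.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ReplaceSketch {
    // Returns true only if `name` was still mapped to `expected`
    // and has now been remapped to `replacement`.
    static boolean swap(ConcurrentMap<String, String> producers,
                        String name, String expected, String replacement) {
        return producers.replace(name, expected, replacement);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, String> producers = new ConcurrentHashMap<>();
        producers.put("producer-a", "old");

        // The expected value matches, so the swap succeeds.
        boolean first = swap(producers, "producer-a", "old", "new");
        // The expected value is stale now, so nothing is changed.
        boolean second = swap(producers, "producer-a", "old", "newer");

        System.out.println(first + " " + second + " " + producers.get("producer-a"));
    }
}
```

Checking the return value is what lets the caller throw the naming exception when another thread changed the entry between the lookup and the swap.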
@merlimat @rdhabalia PTAL |
run java8 tests |
run cpp tests |
run java8 tests |
@merlimat @rdhabalia can you review this pull request? |
@codelipenghui can you also create github issues for tracking adding similar fixes for other language clients? |
@merlimat @rdhabalia can you please review this? |
@codelipenghui l will change the |
@codelipenghui @jiazhai The pr changes the proto file, will move the milestone to |
Fixes #5535
Motivation
Currently, if a create-producer request times out on the client, the producer's connection handler reconnects to the broker later. But if the broker has already completed the previous create-producer request, the reconnection fails with "producer with name xxx is already connected".
So this PR introduces an epoch for the connection handler and adds a field named isGeneratedName to the producer to handle the above problem. This PR only handles the generated producer name scenario. Many users have hit errors such as #5535, so we need to fix the generated producer name scenario first.
For the scenario of a user-specified producer name, we can discuss later and find a simple approach to handle it. I left my idea here: use the producer id and producer name together as the identity of the producer. The producer name is used for the EO producer and the producer id can be used for producer reconnection, but this approach depends on a globally unique producer id generator.
Modifications
If the producer has a generated producer name and its epoch is greater than that of the existing producer, the new producer overwrites the old producer, so the reconnecting producer is created successfully.
Verifying this change
Add unit tests to simulate producer timeout and reconnection
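The overwrite rule described under Modifications can be sketched as follows. This is an illustration under stated assumptions, not the code in this PR: EpochRegistrySketch, ProducerInfo, and epochOf are hypothetical names, the map is a plain ConcurrentHashMap, and NamingException is made unchecked here only to keep the example short.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class EpochRegistrySketch {
    // Hypothetical stand-in for the broker-side producer object.
    static final class ProducerInfo {
        final String name;
        final long epoch;
        final boolean generatedName;

        ProducerInfo(String name, long epoch, boolean generatedName) {
            this.name = name;
            this.epoch = epoch;
            this.generatedName = generatedName;
        }
    }

    // Assumption: unchecked for brevity in this sketch.
    static final class NamingException extends RuntimeException {
        NamingException(String message) {
            super(message);
        }
    }

    private final ConcurrentMap<String, ProducerInfo> producers = new ConcurrentHashMap<>();

    void addProducer(ProducerInfo incoming) {
        ProducerInfo existing = producers.putIfAbsent(incoming.name, incoming);
        if (existing == null) {
            return; // the name was free, nothing to overwrite
        }
        // Overwrite only for a generated name with a strictly newer epoch.
        boolean canOverwrite = incoming.generatedName && incoming.epoch > existing.epoch;
        if (!canOverwrite || !producers.replace(incoming.name, existing, incoming)) {
            throw new NamingException(
                "Producer with name '" + incoming.name + "' is already connected to topic");
        }
    }

    long epochOf(String name) {
        return producers.get(name).epoch;
    }

    public static void main(String[] args) {
        EpochRegistrySketch registry = new EpochRegistrySketch();
        registry.addProducer(new ProducerInfo("generated-1", 0, true));
        // A reconnect after a timeout carries a higher epoch and takes over.
        registry.addProducer(new ProducerInfo("generated-1", 1, true));
        System.out.println(registry.epochOf("generated-1"));
    }
}
```

A request that arrives with the same or a lower epoch, or with a user-specified name, is still rejected with the naming error in this sketch, which matches the scope stated in the description.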