PIP-91: Separate lookup timeout from operation timeout #11627
Conversation
This patch contains a number of changes:

TooManyRequests is retried for partition metadata and lookups.

Lookup timeout configuration has been added. By default it matches the operation timeout.

Partition metadata timeout calculation has been fixed to calculate the elapsed time correctly.

Small refactor on broker construction to allow a mocked ServerCnx implementation for testing. Unfortunately, the test takes over 50 seconds, but this is unavoidable since we're working with timeouts here.

PulsarClientExceptions have been reworked to contain more context (remote/local/reqid) and any previous exceptions that triggered retries. The previous exceptions must be manually recorded, so this only applies to lookups on the consumer side for now.
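For illustration, a minimal sketch of how the new setting might be configured from the client builder (assuming it is exposed as `lookupTimeout` alongside the existing `operationTimeout`; the service URL and values are placeholders):

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.PulsarClient;

public class LookupTimeoutExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder broker URL
                .operationTimeout(30, TimeUnit.SECONDS)  // budget for individual operations
                // lookup/partition-metadata requests get their own budget, so
                // TooManyRequests retries during a broker cold restart aren't cut short;
                // if unset, it defaults to the operation timeout
                .lookupTimeout(120, TimeUnit.SECONDS)
                .build();

        client.close();
    }
}
```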
Great work!
LGTM
Some tests are failing. At least one of the failures is legit, so I'll look into it and post an update.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
LGTM
The history behind introducing the TooManyRequest error is to handle backpressure for ZooKeeper by throttling a large number of concurrent topic loads during a broker cold restart. Therefore, Pulsar has lookup throttling at both the client and server side that slows down lookups, because a lookup ultimately triggers topic loading on the server side. So, when a client sees TooManyRequest errors, the client should retry the operation, and the client will eventually reconnect to the broker. TooManyRequest cannot harm the broker because the broker already has a safeguard to reject a flood of requests.
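As a rough illustration of that retry contract, a hypothetical helper (not the actual client internals) that keeps retrying a lookup on TooManyRequests with capped exponential backoff until its own deadline expires:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.pulsar.client.api.PulsarClientException;

// hypothetical sketch: retry a lookup on TooManyRequests until the lookup deadline passes
final class LookupRetrySketch {
    static <T> T lookupWithRetries(Supplier<CompletableFuture<T>> lookup, long lookupTimeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + lookupTimeoutMs;
        long backoffMs = 100;
        while (true) {
            try {
                return lookup.get().join();
            } catch (CompletionException e) {
                boolean throttled = e.getCause() instanceof PulsarClientException.TooManyRequestsException;
                if (!throttled || System.currentTimeMillis() >= deadline) {
                    throw e;                                   // non-retriable, or out of time
                }
                TimeUnit.MILLISECONDS.sleep(backoffMs);        // back off before retrying
                backoffMs = Math.min(backoffMs * 2, 5_000);    // capped exponential backoff
            }
        }
    }
}
```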
@rdhabalia There is very little complexity added to separate the timeout. The complexity I think you are referring to is the record-keeping so that if an exception is thrown, it contains information about previous failures, not just the last failure. Business logic never actually looks at the List of exceptions.
BrokerClientIntegrationTest#testCloseConnectionOnBrokerRejected was depending on the fact that TooManyRequests was previously fatal for partition metadata requests. Now that it retries, that test was failing. It's a bad test anyhow, depending on thread interactions and whatnot. I've rewritten it to use the ServerCnx mock. It now actually tests for the thing it should, that clients close the connection after the max rejects. The schema tests were failing because they expected a certain exception message which has been extended; I changed endsWith to contains. I also added Producer retries similar to the Consumer ones. I was going to do that as a follow-on PR, but decided to put it in this one.
Thanks for your contribution. For this PR, do we need to update docs? (The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)
}

@Override
public String toString() {
Why do we need to print out the previously encountered exceptions every time we log an exception? We already log every exception in the client, can't we just search the logs for the history?
@jerrypeng we don't print it when we log. The previous exceptions only get attached when the exception is propagated to the client. It's useful because it gives you more info to correlate on the broker side.
@ivankelly so when will we print out the previous exceptions?
@ivankelly aren't we printing lookup errors here:
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L81
Looking through the code, we do log exceptions returned from the broker.
What I meant is, we don't print the exception with the previous exceptions attached every time we log. We only print that at the point where we're about to complete the subscribeFuture or producerCreatedFuture with an exception, which is when the exception gets passed to the client. For me, the logging of that exception is incidental. What I want is for the client code to get an exception that has context about the retries.
Take for example the case of a customer who has a flink pipeline, and they get a TooManyRequestsException. They take a screenshot of the exception in the flink dashboard and send it to us. I want all the information to be in that screenshot, and not have to ask them to dig around in flink logs to get it.
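A rough sketch of that flow, with hypothetical names rather than the actual ConsumerImpl/ProducerImpl code: per-retry failures are only recorded, and the accumulated history is attached to the exception that finally completes the caller-visible future:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// hypothetical sketch of the record-keeping described above
final class RetryHistorySketch<T> {
    private final List<Throwable> previousFailures = new CopyOnWriteArrayList<>();

    // called on each failed retry; nothing is surfaced to the caller yet
    void recordFailure(Throwable t) {
        previousFailures.add(t);
    }

    // called once, when we give up: the history travels with the final exception
    void failFuture(CompletableFuture<T> future, Exception last) {
        previousFailures.forEach(last::addSuppressed);
        future.completeExceptionally(last);
    }
}
```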
if (nonRetriableError) {
    log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}", topic, consumerId, exception);
} else {
    log.info("[{}] Consumer creation failed for consumer {} after timeout", topic, consumerId);
I think it will still be useful to log the exception even if we timed out
I didn't change this code.
Can we add it?
it wouldn't add any more information. TimeoutExceptions all come from
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
Line 1149 in 05827ae
if (requestFuture.completeExceptionally(new TimeoutException(timeoutMessage))) {
So the timeout exception message and stack would be exactly the same every time.
Also, note that with the non-timeout exception it's not printing the stacktrace.
Generally looks good to me. Left a couple of comments
@Anonymitaet there's javadoc for the new configuration option.
@@ -243,7 +245,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
     this.initialStartMessageId = this.startMessageId;
     this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
     AVAILABLE_PERMITS_UPDATER.set(this, 0);
-    this.subscribeTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
+    this.subscribeTimeout = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs();
The time to subscribe will include the time to do a lookup + the time to create a connection (if the connection to the broker is not established yet). However, with our current code we are including the connection establishment time within our lookup time. This makes the timeout here confusing and hard to reason about, as it may or may not include the time to establish a connection. Also, establishing a connection has its own timeout, which defaults to 10 seconds. I think we should clearly separate the two timeouts so one is not just overlapping with the other, and we can clearly understand if subscribe failed because of a lookup timeout or a connection timeout.
Same for producers.
It also includes the time to do CommandSubscribe and CommandPublish.
Separating the timeout to do the lookup from the time to establish the correct connection is a major rework of how timeouts work. The lookup timeout and retry are handled in ConsumerImpl and ProducerImpl, and these only get signals via the connectionFailed and connectionOpen callbacks. So to separate it out, we'd need to refactor how the Impls get a connection. Currently it goes from Impl->ConnectionHandler->PulsarClientImpl->LookupService. I don't think it's worth it. It's already clear if subscribe failed due to a lookup timeout or a connection timeout: the exception returned is different, PulsarClientException.TimeoutException for the former, netty's ConnectTimeoutException for the latter. If you want to know which node you failed to connect to, it's there in the exception message.
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.CompletionException: io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.1.34:5432
w.r.t. including the CommandSubscribe and CommandProducer, I can change this, but it would create a behavioral change by default as then the operationTimeout for these commands only starts counting down after lookup has succeeded. i.e. the whole operation could take twice as long. I guess this isn't a major issue though.
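For reference, a small sketch of how calling code can already tell the two failure modes apart by inspecting the cause chain (topic, subscription, and service URL are placeholders):

```java
import java.util.concurrent.CompletionException;

import io.netty.channel.ConnectTimeoutException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class SubscribeFailureExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder
                .build();
        try (Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")                       // placeholder topic
                .subscriptionName("my-sub")
                .subscribeAsync()
                .join()) {
            // use the consumer ...
        } catch (CompletionException e) {
            // the client wraps failures, so walk the cause chain (see the example above)
            for (Throwable t = e; t != null; t = t.getCause()) {
                if (t instanceof PulsarClientException.TimeoutException) {
                    System.err.println("lookup/subscribe timed out inside the Pulsar client");
                } else if (t instanceof ConnectTimeoutException) {
                    System.err.println("TCP connect to the resolved broker timed out: " + t.getMessage());
                }
            }
            throw e;
        } finally {
            client.close();
        }
    }
}
```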
left some additional comments. Thanks.
@BewareMyPower do you have any additional concerns?
IIUC @rdhabalia left some comments on the ML. @rdhabalia, do you mind officially writing up your position about this PR? It also looks like @jerrypeng initially approved the PR and then added more comments; if you can, @jerrypeng, please add your review as well.
@eolivelli this has 4 approvals and no changes requested. IMO it's ready to merge.
agreed. merging now
@eolivelli thanks