Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RequestResponseChannel should not throw OperationCancelled on PendingRequests while closing #372

Merged
merged 1 commit into from
Aug 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -393,22 +393,30 @@ public void run() {
completeWith = error;
}

final Duration waitTime = ManagementRetry.this.mf.getRetryPolicy().getNextRetryInterval(
ManagementRetry.this.mf.getClientId(), lastException, this.timeoutTracker.remaining());
if (waitTime == null) {
// Do not retry again, give up and report error.
if (completeWith == null) {
ManagementRetry.this.finalFuture.complete(null);
if (ManagementRetry.this.mf.getIsClosingOrClosed()) {
ManagementRetry.this.finalFuture.completeExceptionally(
new OperationCancelledException(
"OperationCancelled as the underlying client instance was closed.",
lastException));
}
else {
final Duration waitTime = ManagementRetry.this.mf.getRetryPolicy().getNextRetryInterval(
ManagementRetry.this.mf.getClientId(), lastException, this.timeoutTracker.remaining());
if (waitTime == null) {
// Do not retry again, give up and report error.
if (completeWith == null) {
ManagementRetry.this.finalFuture.complete(null);
} else {
ManagementRetry.this.finalFuture.completeExceptionally(completeWith);
}
} else {
ManagementRetry.this.finalFuture.completeExceptionally(completeWith);
// The only thing needed here is to schedule a new attempt. Even if the RequestResponseChannel has croaked,
// ManagementChannel uses FaultTolerantObject, so the underlying RequestResponseChannel will be recreated
// the next time it is needed.
final ManagementRetry retrier = new ManagementRetry(ManagementRetry.this.finalFuture, ManagementRetry.this.timeoutTracker,
ManagementRetry.this.mf, ManagementRetry.this.request);
EventHubClientImpl.this.timer.schedule(retrier, waitTime);
}
} else {
// The only thing needed here is to schedule a new attempt. Even if the RequestResponseChannel has croaked,
// ManagementChannel uses FaultTolerantObject, so the underlying RequestResponseChannel will be recreated
// the next time it is needed.
final ManagementRetry retrier = new ManagementRetry(ManagementRetry.this.finalFuture, ManagementRetry.this.timeoutTracker,
ManagementRetry.this.mf, ManagementRetry.this.request);
EventHubClientImpl.this.timer.schedule(retrier, waitTime);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/
package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.OperationCancelledException;
import com.microsoft.azure.eventhubs.EventHubException;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Source;
Expand Down Expand Up @@ -241,8 +241,10 @@ public void onError(Exception exception) {
public void onClose(ErrorCondition condition) {

if (condition == null || condition.getCondition() == null) {
this.cancelPendingRequests(new OperationCancelledException(
"Operation cancelled as the underlying request-response link closed"));
this.cancelPendingRequests(
new EventHubException(
ClientConstants.DEFAULT_IS_TRANSIENT,
"The underlying request-response channel closed, recreate the channel and retry the request."));

if (onClose != null)
onLinkCloseComplete(null);
Expand Down