diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index b51da6ca2..8bd65d0d3 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -393,22 +393,30 @@ public void run() { completeWith = error; } - final Duration waitTime = ManagementRetry.this.mf.getRetryPolicy().getNextRetryInterval( - ManagementRetry.this.mf.getClientId(), lastException, this.timeoutTracker.remaining()); - if (waitTime == null) { - // Do not retry again, give up and report error. - if (completeWith == null) { - ManagementRetry.this.finalFuture.complete(null); + if (ManagementRetry.this.mf.getIsClosingOrClosed()) { + ManagementRetry.this.finalFuture.completeExceptionally( + new OperationCancelledException( + "OperationCancelled as the underlying client instance was closed.", + lastException)); + } + else { + final Duration waitTime = ManagementRetry.this.mf.getRetryPolicy().getNextRetryInterval( + ManagementRetry.this.mf.getClientId(), lastException, this.timeoutTracker.remaining()); + if (waitTime == null) { + // Do not retry again, give up and report error. + if (completeWith == null) { + ManagementRetry.this.finalFuture.complete(null); + } else { + ManagementRetry.this.finalFuture.completeExceptionally(completeWith); + } } else { - ManagementRetry.this.finalFuture.completeExceptionally(completeWith); + // The only thing needed here is to schedule a new attempt. Even if the RequestResponseChannel has croaked, + // ManagementChannel uses FaultTolerantObject, so the underlying RequestResponseChannel will be recreated + // the next time it is needed. + final ManagementRetry retrier = new ManagementRetry(ManagementRetry.this.finalFuture, ManagementRetry.this.timeoutTracker, + ManagementRetry.this.mf, ManagementRetry.this.request); + EventHubClientImpl.this.timer.schedule(retrier, waitTime); } - } else { - // The only thing needed here is to schedule a new attempt. Even if the RequestResponseChannel has croaked, - // ManagementChannel uses FaultTolerantObject, so the underlying RequestResponseChannel will be recreated - // the next time it is needed. - final ManagementRetry retrier = new ManagementRetry(ManagementRetry.this.finalFuture, ManagementRetry.this.timeoutTracker, - ManagementRetry.this.mf, ManagementRetry.this.request); - EventHubClientImpl.this.timer.schedule(retrier, waitTime); } } }); diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java index fd96254ae..68a9719b4 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java @@ -4,7 +4,7 @@ */ package com.microsoft.azure.eventhubs.impl; -import com.microsoft.azure.eventhubs.OperationCancelledException; +import com.microsoft.azure.eventhubs.EventHubException; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.UnsignedLong; import org.apache.qpid.proton.amqp.messaging.Source; @@ -241,8 +241,10 @@ public void onError(Exception exception) { public void onClose(ErrorCondition condition) { if (condition == null || condition.getCondition() == null) { - this.cancelPendingRequests(new OperationCancelledException( - "Operation cancelled as the underlying request-response link closed")); + this.cancelPendingRequests( + new EventHubException( + ClientConstants.DEFAULT_IS_TRANSIENT, + "The underlying request-response channel closed, recreate the channel and retry the request.")); if (onClose != null) onLinkCloseComplete(null);