Skip to content

Commit

Permalink
message receiver - fix null pointer error and ensure that receive lin…
Browse files Browse the repository at this point in the history
…k is recreated upon a failure (#439)

* message receiver/sender - fix null pointer error and ensure that receive/send link is recreated on a failure.
  • Loading branch information
sjkwak committed Apr 4, 2019
1 parent 09f6565 commit 878e6ee
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void run() {
"clientId[%s], path[%s], linkName[%s] - Reschedule operation timer, current: [%s], remaining: [%s] secs",
getClientId(),
receivePath,
receiveLink.getName(),
getReceiveLinkName(),
Instant.now(),
timeoutTracker.remaining().getSeconds()));
}
Expand Down Expand Up @@ -154,7 +154,7 @@ public void onComplete(Void result) {
TRACE_LOGGER.debug(
String.format(Locale.US,
"clientId[%s], path[%s], linkName[%s] - token renewed",
getClientId(), receivePath, receiveLink.getName()));
getClientId(), receivePath, getReceiveLinkName()));
}
}

Expand All @@ -164,7 +164,7 @@ public void onError(Exception error) {
TRACE_LOGGER.info(
String.format(Locale.US,
"clientId[%s], path[%s], linkName[%s], tokenRenewalFailure[%s]",
getClientId(), receivePath, receiveLink.getName(), error.getMessage()));
getClientId(), receivePath, getReceiveLinkName(), error.getMessage()));
}
}
});
Expand All @@ -173,7 +173,7 @@ public void onError(Exception error) {
TRACE_LOGGER.info(
String.format(Locale.US,
"clientId[%s], path[%s], linkName[%s], tokenRenewalScheduleFailure[%s]",
getClientId(), receivePath, receiveLink.getName(), exception.getMessage()));
getClientId(), receivePath, getReceiveLinkName(), exception.getMessage()));
}
}
}
Expand Down Expand Up @@ -236,6 +236,10 @@ private List<Message> receiveCore(final int messageCount) {
return returnMessages;
}

private String getReceiveLinkName() {
return this.receiveLink == null ? "null" : this.receiveLink.getName();
}

public Duration getReceiveTimeout() {
return this.receiveTimeout;
}
Expand Down Expand Up @@ -263,7 +267,7 @@ public CompletableFuture<Collection<Message>> receive(final int maxMessageCount)
"clientId[%s], path[%s], linkName[%s] - schedule operation timer, current: [%s], remaining: [%s] secs",
this.getClientId(),
this.receivePath,
this.receiveLink.getName(),
this.getReceiveLinkName(),
Instant.now(),
this.receiveTimeout.getSeconds()));
}
Expand Down Expand Up @@ -308,7 +312,7 @@ public void onOpenComplete(Exception exception) {

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("onOpenComplete - clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s]",
this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount));
this.getClientId(), this.receivePath, this.getReceiveLinkName(), this.receiveLink.getCredit(), this.prefetchCount));
}
} else {
synchronized (this.errorConditionLock) {
Expand Down Expand Up @@ -407,7 +411,7 @@ public void onError(final Exception exception) {
String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], onError: %s",
this.getClientId(),
this.receivePath,
this.receiveLink.getName(),
this.getReceiveLinkName(),
completionException));
}

Expand All @@ -425,7 +429,7 @@ public void onError(final Exception exception) {
@Override
public void onEvent() {
if (!MessageReceiver.this.getIsClosingOrClosed()
&& (receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) {
&& (receiveLink == null || receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) {
createReceiveLink();
underlyingFactory.getRetryPolicy().incrementRetryCount(getClientId());
}
Expand All @@ -438,7 +442,7 @@ public void onEvent() {
String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], scheduling createLink encountered error: %s",
this.getClientId(),
this.receivePath,
this.receiveLink.getName(), ignore.getLocalizedMessage()));
this.getReceiveLinkName(), ignore.getLocalizedMessage()));
}
}
}
Expand Down Expand Up @@ -620,7 +624,7 @@ private void sendFlow(final int credits) {

if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(String.format("clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s], ThreadId[%s]",
this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), tempFlow, Thread.currentThread().getId()));
this.getClientId(), this.receivePath, this.getReceiveLinkName(), this.receiveLink.getCredit(), tempFlow, Thread.currentThread().getId()));
}
}
}
Expand Down Expand Up @@ -825,7 +829,7 @@ public void onEvent() {
receiveWork.onEvent();

if (!MessageReceiver.this.getIsClosingOrClosed()
&& (receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) {
&& (receiveLink == null || receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) {
createReceiveLink();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void onComplete(Void result) {
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(String.format(Locale.US,
"clientId[%s], path[%s], linkName[%s] - token renewed",
getClientId(), sendPath, sendLink.getName()));
getClientId(), sendPath, getSendLinkName()));
}
}

Expand All @@ -116,15 +116,15 @@ public void onError(Exception error) {
if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format(Locale.US,
"clientId[%s], path[%s], linkName[%s] - tokenRenewalFailure[%s]",
getClientId(), sendPath, sendLink.getName(), error.getMessage()));
getClientId(), sendPath, getSendLinkName(), error.getMessage()));
}
}
});
} catch (IOException | NoSuchAlgorithmException | InvalidKeyException | RuntimeException exception) {
if (TRACE_LOGGER.isWarnEnabled()) {
TRACE_LOGGER.warn(String.format(Locale.US,
"clientId[%s], path[%s], linkName[%s] - tokenRenewalScheduleFailure[%s]",
getClientId(), sendPath, sendLink.getName(), exception.getMessage()));
getClientId(), sendPath, getSendLinkName(), exception.getMessage()));
}
}
}
Expand Down Expand Up @@ -241,6 +241,10 @@ private CompletableFuture<Void> send(
return this.sendCore(bytes, arrayOffset, messageFormat, onSend, tracker, null, null);
}

private String getSendLinkName() {
return this.sendLink == null ? "null" : this.sendLink.getName();
}

public CompletableFuture<Void> send(final Iterable<Message> messages) {
if (messages == null || IteratorUtil.sizeEquals(messages, 0)) {
throw new IllegalArgumentException(String.format(Locale.US,
Expand Down Expand Up @@ -335,7 +339,7 @@ public void onOpenComplete(Exception completionException) {

if (TRACE_LOGGER.isInfoEnabled()) {
TRACE_LOGGER.info(String.format("onOpenComplete - clientId[%s], sendPath[%s], linkName[%s]",
this.getClientId(), this.sendPath, this.sendLink.getName()));
this.getClientId(), this.sendPath, this.getSendLinkName()));
}

if (!this.linkFirstOpen.isDone()) {
Expand Down Expand Up @@ -471,7 +475,7 @@ public void onError(final Exception completionException) {
@Override
public void onEvent() {
if (!MessageSender.this.getIsClosingOrClosed()
&& (sendLink.getLocalState() == EndpointState.CLOSED || sendLink.getRemoteState() == EndpointState.CLOSED)) {
&& (sendLink == null || sendLink.getLocalState() == EndpointState.CLOSED || sendLink.getRemoteState() == EndpointState.CLOSED)) {
recreateSendLink();
}
}
Expand Down Expand Up @@ -506,7 +510,7 @@ public void onSendComplete(final Delivery delivery) {
String.format(
Locale.US,
"clientId[%s], path[%s], linkName[%s], deliveryTag[%s]",
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag));
this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag));

final ReplayableWorkItem<Void> pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag);

Expand Down Expand Up @@ -574,7 +578,7 @@ public void onEvent() {
if (TRACE_LOGGER.isDebugEnabled())
TRACE_LOGGER.debug(
String.format(Locale.US, "clientId[%s]. path[%s], linkName[%s], delivery[%s] - mismatch (or send timed out)",
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag));
this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag));
}
}

Expand Down Expand Up @@ -773,7 +777,7 @@ public void onFlow(final int creditIssued) {
int numberOfSendsWaitingforCredit = this.pendingSends.size();
TRACE_LOGGER.debug(String.format(Locale.US,
"clientId[%s], path[%s], linkName[%s], remoteLinkCredit[%s], pendingSendsWaitingForCredit[%s], pendingSendsWaitingDelivery[%s]",
this.getClientId(), this.sendPath, this.sendLink.getName(), creditIssued, numberOfSendsWaitingforCredit, this.pendingSendsData.size() - numberOfSendsWaitingforCredit));
this.getClientId(), this.sendPath, this.getSendLinkName(), creditIssued, numberOfSendsWaitingforCredit, this.pendingSendsData.size() - numberOfSendsWaitingforCredit));
}

this.sendWork.onEvent();
Expand All @@ -786,7 +790,7 @@ private void recreateSendLink() {

// actual send on the SenderLink should happen only in this method & should run on Reactor Thread
private void processSendWork() {
if (this.sendLink.getLocalState() == EndpointState.CLOSED || this.sendLink.getRemoteState() == EndpointState.CLOSED) {
if (this.sendLink == null || this.sendLink.getLocalState() == EndpointState.CLOSED || this.sendLink.getRemoteState() == EndpointState.CLOSED) {
if (!this.getIsClosingOrClosed())
this.recreateSendLink();

Expand Down Expand Up @@ -840,7 +844,7 @@ private void processSendWork() {
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(
String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s], sentMessageSize[%s], payloadActualSize[%s] - sendlink advance failed",
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()));
this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()));
}

if (delivery != null) {
Expand All @@ -858,7 +862,7 @@ private void processSendWork() {
if (TRACE_LOGGER.isDebugEnabled()) {
TRACE_LOGGER.debug(
String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s] - sendData not found for this delivery.",
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag));
this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag));
}
}

Expand Down

0 comments on commit 878e6ee

Please sign in to comment.