Skip to content

Commit

Permalink
Fix con-notify race-condition.
Browse files Browse the repository at this point in the history
Delegate onAcknowledgement to serial execution of exchange.
Check, if current control notification is still the response the
notification controller was created for.

Signed-off-by: Achim Kraus <achim.kraus@bosch.io>
  • Loading branch information
Achim Kraus committed Mar 17, 2021
1 parent b8f9a01 commit d974921
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void receiveEmptyMessage(final Exchange exchange, final EmptyMessage mess
}

private void prepareSelfReplacement(Exchange exchange, Response response) {
response.addMessageObserver(new NotificationController(exchange));
response.addMessageObserver(new NotificationController(exchange, response));
}

/**
Expand All @@ -176,52 +176,68 @@ private void prepareSelfReplacement(Exchange exchange, Response response) {
private class NotificationController extends MessageObserverAdapter {

private Exchange exchange;
private Response response;

public NotificationController(Exchange exchange) {
public NotificationController(Exchange exchange, Response response) {
this.exchange = exchange;
this.response = response;
}

@Override
public void onAcknowledgement() {
ObserveRelation relation = exchange.getRelation();
final Response next = relation.getNextControlNotification();
relation.setCurrentControlNotification(next);
// next may be null
relation.setNextControlNotification(null);
if (next != null) {
LOGGER.debug("notification has been acknowledged, send the next one");
// Create a new task for sending next response so that we
// can leave the sync-block
exchange.execute(new Runnable() {

@Override
public void run() {
ObserveLayer.super.sendResponse(exchange, next);
exchange.execute(new Runnable() {

@Override
public void run() {
ObserveRelation relation = exchange.getRelation();
if (relation.getCurrentControlNotification() == response) {
final Response next = relation.getNextControlNotification();
// next may be null
relation.setCurrentControlNotification(next);
relation.setNextControlNotification(null);
if (next != null) {
if (relation.isCanceled()) {
next.cancel();
} else {
LOGGER.trace("notification has been acknowledged, send the next one");
ObserveLayer.super.sendResponse(exchange, next);
}
}
}
});
}
}
});
}

@Override
public void onRetransmission() {
// called within the exchange executor context.
ObserveRelation relation = exchange.getRelation();
final Response next = relation.getNextControlNotification();
if (next != null) {
LOGGER.debug("notification has timed out and there is a fresher notification for the retransmission");
// Cancel the original retransmission and
// send the fresh notification here
exchange.getCurrentResponse().cancel();
// Convert all notification retransmissions to CON
if (next.getType() != Type.CON) {
next.setType(Type.CON);
prepareSelfReplacement(exchange, next);
if (relation.getCurrentControlNotification() == response) {
Response next = relation.getNextControlNotification();
if (relation.isCanceled()) {
response.cancel();
if (next != null) {
next.cancel();
next = null;
}
}
if (next != null) {
LOGGER.trace(
"notification has timed out and there is a fresher notification for the retransmission");
// Cancel the original retransmission and
// send the fresh notification here
response.cancel();
// Convert all notification retransmissions to CON
if (next.getType() != Type.CON) {
next.setType(Type.CON);
prepareSelfReplacement(exchange, next);
}
relation.setCurrentControlNotification(next);
relation.setNextControlNotification(null);
// Create a new task for sending next response so that we
// can leave the sync-block
ObserveLayer.super.sendResponse(exchange, next);
}
relation.setCurrentControlNotification(next);
relation.setNextControlNotification(null);
// Create a new task for sending next response so that we
// can leave the sync-block
ObserveLayer.super.sendResponse(exchange, next);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,11 +514,15 @@ private void retry() {
// Trigger MessageObservers
message.retransmitting();

// MessageObserver might have canceled
// MessageObserver might have canceled or completed
if (message.isCanceled()) {
LOGGER.trace("Timeout: for {}, {} got canceled, do not retransmit", exchange, message);
return;
}
if (exchange.isComplete()) {
LOGGER.debug("Timeout: for {}, {} got completed, do not retransmit", exchange, message);
return;
}
retransmit();
} else {
LOGGER.debug("Timeout: for {} retransmission limit reached, exchange failed, message: {}", exchange,
Expand Down

0 comments on commit d974921

Please sign in to comment.