From d9749219dd31206d6fa2aa730826ef326a1ad0ae Mon Sep 17 00:00:00 2001 From: Achim Kraus Date: Mon, 15 Mar 2021 13:17:36 +0100 Subject: [PATCH] Fix con-notify race-condition. Delegate onAcknowledgement to serial execution of exchange. Check, if current control notification is still the response the notification controller was created for. Signed-off-by: Achim Kraus --- .../core/network/stack/ObserveLayer.java | 82 +++++++++++-------- .../core/network/stack/ReliabilityLayer.java | 6 +- 2 files changed, 54 insertions(+), 34 deletions(-) diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/ObserveLayer.java b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/ObserveLayer.java index ad2a47c490..c6ce80f924 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/ObserveLayer.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/ObserveLayer.java @@ -167,7 +167,7 @@ public void receiveEmptyMessage(final Exchange exchange, final EmptyMessage mess } private void prepareSelfReplacement(Exchange exchange, Response response) { - response.addMessageObserver(new NotificationController(exchange)); + response.addMessageObserver(new NotificationController(exchange, response)); } /** @@ -176,52 +176,68 @@ private void prepareSelfReplacement(Exchange exchange, Response response) { private class NotificationController extends MessageObserverAdapter { private Exchange exchange; + private Response response; - public NotificationController(Exchange exchange) { + public NotificationController(Exchange exchange, Response response) { this.exchange = exchange; + this.response = response; } @Override public void onAcknowledgement() { - ObserveRelation relation = exchange.getRelation(); - final Response next = relation.getNextControlNotification(); - relation.setCurrentControlNotification(next); - // next may be null - relation.setNextControlNotification(null); - if (next != null) { - LOGGER.debug("notification has been acknowledged, send the next one"); - // Create a new task for sending next response so that we - // can leave the sync-block - exchange.execute(new Runnable() { - - @Override - public void run() { - ObserveLayer.super.sendResponse(exchange, next); + exchange.execute(new Runnable() { + + @Override + public void run() { + ObserveRelation relation = exchange.getRelation(); + if (relation.getCurrentControlNotification() == response) { + final Response next = relation.getNextControlNotification(); + // next may be null + relation.setCurrentControlNotification(next); + relation.setNextControlNotification(null); + if (next != null) { + if (relation.isCanceled()) { + next.cancel(); + } else { + LOGGER.trace("notification has been acknowledged, send the next one"); + ObserveLayer.super.sendResponse(exchange, next); + } + } } - }); - } + } + }); } @Override public void onRetransmission() { // called within the exchange executor context. ObserveRelation relation = exchange.getRelation(); - final Response next = relation.getNextControlNotification(); - if (next != null) { - LOGGER.debug("notification has timed out and there is a fresher notification for the retransmission"); - // Cancel the original retransmission and - // send the fresh notification here - exchange.getCurrentResponse().cancel(); - // Convert all notification retransmissions to CON - if (next.getType() != Type.CON) { - next.setType(Type.CON); - prepareSelfReplacement(exchange, next); + if (relation.getCurrentControlNotification() == response) { + Response next = relation.getNextControlNotification(); + if (relation.isCanceled()) { + response.cancel(); + if (next != null) { + next.cancel(); + next = null; + } + } + if (next != null) { + LOGGER.trace( + "notification has timed out and there is a fresher notification for the retransmission"); + // Cancel the original retransmission and + // send the fresh notification here + response.cancel(); + // Convert all notification retransmissions to CON + if (next.getType() != Type.CON) { + next.setType(Type.CON); + prepareSelfReplacement(exchange, next); + } + relation.setCurrentControlNotification(next); + relation.setNextControlNotification(null); + // Create a new task for sending next response so that we + // can leave the sync-block + ObserveLayer.super.sendResponse(exchange, next); } - relation.setCurrentControlNotification(next); - relation.setNextControlNotification(null); - // Create a new task for sending next response so that we - // can leave the sync-block - ObserveLayer.super.sendResponse(exchange, next); } } diff --git a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/ReliabilityLayer.java b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/ReliabilityLayer.java index 63939ffd1d..713cbbc2f9 100644 --- a/californium-core/src/main/java/org/eclipse/californium/core/network/stack/ReliabilityLayer.java +++ b/californium-core/src/main/java/org/eclipse/californium/core/network/stack/ReliabilityLayer.java @@ -514,11 +514,15 @@ private void retry() { // Trigger MessageObservers message.retransmitting(); - // MessageObserver might have canceled + // MessageObserver might have canceled or completed if (message.isCanceled()) { LOGGER.trace("Timeout: for {}, {} got canceled, do not retransmit", exchange, message); return; } + if (exchange.isComplete()) { + LOGGER.debug("Timeout: for {}, {} got completed, do not retransmit", exchange, message); + return; + } retransmit(); } else { LOGGER.debug("Timeout: for {} retransmission limit reached, exchange failed, message: {}", exchange,