From 2c1ba19dd410168ba8e78c6279efa3baff4be85f Mon Sep 17 00:00:00 2001 From: Manuel Sangoi Date: Wed, 4 Dec 2013 16:18:08 +0100 Subject: [PATCH] Fix transmission id in coap retry filter --- .../mina/coap/retry/CoapRetryFilter.java | 29 ++++++++++--------- .../mina/coap/retry/CoapTransmission.java | 20 ++++++++++++- .../mina/coap/retry/CoapRetryFilterTest.java | 5 ++-- .../mina/coap/retry/CoapTransmissionTest.java | 13 ++++++--- 4 files changed, 46 insertions(+), 21 deletions(-) diff --git a/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java b/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java index 374434d60..83084ad24 100644 --- a/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java +++ b/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java @@ -57,10 +57,10 @@ public class CoapRetryFilter extends AbstractIoFilter { private ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor(); /** The confirmable messages waiting to be acknowledged */ - private Map inFlight = new ConcurrentHashMap<>(); + private Map inFlight = new ConcurrentHashMap<>(); /** The list of processed messages used to handle duplicate copies of Confirmable messages */ - private ExpiringMap processed = new ExpiringMap(retryExecutor); + private ExpiringMap processed = new ExpiringMap(retryExecutor); /** * {@inheritDoc} @@ -70,6 +70,7 @@ public void messageReceived(IoSession session, Object in, ReadFilterChainControl LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session); CoapMessage coapMsg = (CoapMessage) in; + String transmissionId = CoapTransmission.uniqueId(session, coapMsg); switch (coapMsg.getType()) { case NON_CONFIRMABLE: @@ -78,10 +79,10 @@ public void messageReceived(IoSession session, Object in, ReadFilterChainControl break; case CONFIRMABLE: // check if this is a duplicate of a message already processed - CoapMessage ack = processed.get(coapMsg.requestId()); + CoapMessage ack = processed.get(transmissionId); if (ack != null) { // stop the filter chain and send again the ack since it was probably lost - LOGGER.debug("Duplicated messages detected for ID {}", coapMsg.requestId()); + LOGGER.debug("Duplicated messages detected with ID {} in session {}", coapMsg.requestId(), session); controller.callWriteMessageForRead(ack); } else { controller.callReadNextFilter(coapMsg); @@ -90,11 +91,11 @@ public void messageReceived(IoSession session, Object in, ReadFilterChainControl break; case ACK: case RESET: - CoapTransmission t = inFlight.get(coapMsg.requestId()); + CoapTransmission t = inFlight.get(transmissionId); if (t != null) { // cancel the scheduled retransmission t.getRetryFuture().cancel(false); - inFlight.remove(coapMsg.requestId()); + inFlight.remove(transmissionId); } controller.callReadNextFilter(coapMsg); break; @@ -110,7 +111,7 @@ public void messageWriting(final IoSession session, final WriteRequest message, LOGGER.debug("Processing a MESSAGE_WRITING for session {}", session); final CoapMessage coapMsg = (CoapMessage) message.getMessage(); - final Integer coapMsgId = (Integer) coapMsg.requestId(); + final String transmissionId = CoapTransmission.uniqueId(session, coapMsg); switch (coapMsg.getType()) { @@ -120,17 +121,17 @@ public void messageWriting(final IoSession session, final WriteRequest message, case RESET: case ACK: // let's keep track of the message to avoid processing it again in case of duplicate copy. - processed.put(coapMsgId, coapMsg); + processed.put(transmissionId, coapMsg); controller.callWriteNextFilter(message); break; case CONFIRMABLE: // initialize a transmission if this is not a retry - CoapTransmission t = inFlight.get(coapMsgId); + CoapTransmission t = inFlight.get(transmissionId); if (t == null) { - t = new CoapTransmission(coapMsg); - inFlight.put(coapMsgId, t); + t = new CoapTransmission(session, coapMsg); + inFlight.put(t.getId(), t); } // schedule a retry @@ -138,15 +139,15 @@ public void messageWriting(final IoSession session, final WriteRequest message, @Override public void run() { - CoapTransmission t = inFlight.get(coapMsgId); + CoapTransmission t = inFlight.get(transmissionId); // send again the message if the maximum number of attempts is not reached if (t != null && t.timeout()) { - LOGGER.debug("Retry for message with ID {}", coapMsgId); + LOGGER.debug("Retry for message with ID {}", coapMsg.requestId()); session.write(coapMsg); } else { // abort transmission - LOGGER.debug("No more retry for message with ID {}", coapMsgId); + LOGGER.debug("No more retry for message with ID {}", coapMsg.requestId()); } } }, t.getNextTimeout(), TimeUnit.MILLISECONDS); diff --git a/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java b/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java index 6f1359c30..54516256b 100644 --- a/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java +++ b/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java @@ -22,6 +22,7 @@ import java.util.Random; import java.util.concurrent.ScheduledFuture; +import org.apache.mina.api.IoSession; import org.apache.mina.coap.CoapMessage; /** @@ -43,6 +44,11 @@ public class CoapTransmission { /** Default value of the maximum number of retransmissions */ private static final int MAX_RETRANSMIT = 4; + /** + * The unique transmission identifier + */ + private String id; + /** * The CoAP message waiting to be acknowledged */ @@ -64,7 +70,8 @@ public class CoapTransmission { */ private long nextTimeout; - public CoapTransmission(CoapMessage message) { + public CoapTransmission(IoSession session, CoapMessage message) { + this.id = uniqueId(session, message); this.message = message; this.transmissionCount = 0; @@ -88,6 +95,10 @@ public boolean timeout() { return false; } + public String getId() { + return id; + } + public CoapMessage getMessage() { return message; } @@ -104,4 +115,11 @@ public long getNextTimeout() { return nextTimeout; } + /** + * @return the unique identifier for a given message in a session. + */ + public static String uniqueId(IoSession session, CoapMessage message) { + return session.getId() + "#" + message.requestId(); + } + } diff --git a/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java b/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java index 34c0e3671..6c046ec62 100644 --- a/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java +++ b/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java @@ -19,6 +19,7 @@ */ package org.apache.mina.coap.retry; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; import org.apache.mina.api.IoSession; @@ -130,7 +131,7 @@ public void no_retry_if_ack_received() throws InterruptedException { verify(writeController).callWriteNextFilter(writeRequest); // no retry - Mockito.verifyZeroInteractions(session); + verify(session, Mockito.never()).write(any(CoapMessage.class)); } @Test @@ -152,7 +153,7 @@ public void no_retry_if_reset_received() throws InterruptedException { verify(writeController).callWriteNextFilter(writeRequest); // no retry - Mockito.verifyZeroInteractions(session); + verify(session, Mockito.never()).write(any(CoapMessage.class)); } @Test diff --git a/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java b/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java index 1280885da..c31cc4853 100644 --- a/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java +++ b/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java @@ -19,12 +19,13 @@ */ package org.apache.mina.coap.retry; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import org.apache.mina.api.IoSession; import org.apache.mina.coap.CoapMessage; import org.apache.mina.coap.MessageType; import org.junit.Test; +import org.mockito.Mockito; /** * Unit tests for {@link CoapTransmission} @@ -36,8 +37,12 @@ public class CoapTransmissionTest { @Test public void timeout() { - CoapTransmission transmission = new CoapTransmission(new CoapMessage(1, MessageType.CONFIRMABLE, 1, 1234, - "token".getBytes(), null, "payload".getBytes())); + + IoSession session = Mockito.mock(IoSession.class); + Mockito.when(session.getId()).thenReturn(1L); + + CoapTransmission transmission = new CoapTransmission(session, new CoapMessage(1, MessageType.CONFIRMABLE, 1, + 1234, "token".getBytes(), null, "payload".getBytes())); assertTrue(transmission.getNextTimeout() > MIN_INIT_TIMEOUT); assertTrue(transmission.getNextTimeout() < MAX_INIT_TIMEOUT);