Permalink
Browse files

Synchronized accesses to the sentMessages queue, to hopefully make re…

…transmits more understandable.
  • Loading branch information...
willscott committed Apr 17, 2012
1 parent d744134 commit 969e476dfeef5ae3639d71539f75e7be9ec56e14
@@ -39,7 +39,7 @@ public void addChannel(ServiceChannelEndpoint s) {
this.channelOutstanding.put(s.getChannelId(), new HashSet<SequenceNumber>());
}
- public void onAck(OSF2FServiceDataMsg message) {
+ public int onAck(OSF2FServiceDataMsg message) {
// Parse acknowledged messages
DirectByteBuffer payload = message.getPayload();
HashSet<SequenceNumber> numbers = new HashSet<SequenceNumber>();
@@ -75,6 +75,7 @@ public void onAck(OSF2FServiceDataMsg message) {
for (Integer num : retransmissions) {
logger.info("Non outstanding packet acked: " + num);
}
+ return numbers.size();
}
public SequenceNumber nextMsg() {
@@ -202,7 +202,9 @@ private void writeMessage(final SequenceNumber num, DirectByteBuffer buffer, int
int length = buffer.remaining(ss);
ReferenceCountedDirectByteBuffer copy = buffer.getReferenceCountedBuffer();
sentMessage sent = new sentMessage(num, copy, length, attempt, datagram);
- this.sentMessages.put(num, sent);
+ synchronized (sentMessages) {
+ this.sentMessages.put(num, sent);
+ }
this.outstandingBytes += length;
OSF2FServiceDataMsg msg = new OSF2FServiceDataMsg(OSF2FMessage.CURRENT_VERSION, channelId,
num.getNum(), num.getFlow(), new int[0], copy);
@@ -248,11 +250,13 @@ public long getLatency() {
}
public DirectByteBuffer getMessage(SequenceNumber num) {
- sentMessage m = this.sentMessages.get(num);
- if (m != null) {
- return m.msg;
+ synchronized (sentMessages) {
+ sentMessage m = this.sentMessages.get(num);
+ if (m != null) {
+ return m.msg;
+ }
+ return null;
}
- return null;
}
/**
@@ -263,26 +267,28 @@ public DirectByteBuffer getMessage(SequenceNumber num) {
* @return True if the message was successfully stopped from retransmitting.
*/
public boolean forgetMessage(SequenceNumber num) {
- sentMessage msg = this.sentMessages.remove(num);
- if (msg == null) {
- return false;
- }
- msg.cancel();
- this.outstandingBytes -= msg.length;
- long now = System.currentTimeMillis();
- long sample = now - msg.creation;
- // If not the first attempt, we don't know which attempt was acked.
- if (msg.attempt == 0) {
- this.latency = (long) (this.latency * (1 - EWMA) + sample * EWMA);
- if (sample < minLatency) {
- minLatency = sample;
+ synchronized (sentMessages) {
+ sentMessage msg = this.sentMessages.remove(num);
+ if (msg == null) {
+ return false;
}
+ msg.cancel();
+ this.outstandingBytes -= msg.length;
+ long now = System.currentTimeMillis();
+ long sample = now - msg.creation;
+ // If not the first attempt, we don't know which attempt was acked.
+ if (msg.attempt == 0) {
+ this.latency = (long) (this.latency * (1 - EWMA) + sample * EWMA);
+ if (sample < minLatency) {
+ minLatency = sample;
+ }
- // Pending messages sent before this one were probably lost
- sentMessage[] messages = this.sentMessages.values().toArray(new sentMessage[0]);
- for (sentMessage m : messages) {
- if (m.creation < msg.creation) {
- m.run();
+ // Pending messages sent before this one were potentially lost
+ sentMessage[] messages = this.sentMessages.values().toArray(new sentMessage[0]);
+ for (sentMessage m : messages) {
+ if (m.creation < msg.creation) {
+ m.run();
+ }
}
}
}
@@ -318,16 +324,20 @@ public sentMessage(SequenceNumber num, ReferenceCountedDirectByteBuffer msg, int
@Override
public void run() {
- sentMessage self = sentMessages.remove(num);
- if (self != null && !closed) {
- if (self.attempt == attempt) {
- logger.fine(num + " was retransmitted.");
- outstandingBytes -= length;
- msg.position(ss, position);
- writeMessage(num, msg, attempt + 1, datagram);
- } else {
+ synchronized (sentMessages) {
+ sentMessage self = sentMessages.remove(num);
+ if (self == null || closed) {
+ return;
+ }
+ if (self.attempt != attempt) {
+ logger.warning("Message queue concurency issues");
sentMessages.put(num, self);
}
+
+ logger.fine("retransmitting " + num + ", try " + attempt);
+ outstandingBytes -= length;
+ msg.position(ss, position);
+ writeMessage(num, msg, attempt + 1, datagram);
}
}
@@ -287,8 +287,7 @@ public boolean channelGotMessage(ServiceChannelEndpoint sender, OSF2FServiceData
if (msg.isAck()) {
logger.fine("Acked msg " + msg.getSequenceNumber());
- mmt.onAck(msg);
- windowSize += 1;
+ windowSize += mmt.onAck(msg);
return true;
}

0 comments on commit 969e476

Please sign in to comment.