Skip to content
Browse files

Cleaned up the sent-message queue, and simplified retransmit logic.

  • Loading branch information...
1 parent c504d0b commit a5b9fef6571841ee21ee2b79350b680c5fc2ed4d @willscott willscott committed Apr 18, 2012
View
73 oneswarm_f2f/src/edu/washington/cs/oneswarm/f2f/servicesharing/ServiceChannelEndpoint.java
@@ -189,53 +189,67 @@ public long getServiceKey() {
}
public void writeMessage(final SequenceNumber num, DirectByteBuffer buffer, boolean datagram) {
+ // Move the requester to the bottom of the priority list.
try {
this.delegateOrder.remove(num.getFlow());
} catch (IndexOutOfBoundsException e) {
return;
}
this.delegateOrder.add(num.getFlow());
- writeMessage(num, buffer, 0, datagram);
- }
- private void writeMessage(final SequenceNumber num, DirectByteBuffer buffer, int attempt,
- boolean datagram) {
boolean rst = buffer == null;
if (buffer == null) {
buffer = new DirectByteBuffer(ByteBuffer.allocate(0));
}
+
int length = buffer.remaining(ss);
- ReferenceCountedDirectByteBuffer copy = buffer.getReferenceCountedBuffer();
- sentMessage sent = new sentMessage(num, copy, length, attempt, datagram);
+ ReferenceCountedDirectByteBuffer cpy = buffer.getReferenceCountedBuffer();
+ sentMessage msg = new sentMessage(num, cpy, length, 0, datagram, rst);
+
+ writeMessage(msg);
+ }
+
+ private void writeMessage(sentMessage msg) {
+ SequenceNumber num = msg.num;
synchronized (sentMessages) {
- this.sentMessages.put(num, sent);
+ this.sentMessages.put(num, msg);
}
- this.outstandingBytes += length;
- OSF2FServiceDataMsg msg = new OSF2FServiceDataMsg(OSF2FMessage.CURRENT_VERSION, channelId,
- num.getNum(), num.getFlow(), new int[0], copy);
- if (num.getNum() == 0 && !rst) {
+ this.outstandingBytes += msg.length;
+
+ double retransmit = RETRANSMISSION_MIN + (RETRANSMISSION_MAX - RETRANSMISSION_MIN)
+ * Math.random();
+ // Remember the message may need to be retransmitted.
+ delayedExecutor.queue((long) (retransmit * this.latency * (1 << msg.attempt)), msg);
+
+ if (msg.creation + latency < System.currentTimeMillis()) {
+ logger.warning("Skipping over-aggresive retransmission.");
+ return;
+ }
+ msg.creation = System.currentTimeMillis();
+
+ // Outgoing msg will be freed by super.writeMessage.
+ msg.msg.incrementReferenceCount();
+ OSF2FServiceDataMsg outgoing = new OSF2FServiceDataMsg(OSF2FMessage.CURRENT_VERSION,
+ channelId, num.getNum(), num.getFlow(), new int[0], msg.msg);
+
+ if (num.getNum() == 0 && !msg.rst) {
// Mark SYN messages.
- msg.setControlFlag(4);
+ outgoing.setControlFlag(4);
}
- if (rst) {
- msg.setControlFlag(2);
+ if (msg.rst) {
+ outgoing.setControlFlag(2);
}
- if (datagram) {
+ if (msg.datagram) {
// Set datagram flag to allow the packet to be sent over UDP.
- msg.setDatagram(true);
+ outgoing.setDatagram(true);
}
- long totalWritten = buffer.remaining(DirectByteBuffer.SS_MSG);
+ long totalWritten = msg.length;
if (logger.isLoggable(Level.FINEST)) {
- logger.finest(String.format("Wrote %s to network. bytes: %d", num, length));
+ logger.finest(String.format("Wrote %s to network. bytes: %d", num, msg.length));
}
- super.writeMessage(msg);
+ super.writeMessage(outgoing);
bytesOut += totalWritten;
-
- double retransmit = RETRANSMISSION_MIN + (RETRANSMISSION_MAX - RETRANSMISSION_MIN)
- * Math.random();
- // Remember the message may need to be retransmitted.
- delayedExecutor.queue((long) (retransmit * this.latency * (1 << attempt)), sent);
}
public int getOutstanding() {
@@ -314,9 +328,10 @@ protected boolean isService() {
private final SequenceNumber num;
private final int attempt;
private final boolean datagram;
+ public final boolean rst;
public sentMessage(SequenceNumber num, ReferenceCountedDirectByteBuffer msg, int length,
- int attempt, boolean datagram) {
+ int attempt, boolean datagram, boolean rst) {
this.creation = System.currentTimeMillis();
this.msg = msg;
this.position = msg.position(ss);
@@ -325,6 +340,7 @@ public sentMessage(SequenceNumber num, ReferenceCountedDirectByteBuffer msg, int
this.num = num;
this.attempt = attempt;
this.datagram = datagram;
+ this.rst = rst;
}
@Override
@@ -339,16 +355,11 @@ public void run() {
sentMessages.put(num, self);
return;
}
- if (self.creation + latency < System.currentTimeMillis()) {
- logger.warning("Holding off on agressive retransmission request.");
- sentMessages.put(num, self);
- return;
- }
logger.fine("retransmitting " + num + ", try " + attempt);
outstandingBytes -= length;
msg.position(ss, position);
- writeMessage(num, msg, attempt + 1, datagram);
+ writeMessage(this);
}
}

0 comments on commit a5b9fef

Please sign in to comment.
Something went wrong with that request. Please try again.