Skip to content
Browse files

- Added state to connection entry, going to CLOSING state before remo…

  • Loading branch information...
1 parent 2be3a14 commit 49aa197a3182819aa6e094167128fc631d112d3e @belaban committed Mar 12, 2013
View
54 doc/design/UNICAST3.txt
@@ -81,6 +81,60 @@ time until a message is resent by the sender.
Connection establishment
------------------------
+Instead of adding explicit connection establishment and teardown, we'll add states OPEN, CLOSING and CLOSED to a
+connection entry (SenderEntry, ReceiverEntry).
+
+When a connection is closed, it won't get removed immediately, but its state will be set to CLOSING.
+The retransmitter will flush unacked messages in the sender entry in this state.
+After a few minutes without activity, the state will be set to CLOSED and the entry will be removed. This is also done
+by the retransmission task.
+
+The advantage of a time lag between setting a connection to CLOSING and setting it to CLOSED (and removing it) is that
+when a message is sent on a CLOSING connection, the connection will be reverted to OPEN again. If no message was sent
+during the time lag, then chances are the connection is not used any longer and can safely be removed. The time lag is
+configurable (conn_close_timeout); the idle time after which an OPEN connection is set to CLOSING is configured
+separately (conn_expiry_timeout).
+
+The design is below:
+
+
+SENDER:
+Closing a sender connection entry:
+- The state is set to CLOSING
+- The retransmission task will keep retransmitting if HA < HS (HA = highest acked seqno, HS = highest sent seqno)
+
+Sending a message:
+- If the state is CLOSING: set it to OPEN, reset timestamp
+- If the state is CLOSED (should almost never happen, and the entry should be removed a few ms (max) later):
+ - Go to the top of the loop and fetch the sender entry again (or create a new connection entry)
+ - Actually, this is not implemented, as it's an edge case and having to use locking would increase complexity
+
+
+RECEIVER:
+Reception of a CLOSE message (conn-id must be the same)
+- Set the state of the receiver entry (if found) to CLOSED
+- Remove the receiver entry
+
+Reception of a message:
+- In addition to a null receiver entry, we also check for a CLOSED connection entry (counts the same as a null entry)
+- If we have a message batch, we check if there is a message with first==true. If so, we add that message first,
+ then the rest as a mass-insertion into Table. Else, we insert all messages via mass-insertion.
+
+Sending of SEND_FIRST_SEQNO message
+- We don't send this message immediately, but set a flag in the receiver entry
+- When the retransmission task kicks in, it clears the flag and sends a SEND_FIRST_SEQNO message
+- On reception of this message, the sender only sends the first message; the rest will get retransmitted by the sender
+ anyway, or the receiver will ask for retransmission
+
+
+
+SENDER and RECEIVER:
+Retransmission task (reaping)
+- If the state is OPEN and the entry has expired and connection reaping is enabled (conn_expiry_timeout > 0):
+ - Update the timestamp
+ - Set the state to CLOSING
+- If the state of an entry is CLOSING and conn_close_timeout has expired:
+ - Set the state to CLOSED and remove the entry from its table (send-table for a sender entry, receive-table for a
+   receiver entry)
+ - [if SENDER] Send a CLOSE message to the target destination
View
448 src/org/jgroups/protocols/UNICAST3.java
@@ -16,7 +16,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -34,28 +33,31 @@
@Property(description="Max number of messages to be removed from a retransmit window. This property might " +
"get removed anytime, so don't use it !")
- protected int max_msg_batch_size=500;
+ protected int max_msg_batch_size=500;
@Property(description="Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The " +
"connection will get re-established when used again. 0 disables connection reaping")
- protected long conn_expiry_timeout=0;
+ protected long conn_expiry_timeout=60000 * 2;
+
+ @Property(description="Time (in ms) until a connection marked to be closed will get removed. 0 disables this")
+ protected long conn_close_timeout=60000;
@Property(description="Number of rows of the matrix in the retransmission table (only for experts)",writable=false)
- protected int xmit_table_num_rows=100;
+ protected int xmit_table_num_rows=100;
@Property(description="Number of elements of a row of the matrix in the retransmission table (only for experts). " +
"The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row",writable=false)
- protected int xmit_table_msgs_per_row=1000;
+ protected int xmit_table_msgs_per_row=1000;
@Property(description="Resize factor of the matrix in the retransmission table (only for experts)",writable=false)
- protected double xmit_table_resize_factor=1.2;
+ protected double xmit_table_resize_factor=1.2;
@Property(description="Number of milliseconds after which the matrix in the retransmission table " +
"is compacted (only for experts)",writable=false)
- protected long xmit_table_max_compaction_time=10 * 60 * 1000;
+ protected long xmit_table_max_compaction_time=10 * 60 * 1000;
@Property(description="Interval (in milliseconds) at which messages in the send windows are resent")
- protected long xmit_interval=500;
+ protected long xmit_interval=500;
@Property(description="If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)")
protected boolean log_not_found_msgs=true;
@@ -343,15 +345,12 @@ public void start() throws Exception {
if(max_retransmit_time > 0)
cache=new AgeOutCache<Address>(timer, max_retransmit_time, this);
running=true;
- if(conn_expiry_timeout > 0)
- startConnectionReaper();
startRetransmitTask();
}
public void stop() {
running=false;
stopRetransmitTask();
- stopConnectionReaper();
xmit_task_map.clear();
removeAllConnections();
}
@@ -390,11 +389,21 @@ protected void handleUpEvent(Address sender, Message msg, Header hdr) {
handleAckReceived(sender, hdr.seqno, hdr.conn_id);
break;
case Header.SEND_FIRST_SEQNO:
- handleResendingOfFirstMessage(sender, hdr.seqno);
+ handleResendingOfFirstMessage(sender);
break;
case Header.XMIT_REQ: // received ACK for previously sent message
handleXmitRequest(sender, (SeqnoList)msg.getObject());
break;
+ case Header.CLOSE:
+ if(log.isTraceEnabled())
+ log.trace(local_addr + " <-- CLOSE(" + sender + ": conn-id=" + hdr.conn_id + ")");
+ ReceiverEntry entry=recv_table.get(sender);
+ if(entry != null && entry.connId() == hdr.conn_id) {
+ recv_table.remove(sender);
+ if(log.isTraceEnabled())
+ log.trace(local_addr + ": removed receive connection for " + sender);
+ }
+ break;
default:
log.error("UnicastHeader type " + hdr.type + " not known !");
break;
@@ -408,8 +417,10 @@ public void up(MessageBatch batch) {
return;
}
- int size=batch.size();
- Map<Short,List<Message>> msgs=new TreeMap<Short,List<Message>>(); // map of messages, keyed by conn-id
+ int size=batch.size();
+ Map<Short,List<Tuple<Long,Message>>> msgs=new LinkedHashMap<Short,List<Tuple<Long,Message>>>();
+ ReceiverEntry entry=recv_table.get(batch.sender());
+
for(Message msg: batch) {
if(msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY))
continue;
@@ -428,14 +439,27 @@ public void up(MessageBatch batch) {
continue;
}
- List<Message> list=msgs.get(hdr.conn_id);
+ List<Tuple<Long,Message>> list=msgs.get(hdr.conn_id);
if(list == null)
- msgs.put(hdr.conn_id, list=new ArrayList<Message>(size));
- list.add(msg);
+ msgs.put(hdr.conn_id, list=new ArrayList<Tuple<Long,Message>>(size));
+ list.add(new Tuple<Long,Message>(hdr.seqno(), msg));
+
+ if(hdr.first)
+ entry=getReceiverEntry(batch.sender(), hdr.seqno(), hdr.first, hdr.connId());
+ }
+
+ if(!msgs.isEmpty()) {
+ if(entry == null)
+ sendRequestForFirstSeqno(batch.sender());
+ else {
+ if(msgs.keySet().retainAll(Arrays.asList(entry.connId()))) // remove all conn-ids that don't match
+ sendRequestForFirstSeqno(batch.sender());
+ List<Tuple<Long,Message>> list=msgs.get(entry.connId());
+ if(list != null && !list.isEmpty())
+ handleBatchReceived(entry, batch.sender(), list, batch.mode() == MessageBatch.Mode.OOB);
+ }
}
- if(!msgs.isEmpty())
- handleBatchReceived(batch.sender(), msgs); // process msgs:
if(!batch.isEmpty())
up_prot.up(batch);
}
@@ -460,20 +484,25 @@ public Object down(Event evt) {
}
SenderEntry entry=send_table.get(dst);
- if(entry == null) {
+ if(entry == null || entry.state() == State.CLOSED) {
+ if(entry != null)
+ send_table.remove(dst);
entry=new SenderEntry(getNewConnectionId());
SenderEntry existing=send_table.putIfAbsent(dst, entry);
if(existing != null)
entry=existing;
else {
if(log.isTraceEnabled())
- log.trace(local_addr + ": created sender window for " + dst + " (conn-id=" + entry.send_conn_id + ")");
+ log.trace(local_addr + ": created sender window for " + dst + " (conn-id=" + entry.connId() + ")");
if(cache != null && !members.contains(dst))
cache.add(dst);
}
}
- short send_conn_id=entry.send_conn_id;
+ if(entry.state() == State.CLOSING)
+ entry.state(State.OPEN);
+
+ short send_conn_id=entry.connId();
long seqno=entry.sent_msgs_seqno.getAndIncrement();
long sleep=10;
while(running) {
@@ -519,7 +548,17 @@ public Object down(Event evt) {
if(log.isTraceEnabled())
log.trace(local_addr + ": removing non members " + non_members);
for(Address non_mbr: non_members)
- removeConnection(non_mbr);
+ closeConnection(non_mbr);
+ }
+ if(!new_members.isEmpty()) {
+ for(Address mbr: new_members) {
+ Entry e=send_table.get(mbr);
+ if(e != null && e.state() == State.CLOSING)
+ e.state(State.OPEN);
+ e=recv_table.get(mbr);
+ if(e != null && e.state() == State.CLOSING)
+ e.state(State.OPEN);
+ }
}
xmit_task_map.keySet().retainAll(view.getMembers());
break;
@@ -539,17 +578,35 @@ public Object down(Event evt) {
* otherwise false. This method is public only so it can be invoked by unit testing, but should not otherwise be
* used !
*/
- public void removeConnection(Address mbr) {
- removeSendConnection(mbr);
- removeReceiveConnection(mbr);
+ public void closeConnection(Address mbr) {
+ closeSendConnection(mbr);
+ closeReceiveConnection(mbr);
}
- public void removeSendConnection(Address mbr) {
- send_table.remove(mbr);
+ public void closeSendConnection(Address mbr) {
+ SenderEntry entry=send_table.get(mbr);
+ if(entry != null)
+ entry.state(State.CLOSING).update();
}
- public void removeReceiveConnection(Address mbr) {
- recv_table.remove(mbr);
+ protected void removeSendConnection(Address mbr) {
+ SenderEntry entry=send_table.remove(mbr);
+ if(entry != null) {
+ entry.state(State.CLOSED);
+ sendClose(mbr, entry.connId());
+ }
+ }
+
+ public void closeReceiveConnection(Address mbr) {
+ ReceiverEntry entry=recv_table.get(mbr);
+ if(entry != null)
+ entry.state(State.CLOSING).update();
+ }
+
+ protected void removeReceiveConnection(Address mbr) {
+ ReceiverEntry entry=recv_table.remove(mbr);
+ if(entry != null)
+ entry.state(State.CLOSED);
}
@@ -592,7 +649,7 @@ public void expired(Address key) {
if(key != null) {
if(log.isDebugEnabled())
log.debug("removing connection to " + key + " because it expired");
- removeConnection(key);
+ closeConnection(key);
}
}
@@ -617,18 +674,25 @@ protected void handleDataReceived(Address sender, long seqno, short conn_id, bo
return;
if(conn_expiry_timeout > 0)
entry.update();
+ if(entry.state() == State.CLOSING)
+ entry.state(State.OPEN);
Table<Message> win=entry.received_msgs;
boolean added=win.add(seqno, msg); // win is guaranteed to be non-null if we get here
num_msgs_received++;
// An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering !
// http://jira.jboss.com/jira/browse/JGRP-377
if(msg.isFlagSet(Message.Flag.OOB) && added) {
- try {
- up_prot.up(evt);
- }
- catch(Throwable t) {
- log.error("couldn't deliver OOB message " + msg, t);
+ if(msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
+ if(log.isTraceEnabled())
+ log.trace(new StringBuilder().append(local_addr).append(": delivering ")
+ .append(sender).append('#').append(seqno));
+ try {
+ up_prot.up(evt);
+ }
+ catch(Throwable t) {
+ log.error("failed to deliver OOB message " + msg, t);
+ }
}
}
@@ -641,56 +705,48 @@ protected void handleDataReceived(Address sender, long seqno, short conn_id, bo
- protected void handleBatchReceived(Address sender, Map<Short,List<Message>> map) {
- for(Map.Entry<Short,List<Message>> element: map.entrySet()) {
- final List<Message> msg_list=element.getValue();
- if(log.isTraceEnabled()) {
- StringBuilder sb=new StringBuilder();
- sb.append(local_addr).append(" <-- DATA(").append(sender).append(": " + printMessageList(msg_list)).append(')');
- log.trace(sb);
- }
-
- short conn_id=element.getKey();
- ReceiverEntry entry=null;
- for(Message msg: msg_list) {
- Header hdr=(Header)msg.getHeader(id);
- entry=getReceiverEntry(sender, hdr.seqno, hdr.first, conn_id);
- if(entry == null)
- continue;
- Table<Message> win=entry.received_msgs;
- boolean msg_added=win.add(hdr.seqno, msg); // win is guaranteed to be non-null if we get here
- num_msgs_received++;
+ protected void handleBatchReceived(final ReceiverEntry entry, Address sender, List<Tuple<Long,Message>> msgs, boolean oob) {
+ if(log.isTraceEnabled()) {
+ StringBuilder sb=new StringBuilder();
+ sb.append(local_addr).append(" <-- DATA(").append(sender).append(": " + printMessageList(msgs)).append(')');
+ log.trace(sb);
+ }
- if(hdr.first && msg_added)
- sendAck(sender, hdr.seqno, conn_id); // send an ack immediately when we received the first message of a conn
+ Table<Message> win=entry.received_msgs;
+ num_msgs_received+=msgs.size();
+ boolean added=oob ? win.add(msgs, true) : win.add(msgs);
- // An OOB message is passed up immediately. Later, when remove() is called, we discard it. This affects ordering !
- // http://jira.jboss.com/jira/browse/JGRP-377
- if(msg.isFlagSet(Message.Flag.OOB) && msg_added) {
+ if(conn_expiry_timeout > 0)
+ entry.update();
+ if(entry.state() == State.CLOSING)
+ entry.state(State.OPEN);
+
+ // OOB msg is passed up. When removed, we discard it. Affects ordering: http://jira.jboss.com/jira/browse/JGRP-379
+ if(added && oob) {
+ for(Tuple<Long,Message> tuple: msgs) {
+ long seq=tuple.getVal1();
+ Message msg=tuple.getVal2();
+ if(msg.isFlagSet(Message.Flag.OOB) && msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
+ if(log.isTraceEnabled())
+ log.trace(new StringBuilder().append(local_addr).append(": delivering ")
+ .append(sender).append('#').append(seq));
try {
up_prot.up(new Event(Event.MSG, msg));
}
catch(Throwable t) {
- log.error("couldn't deliver OOB message " + msg, t);
+ log.error("failed to deliver OOB message " + msg, t);
}
}
}
- if(entry != null && conn_expiry_timeout > 0)
- entry.update();
}
- ReceiverEntry entry=recv_table.get(sender);
- Table<Message> win=entry != null? entry.received_msgs : null;
- if(win != null) {
- final AtomicBoolean processing=win.getProcessing();
- if(processing.compareAndSet(false, true)) {
- if(ack_batches_immediately)
- sendAck(sender, win.getHighestDeliverable(), entry.recv_conn_id);
- else
- entry.sendAck(true);
-
- removeAndDeliver(processing, win, sender);
- }
+ final AtomicBoolean processing=win.getProcessing();
+ if(processing.compareAndSet(false, true)) {
+ if(ack_batches_immediately)
+ sendAck(sender, win.getHighestDeliverable(), entry.connId());
+ else
+ entry.sendAck(true);
+ removeAndDeliver(processing, win, sender);
}
}
@@ -715,11 +771,10 @@ protected int removeAndDeliver(final AtomicBoolean processing, Table<Message> wi
return retval;
}
-
MessageBatch batch=new MessageBatch(local_addr, sender, null, false, list);
for(Message msg_to_deliver: batch) {
// discard OOB msg: it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-377)
- if(msg_to_deliver.isFlagSet(Message.Flag.OOB))
+ if(msg_to_deliver.isFlagSet(Message.Flag.OOB) && !msg_to_deliver.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED))
batch.remove(msg_to_deliver);
}
if(batch.isEmpty())
@@ -752,10 +807,10 @@ protected int removeAndDeliver(final AtomicBoolean processing, Table<Message> wi
}
- protected String printMessageList(List<Message> list) {
+ protected String printMessageList(List<Tuple<Long,Message>> list) {
StringBuilder sb=new StringBuilder();
int size=list.size();
- Message first=size > 0? list.get(0) : null, second=size > 1? list.get(size-1) : first;
+ Message first=size > 0? list.get(0).getVal2() : null, second=size > 1? list.get(size-1).getVal2() : first;
Header hdr;
if(first != null) {
hdr=(Header)first.getHeader(id);
@@ -772,33 +827,30 @@ protected String printMessageList(List<Message> list) {
protected ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id) {
ReceiverEntry entry=recv_table.get(sender);
- if(entry != null && entry.recv_conn_id == conn_id)
+ if(entry != null && entry.connId() == conn_id)
return entry;
recv_table_lock.lock();
try {
entry=recv_table.get(sender);
if(first) {
if(entry == null) {
- entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
+ entry=createReceiverEntry(sender,seqno,conn_id);
}
else { // entry != null && win != null
- if(conn_id != entry.recv_conn_id) {
+ if(conn_id != entry.connId()) {
if(log.isTraceEnabled())
- log.trace(local_addr + ": conn_id=" + conn_id + " != " + entry.recv_conn_id + "; resetting receiver window");
+ log.trace(local_addr + ": conn_id=" + conn_id + " != " + entry.connId() + "; resetting receiver window");
recv_table.remove(sender);
- entry=getOrCreateReceiverEntry(sender, seqno, conn_id);
- }
- else {
- ;
+ entry=createReceiverEntry(sender,seqno,conn_id);
}
}
}
else { // entry == null && win == null OR entry != null && win == null OR entry != null && win != null
- if(entry == null || entry.recv_conn_id != conn_id) {
+ if(entry == null || entry.connId() != conn_id) {
recv_table_lock.unlock();
- sendRequestForFirstSeqno(sender, seqno); // drops the message and returns (see below)
+ sendRequestForFirstSeqno(sender); // drops the message and returns (see below)
return null;
}
}
@@ -811,7 +863,7 @@ protected ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean fir
}
- protected ReceiverEntry getOrCreateReceiverEntry(Address sender, long seqno, short conn_id) {
+ protected ReceiverEntry createReceiverEntry(Address sender, long seqno, short conn_id) {
Table<Message> table=new Table<Message>(xmit_table_num_rows, xmit_table_msgs_per_row, seqno-1,
xmit_table_resize_factor, xmit_table_max_compaction_time);
ReceiverEntry entry=new ReceiverEntry(table, conn_id);
@@ -830,9 +882,9 @@ protected void handleAckReceived(Address sender, long seqno, short conn_id) {
append(": #").append(seqno).append(", conn-id=").append(conn_id).append(')'));
SenderEntry entry=send_table.get(sender);
- if(entry != null && entry.send_conn_id != conn_id) {
+ if(entry != null && entry.connId() != conn_id) {
if(log.isTraceEnabled())
- log.trace(local_addr + ": my conn_id (" + entry.send_conn_id +
+ log.trace(local_addr + ": my conn_id (" + entry.connId() +
") != received conn_id (" + conn_id + "); discarding ACK");
return;
}
@@ -845,15 +897,14 @@ protected void handleAckReceived(Address sender, long seqno, short conn_id) {
}
-
+
/**
- * We need to resend our first message with our conn_id
+ * We need to resend the first message with our conn_id
* @param sender
- * @param seqno Resend the non null messages in the range [lowest .. seqno]
*/
- protected void handleResendingOfFirstMessage(Address sender, long seqno) {
+ protected void handleResendingOfFirstMessage(Address sender) {
if(log.isTraceEnabled())
- log.trace(local_addr + " <-- SEND_FIRST_SEQNO(" + sender + "," + seqno + ")");
+ log.trace(local_addr + " <-- SEND_FIRST_SEQNO(" + sender + ")");
SenderEntry entry=send_table.get(sender);
Table<Message> win=entry != null? entry.sent_msgs : null;
if(win == null) {
@@ -862,26 +913,17 @@ protected void handleResendingOfFirstMessage(Address sender, long seqno) {
return;
}
- boolean first_sent=false;
- for(long i=win.getLow() +1; i <= seqno; i++) {
- Message rsp=win.get(i);
- if(rsp == null)
- continue;
- if(first_sent) {
- down_prot.down(new Event(Event.MSG, rsp));
- }
- else {
- first_sent=true;
- // We need to copy the UnicastHeader and put it back into the message because Message.copy() doesn't copy
- // the headers and therefore we'd modify the original message in the sender retransmission window
- // (https://jira.jboss.org/jira/browse/JGRP-965)
- Message copy=rsp.copy();
- Header hdr=(Header)copy.getHeader(this.id);
- Header newhdr=hdr.copy();
- newhdr.first=true;
- copy.putHeader(this.id, newhdr);
- down_prot.down(new Event(Event.MSG, copy));
- }
+ Message rsp=win.get(win.getLow() +1);
+ if(rsp != null) {
+ // We need to copy the UnicastHeader and put it back into the message because Message.copy() doesn't copy
+ // the headers and therefore we'd modify the original message in the sender retransmission window
+ // (https://jira.jboss.org/jira/browse/JGRP-965)
+ Message copy=rsp.copy();
+ Header hdr=(Header)copy.getHeader(this.id);
+ Header newhdr=hdr.copy();
+ newhdr.first=true;
+ copy.putHeader(this.id, newhdr);
+ down_prot.down(new Event(Event.MSG, copy));
}
}
@@ -946,16 +988,7 @@ protected void sendAck(Address dst, long seqno, short conn_id) {
}
}
- protected synchronized void startConnectionReaper() {
- if(connection_reaper == null || connection_reaper.isDone())
- connection_reaper=timer.scheduleWithFixedDelay(new ConnectionReaper(), conn_expiry_timeout, conn_expiry_timeout, TimeUnit.MILLISECONDS);
- }
- protected synchronized void stopConnectionReaper() {
- if(connection_reaper != null)
- connection_reaper.cancel(false);
- }
-
protected synchronized short getNewConnectionId() {
short retval=last_conn_id;
if(last_conn_id >= Short.MAX_VALUE || last_conn_id < 0)
@@ -966,60 +999,106 @@ protected synchronized short getNewConnectionId() {
}
- protected void sendRequestForFirstSeqno(Address dest, long seqno_received) {
- Message msg=new Message(dest).setFlag(Message.Flag.OOB)
- .putHeader(this.id, Header.createSendFirstSeqnoHeader(seqno_received));
+ protected void sendRequestForFirstSeqno(Address dest) {
+ Message msg=new Message(dest).setFlag(Message.Flag.OOB).putHeader(this.id, Header.createSendFirstSeqnoHeader());
+ if(log.isTraceEnabled())
+ log.trace(local_addr + " --> SEND_FIRST_SEQNO(" + dest + ")");
+ down_prot.down(new Event(Event.MSG, msg));
+ }
+ public void sendClose(Address dest, short conn_id) {
+ Message msg=new Message(dest).setFlag(Message.Flag.INTERNAL).putHeader(id, Header.createCloseHeader(conn_id));
if(log.isTraceEnabled())
- log.trace(local_addr + " --> SEND_FIRST_SEQNO(" + dest + "," + seqno_received + ")");
+ log.trace(local_addr + " --> CLOSE(" + dest + ", conn-id=" + conn_id + ")");
down_prot.down(new Event(Event.MSG, msg));
}
@ManagedOperation(description="Closes connections that have been idle for more than conn_expiry_timeout ms")
- public void reapIdleConnections() {
- // remove expired connections from send_table
+ public void closeIdleConnections() {
+ // close expired connections in send_table
for(Map.Entry<Address,SenderEntry> entry: send_table.entrySet()) {
SenderEntry val=entry.getValue();
+ if(val.state() != State.OPEN) // only look at open connections
+ continue;
long age=val.age();
if(age >= conn_expiry_timeout) {
- removeSendConnection(entry.getKey());
if(log.isDebugEnabled())
- log.debug(local_addr + ": removed expired connection for " + entry.getKey() +
+ log.debug(local_addr + ": closing expired connection for " + entry.getKey() +
+ " (" + age + " ms old) in send_table");
+ closeSendConnection(entry.getKey());
+ }
+ }
+
+ // close expired connections in recv_table
+ for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
+ ReceiverEntry val=entry.getValue();
+ if(val.state() != State.OPEN) // only look at open connections
+ continue;
+ long age=val.age();
+ if(age >= conn_expiry_timeout) {
+ if(log.isDebugEnabled())
+ log.debug(local_addr + ": closing expired connection for " + entry.getKey() +
+ " (" + age + " ms old) in recv_table");
+ closeReceiveConnection(entry.getKey());
+ }
+ }
+ }
+
+
+ @ManagedOperation(description="Removes connections that have been closed for more than conn_close_timeout ms")
+ public void removeExpiredConnections() {
+ // remove expired connections from send_table
+ for(Map.Entry<Address,SenderEntry> entry: send_table.entrySet()) {
+ SenderEntry val=entry.getValue();
+ if(val.state() == State.OPEN) // only look at closing or closed connections
+ continue;
+ long age=val.age();
+ if(age >= conn_close_timeout) {
+ if(log.isDebugEnabled())
+ log.debug(local_addr + ": removing expired connection for " + entry.getKey() +
" (" + age + " ms old) from send_table");
+ removeSendConnection(entry.getKey());
}
}
// remove expired connections from recv_table
for(Map.Entry<Address,ReceiverEntry> entry: recv_table.entrySet()) {
ReceiverEntry val=entry.getValue();
+ if(val.state() == State.OPEN) // only look at closing or closed connections
+ continue;
long age=val.age();
- if(age >= conn_expiry_timeout) {
- removeReceiveConnection(entry.getKey());
+ if(age >= conn_close_timeout) {
if(log.isDebugEnabled())
- log.debug(local_addr + ": removed expired connection for " + entry.getKey() +
+ log.debug(local_addr + ": removing expired connection for " + entry.getKey() +
" (" + age + " ms old) from recv_table");
+ removeReceiveConnection(entry.getKey());
}
}
}
+ protected static enum State {OPEN, CLOSING, CLOSED}
+
+
/**
* The following types and fields are serialized:
* <pre>
* | DATA | seqno | conn_id | first |
* | ACK | seqno |
* | SEND_FIRST_SEQNO |
+ * | CLOSE | conn_id |
* </pre>
*/
public static class Header extends org.jgroups.Header {
public static final byte DATA = 0;
public static final byte ACK = 1;
public static final byte SEND_FIRST_SEQNO = 2;
public static final byte XMIT_REQ = 3; // SeqnoList of missing message is in the message's payload
+ public static final byte CLOSE = 4;
byte type;
long seqno; // DATA and ACK
- short conn_id; // DATA
+ short conn_id; // DATA and CLOSE
boolean first; // DATA
@@ -1049,20 +1128,21 @@ public static Header createAckHeader(long seqno, short conn_id) {
return new Header(ACK, seqno, conn_id, false);
}
- public static Header createSendFirstSeqnoHeader(long seqno_received) {
- return new Header(SEND_FIRST_SEQNO, seqno_received);
+ public static Header createSendFirstSeqnoHeader() {
+ return new Header(SEND_FIRST_SEQNO);
}
public static Header createXmitReqHeader() {
return new Header(XMIT_REQ);
}
-
-
- public long getSeqno() {
- return seqno;
+ public static Header createCloseHeader(short conn_id) {
+ return new Header(CLOSE, 0, conn_id, false);
}
+ public long seqno() {return seqno;}
+ public short connId() {return conn_id;}
+
public String toString() {
StringBuilder sb=new StringBuilder();
sb.append(type2Str(type)).append(", seqno=").append(seqno);
@@ -1077,6 +1157,7 @@ public static String type2Str(byte t) {
case ACK: return "ACK";
case SEND_FIRST_SEQNO: return "SEND_FIRST_SEQNO";
case XMIT_REQ: return "XMIT_REQ";
+ case CLOSE: return "CLOSE";
default: return "<unknown>";
}
}
@@ -1093,10 +1174,11 @@ public final int size() {
retval+=Util.size(seqno) + Global.SHORT_SIZE; // conn_id
break;
case SEND_FIRST_SEQNO:
- retval+=Util.size(seqno);
- break;
case XMIT_REQ:
break;
+ case CLOSE:
+ retval+=Global.SHORT_SIZE; // conn-id
+ break;
}
return retval;
}
@@ -1119,10 +1201,11 @@ public void writeTo(DataOutput out) throws Exception {
out.writeShort(conn_id);
break;
case SEND_FIRST_SEQNO:
- Util.writeLong(seqno, out);
- break;
case XMIT_REQ:
break;
+ case CLOSE:
+ out.writeShort(conn_id);
+ break;
}
}
@@ -1139,85 +1222,80 @@ public void readFrom(DataInput in) throws Exception {
conn_id=in.readShort();
break;
case SEND_FIRST_SEQNO:
- seqno=Util.readLong(in);
- break;
case XMIT_REQ:
break;
+ case CLOSE:
+ conn_id=in.readShort();
+ break;
}
}
}
+ protected abstract class Entry {
+ final short conn_id;
+ protected final AtomicLong timestamp=new AtomicLong(0);
+ protected volatile State state=State.OPEN;
+ protected Entry(short conn_id) {
+ this.conn_id=conn_id;
+ update();
+ }
- protected final class SenderEntry {
- // stores (and retransmits) msgs sent by us to a certain peer
- final Table<Message> sent_msgs;
+ short connId() {return conn_id;}
+ void update() {timestamp.set(System.currentTimeMillis());}
+ long age() {return System.currentTimeMillis() - timestamp.longValue();}
+ State state() {return state;}
+ Entry state(State state) {this.state=state; return this;}
+ }
+
+ protected final class SenderEntry extends Entry {
+ final Table<Message> sent_msgs; // stores (and retransmits) msgs sent by us to a certain peer
final AtomicLong sent_msgs_seqno=new AtomicLong(DEFAULT_FIRST_SEQNO); // seqno for msgs sent by us
protected final long[] watermark={0,0}; // the highest acked and highest sent seqno
- final short send_conn_id;
- protected final AtomicLong timestamp=new AtomicLong(0);
- final Lock lock=new ReentrantLock();
public SenderEntry(short send_conn_id) {
- this.send_conn_id=send_conn_id;
+ super(send_conn_id);
this.sent_msgs=new Table<Message>(xmit_table_num_rows, xmit_table_msgs_per_row, 0,
xmit_table_resize_factor, xmit_table_max_compaction_time);
- update();
}
- void update() {timestamp.set(System.currentTimeMillis());}
- long age() {return System.currentTimeMillis() - timestamp.longValue();}
long[] watermark() {return watermark;}
SenderEntry watermark(long ha, long hs) {watermark[0]=ha; watermark[1]=hs; return this;}
public String toString() {
StringBuilder sb=new StringBuilder();
if(sent_msgs != null)
sb.append(sent_msgs).append(", ");
- sb.append("send_conn_id=" + send_conn_id).append(" (" + age() + " ms old)");
+ sb.append("send_conn_id=" + conn_id).append(" (" + age() + " ms old) - " + state);
return sb.toString();
}
}
- protected static final class ReceiverEntry {
+ protected final class ReceiverEntry extends Entry {
protected final Table<Message> received_msgs; // stores all msgs rcvd by a certain peer in seqno-order
- protected final short recv_conn_id;
- protected final AtomicLong timestamp=new AtomicLong(0);
protected volatile boolean send_ack;
public ReceiverEntry(Table<Message> received_msgs, short recv_conn_id) {
+ super(recv_conn_id);
this.received_msgs=received_msgs;
- this.recv_conn_id=recv_conn_id;
- update();
}
- void update() {timestamp.set(System.currentTimeMillis());}
- long age() {return System.currentTimeMillis() - timestamp.longValue();}
ReceiverEntry sendAck(boolean flag) {send_ack=flag; return this;}
boolean sendAck() {boolean retval=send_ack; send_ack=false; return retval;}
public String toString() {
StringBuilder sb=new StringBuilder();
if(received_msgs != null)
sb.append(received_msgs).append(", ");
- sb.append("recv_conn_id=" + recv_conn_id);
- sb.append(" (" + age() + " ms old)");
+ sb.append("recv_conn_id=" + conn_id);
+ sb.append(" (" + age() + " ms old) - " + state);
if(send_ack)
sb.append(" [ack pending]");
return sb.toString();
}
}
- protected class ConnectionReaper implements Runnable {
- public void run() {
- reapIdleConnections();
- }
-
- public String toString() {
- return UNICAST3.class.getSimpleName() + ": ConnectionReaper (interval=" + conn_expiry_timeout + " ms)";
- }
- }
/**
@@ -1241,7 +1319,7 @@ public String toString() {
}
}
- @ManagedOperation(description="Triggers the retransmission task, asking all senders for missing messages")
+ @ManagedOperation(description="Triggers the retransmission task")
public void triggerXmit() {
SeqnoList missing;
@@ -1252,7 +1330,7 @@ public void triggerXmit() {
// send acks if needed
if(win != null && val.sendAck()) // sendAck() resets send_ack to false
- sendAck(target, win.getHighestDelivered(), val.recv_conn_id);
+ sendAck(target, win.getHighestDelivered(), val.connId());
// retransmit missing messages
if(win != null && win.getNumMissing() > 0 && (missing=win.getMissing()) != null) { // getNumMissing() is fast
@@ -1289,6 +1367,14 @@ else if(!xmit_task_map.isEmpty())
val.watermark(highest_acked, highest_sent);
}
}
+
+
+ // close idle connections
+ if(conn_expiry_timeout > 0)
+ closeIdleConnections();
+
+ if(conn_close_timeout > 0)
+ removeExpiredConnections();
}
View
8 src/org/jgroups/protocols/pbcast/NAKACK2.java
@@ -459,7 +459,7 @@ public Object down(Event evt) {
case Event.MSG:
Message msg=(Message)evt.getArg();
Address dest=msg.getDest();
- if(dest != null || msg.isFlagSet(Message.NO_RELIABILITY))
+ if(dest != null || msg.isFlagSet(Message.Flag.NO_RELIABILITY))
break; // unicast address: not null and not mcast, pass down unchanged
send(evt, msg);
@@ -550,7 +550,7 @@ public Object up(Event evt) {
case Event.MSG:
Message msg=(Message)evt.getArg();
- if(msg.isFlagSet(Message.NO_RELIABILITY))
+ if(msg.isFlagSet(Message.Flag.NO_RELIABILITY))
break;
NakAckHeader2 hdr=(NakAckHeader2)msg.getHeader(this.id);
if(hdr == null)
@@ -781,7 +781,7 @@ protected void handleMessage(Message msg, NakAckHeader2 hdr) {
if(loopback)
msg=buf.get(hdr.seqno); // we *have* to get a message, because loopback means we didn't add it to win !
if(msg != null && msg.isFlagSet(Message.Flag.OOB)) {
- if(msg.setTransientFlagIfAbsent(Message.OOB_DELIVERED)) {
+ if(msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
if(log.isTraceEnabled())
log.trace(new StringBuilder().append(local_addr).append(": delivering ").append(sender).append('#').append(hdr.seqno));
try {
@@ -829,7 +829,7 @@ protected void handleMessages(Address sender, List<Tuple<Long,Message>> msgs, bo
long seq=tuple.getVal1();
Message msg=loopback? buf.get(seq) : tuple.getVal2(); // we *have* to get the message, because loopback means we didn't add it to win !
if(msg != null && msg.isFlagSet(Message.Flag.OOB)) {
- if(msg.setTransientFlagIfAbsent(Message.OOB_DELIVERED)) {
+ if(msg.setTransientFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED)) {
if(log.isTraceEnabled())
log.trace(new StringBuilder().append(local_addr).append(": delivering ")
.append(sender).append('#').append(seq));
View
2 tests/junit-functional/org/jgroups/protocols/UNICAST_ConnectionTests.java
@@ -255,7 +255,7 @@ else if(prot instanceof UNICAST2) {
}
else if(prot instanceof UNICAST3) {
UNICAST3 unicast=(UNICAST3)prot;
- unicast.removeConnection(target);
+ unicast.closeConnection(target);
}
else
throw new IllegalArgumentException("prot (" + prot + ") needs to be UNICAST, UNICAST2 or UNICAST3");
View
2 tests/junit-functional/org/jgroups/tests/LargeMergeTest.java
@@ -85,7 +85,7 @@ void setUp() throws Exception {
//new MERGE2().setValue("min_interval",8000)
//.setValue("max_interval",15000).setValue("merge_fast",false),
new MERGE3().setValue("min_interval",1000)
- .setValue("max_interval",10000)
+ .setValue("max_interval",5000)
.setValue("max_participants_in_merge", MAX_PARTICIPANTS_IN_MERGE),
new NAKACK2().setValue("use_mcast_xmit",false)
.setValue("discard_delivered_msgs",true)
View
2 tests/junit/org/jgroups/tests/OOBTest.java
@@ -168,7 +168,7 @@ public void testRandomRegularAndOOBMulticasts() throws Exception {
send(null, NUM_MSGS, NUM_THREADS, 0.5); // send on random channel (a or b)
Collection<Integer> one=r1.getMsgs(), two=r2.getMsgs();
- for(int i=0; i < 10; i++) {
+ for(int i=0; i < 20; i++) {
if(one.size() == NUM_MSGS && two.size() == NUM_MSGS)
break;
System.out.println("A size " + one.size() + ", B size " + two.size());

0 comments on commit 49aa197

Please sign in to comment.
Something went wrong with that request. Please try again.