Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Preliminary code for delivering meg-lists

  • Loading branch information...
commit e52575c0a7d2f98cea8ab3da95e03435a219e8a9 1 parent 696434b
@belaban authored
View
4 src/org/jgroups/Event.java
@@ -8,7 +8,8 @@
*/
public class Event {
public static final int MSG = 1; // arg = Message
- public static final int CONNECT = 2; // arg = cluster name (string)
+ public static final int MSG_LIST = 2; // arg = List<Message>
+ public static final int CONNECT = 3; // arg = cluster name (string)
public static final int DISCONNECT = 4; // arg = member address (Address)
public static final int VIEW_CHANGE = 6; // arg = View (or MergeView in case of merge)
public static final int SET_LOCAL_ADDRESS = 8; // arg = Address
@@ -91,6 +92,7 @@ public Object getArg() {
public static String type2String(int t) {
switch(t) {
case MSG: return "MSG";
+ case MSG_LIST: return "MSG_LIST";
case CONNECT: return "CONNECT";
case DISCONNECT: return "DISCONNECT";
case VIEW_CHANGE: return "VIEW_CHANGE";
View
55 src/org/jgroups/protocols/TP.java
@@ -1196,6 +1196,43 @@ protected void passMessageUp(Message msg, boolean perform_cluster_name_matching,
}
+ protected void passMessagesUp(List<Message> msgs, boolean perform_cluster_name_matching, boolean multicast, boolean discard_own_mcast) {
+ Message tmp=msgs.isEmpty()? null : msgs.get(0);
+ if(tmp == null)
+ return;
+ TpHeader hdr=(TpHeader)tmp.getHeader(this.id);
+ if(hdr == null) {
+ if(log.isErrorEnabled())
+ log.error(new StringBuilder("message does not have a transport header, msg is ").append(tmp).
+ append(", headers are ").append(tmp.printHeaders()).append(", will be discarded").toString());
+ return;
+ }
+
+ if(log.isTraceEnabled())
+ log.trace(new StringBuilder("received ").append(msgs.size()).append(" msgs"));
+
+ String ch_name=hdr.channel_name;
+
+ final Protocol tmp_prot=isSingleton()? up_prots.get(ch_name) : up_prot;
+ if(tmp_prot != null) {
+ boolean is_protocol_adapter=tmp_prot instanceof ProtocolAdapter;
+ // Discard if message's cluster name is not the same as our cluster name
+ if(!is_protocol_adapter && perform_cluster_name_matching && channel_name != null && !channel_name.equals(ch_name)) {
+ if(log.isWarnEnabled() && log_discard_msgs)
+ log.warn(new StringBuilder("discarded message from different cluster \"").append(ch_name).
+ append("\" (our cluster is \"").append(channel_name).append("\"). Sender was ").append(tmp.getSrc()).toString());
+ return;
+ }
+
+ if(loopback && multicast && discard_own_mcast) {
+ Address local=is_protocol_adapter? ((ProtocolAdapter)tmp_prot).getAddress() : local_addr;
+ if(local != null && local.equals(tmp.getSrc()))
+ return;
+ }
+ tmp_prot.up(new Event(Event.MSG_LIST, msgs));
+ }
+ }
+
/**
@@ -1719,6 +1756,11 @@ public void run() {
if(is_message_list) { // used if message bundling is enabled
List<Message> msgs=readMessageList(dis);
+ Message tmp=msgs.isEmpty()? null : msgs.get(0);
+ if(tmp != null && tmp.getDest() == null) {
+ handleMyMessageList(msgs, multicast);
+ return;
+ }
for(Message msg: msgs) {
if(msg.isFlagSet(Message.OOB)) {
log.warn("bundled message should not be marked as OOB");
@@ -1748,6 +1790,16 @@ private void handleMyMessage(Message msg, boolean multicast) {
}
passMessageUp(msg, true, multicast, true);
}
+
+ private void handleMyMessageList(List<Message> msgs, boolean multicast) {
+ if(stats) {
+ for(Message msg: msgs) {
+ num_msgs_received++;
+ num_bytes_received+=msg.getLength();
+ }
+ }
+ passMessagesUp(msgs, true, multicast, true);
+ }
}
@@ -2101,7 +2153,6 @@ private void sendBundledMessages(final Map<SingletonAddress,List<Message>> msgs)
protected SocketFactory socket_factory=new DefaultSocketFactory();
Address local_addr;
- // TODO [JGRP-1194] - Revisit implementation of TUNNEL and shared transport
static final ThreadLocal<ProtocolAdapter> thread_local=new ThreadLocal<ProtocolAdapter>();
public ProtocolAdapter(String cluster_name, Address local_addr, short transport_id, Protocol up, Protocol down, String pattern) {
@@ -2188,14 +2239,12 @@ public Object down(Event evt) {
members.addAll(tmp);
break;
case Event.DISCONNECT:
- // TODO [JGRP-1194] - Revisit implementation of TUNNEL and shared transport
thread_local.set(this);
break;
case Event.CONNECT:
case Event.CONNECT_WITH_STATE_TRANSFER:
case Event.CONNECT_USE_FLUSH:
case Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH:
- // TODO [JGRP-1194] - Revisit implementation of TUNNEL and shared transport
thread_local.set(this);
cluster_name=(String)evt.getArg();
factory.setClusterName(cluster_name);
View
206 src/org/jgroups/protocols/pbcast/NAKACK2.java
@@ -1,9 +1,6 @@
package org.jgroups.protocols.pbcast;
-import org.jgroups.Address;
-import org.jgroups.Event;
-import org.jgroups.Message;
-import org.jgroups.View;
+import org.jgroups.*;
import org.jgroups.annotations.*;
import org.jgroups.protocols.TP;
import org.jgroups.stack.DiagnosticsHandler;
@@ -147,6 +144,8 @@
/** RetransmitTask running every xmit_interval ms */
protected Future<?> xmit_task;
+ protected final Table.SeqnoExtractor seqno_extractor=new MySeqnoExtractor(id);
+
protected volatile boolean leaving=false;
protected volatile boolean running=false;
protected TimeScheduler timer=null;
@@ -503,61 +502,76 @@ public Object down(Event evt) {
public Object up(Event evt) {
switch(evt.getType()) {
- case Event.MSG:
- Message msg=(Message)evt.getArg();
- if(msg.isFlagSet(Message.NO_RELIABILITY))
- break;
- NakAckHeader2 hdr=(NakAckHeader2)msg.getHeader(this.id);
- if(hdr == null)
- break; // pass up (e.g. unicast msg)
-
- if(!is_server) { // discard messages while not yet server (i.e., until JOIN has returned)
- if(log.isTraceEnabled())
- log.trace(local_addr + ": message " + msg.getSrc() + "::" + hdr.seqno + " was discarded (not yet server)");
- return null;
- }
+ case Event.MSG_LIST:
+ List<Message> msgs=(List<Message>)evt.getArg();
+ Message tmp=msgs.get(0);
+ if(tmp.isFlagSet(Message.Flag.NO_RELIABILITY))
+ break;
- // Changed by bela Jan 29 2003: we must not remove the header, otherwise further xmit requests will fail !
- //hdr=(NakAckHeader2)msg.removeHeader(getName());
+ if(!is_server) { // discard messages while not yet server (i.e., until JOIN has returned)
+ if(log.isTraceEnabled())
+ log.trace(local_addr + ": messages were discarded (not yet server)");
+ return null;
+ }
- switch(hdr.type) {
+ handleMessages(msgs, tmp.getSrc());
+ return null;
- case NakAckHeader2.MSG:
- handleMessage(msg, hdr);
- return null; // transmitter passes message up for us !
+ case Event.MSG:
+ Message msg=(Message)evt.getArg();
+ if(msg.isFlagSet(Message.NO_RELIABILITY))
+ break;
+ NakAckHeader2 hdr=(NakAckHeader2)msg.getHeader(this.id);
+ if(hdr == null)
+ break; // pass up (e.g. unicast msg)
- case NakAckHeader2.XMIT_REQ:
- SeqnoList missing=(SeqnoList)msg.getObject();
- if(missing == null) {
- if(log.isErrorEnabled())
- log.error("XMIT_REQ: no missing seqnos; discarding request from " + msg.getSrc());
+ if(!is_server) { // discard messages while not yet server (i.e., until JOIN has returned)
+ if(log.isTraceEnabled())
+ log.trace(local_addr + ": message " + msg.getSrc() + "::" + hdr.seqno + " was discarded (not yet server)");
return null;
}
- handleXmitReq(msg.getSrc(), missing, hdr.sender);
- return null;
-
- case NakAckHeader2.XMIT_RSP:
- handleXmitRsp(msg, hdr);
- return null;
- default:
- if(log.isErrorEnabled()) {
- log.error("NakAck header type " + hdr.type + " not known !");
+ // Changed by bela Jan 29 2003: we must not remove the header, otherwise further xmit requests will fail !
+ //hdr=(NakAckHeader2)msg.removeHeader(getName());
+
+ switch(hdr.type) {
+
+ case NakAckHeader2.MSG:
+ handleMessage(msg, hdr);
+ return null; // transmitter passes message up for us !
+
+ case NakAckHeader2.XMIT_REQ:
+ SeqnoList missing=(SeqnoList)msg.getObject();
+ if(missing == null) {
+ if(log.isErrorEnabled())
+ log.error("XMIT_REQ: no missing seqnos; discarding request from " + msg.getSrc());
+ return null;
+ }
+ handleXmitReq(msg.getSrc(), missing, hdr.sender);
+ return null;
+
+ case NakAckHeader2.XMIT_RSP:
+ handleXmitRsp(msg, hdr);
+ return null;
+
+ default:
+ if(log.isErrorEnabled()) {
+ log.error("NakAck header type " + hdr.type + " not known !");
+ }
+ return null;
}
- return null;
- }
- case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
- stable((Digest)evt.getArg());
- return null; // do not pass up further (Bela Aug 7 2001)
+ case Event.STABLE: // generated by STABLE layer. Delete stable messages passed in arg
+ stable((Digest)evt.getArg());
+ return null; // do not pass up further (Bela Aug 7 2001)
- case Event.SUSPECT:
- // release the promise if rebroadcasting is in progress... otherwise we wait forever. there will be a new
- // flush round anyway
- if(rebroadcasting) {
- cancelRebroadcasting();
- }
- break;
+ case Event.SUSPECT:
+ // release the promise if rebroadcasting is in progress... otherwise we wait forever. there will be a new
+ // flush round anyway
+ if(rebroadcasting) {
+ cancelRebroadcasting();
+ }
+ break;
}
return up_prot.up(evt);
}
@@ -737,6 +751,88 @@ protected void handleMessage(Message msg, NakAckHeader2 hdr) {
}
+ protected void handleMessages(List<Message> msgs, Address sender) {
+ if(sender == null) {
+ if(log.isErrorEnabled())
+ log.error("sender of message is null");
+ return;
+ }
+
+ Table<Message> buf=xmit_table.get(sender);
+ if(buf == null) { // discard message if there is no entry for sender
+ if(leaving)
+ return;
+ if(log.isWarnEnabled() && log_discard_msgs)
+ log.warn(local_addr + ": dropped messages from " + sender +
+ " (sender not in table " + xmit_table.keySet() +"), view=" + view);
+ return;
+ }
+
+ boolean loopback=local_addr.equals(sender);
+ boolean added=loopback || buf.add(msgs, seqno_extractor);
+
+
+ // OOB msg is passed up. When removed, we discard it. Affects ordering: http://jira.jboss.com/jira/browse/JGRP-379
+ if(added) {
+ for(Message msg: msgs) {
+ if(msg.isFlagSet(Message.OOB)) {
+ if(loopback) {
+ NakAckHeader2 hdr=(NakAckHeader2)msg.getHeader(id);
+ msg=buf.get(hdr.seqno); // we *have* to get a message, because loopback means we didn't add it to win !
+ }
+ if(msg != null && msg.isFlagSet(Message.OOB)) {
+ if(msg.setTransientFlagIfAbsent(Message.OOB_DELIVERED))
+ up_prot.up(new Event(Event.MSG, msg));
+ }
+ }
+ }
+ }
+
+ // Efficient way of checking whether another thread is already processing messages from 'sender'.
+ // If that's the case, we return immediately and let the existing thread process our message
+ // (https://jira.jboss.org/jira/browse/JGRP-829). Benefit: fewer threads blocked on the same lock, these threads
+ // can be returned to the thread pool
+ final AtomicBoolean processing=buf.getProcessing();
+ if(!processing.compareAndSet(false, true)) {
+ return;
+ }
+
+ boolean remove_msgs=discard_delivered_msgs && !loopback;
+ boolean released_processing=false;
+ try {
+ while(true) {
+ // we're removing a msg and set processing to false (if null) *atomically* (wrt to add())
+ msgs=buf.removeMany(processing, remove_msgs, max_msg_batch_size);
+ if(msgs == null || msgs.isEmpty()) {
+ released_processing=true;
+ if(rebroadcasting)
+ checkForRebroadcasts();
+ return;
+ }
+
+ for(final Message msg_to_deliver: msgs) {
+ // discard OOB msg if it has already been delivered (http://jira.jboss.com/jira/browse/JGRP-379)
+ if(msg_to_deliver.isFlagSet(Message.OOB) && !msg_to_deliver.setTransientFlagIfAbsent(Message.OOB_DELIVERED))
+ continue;
+
+ //msg_to_deliver.removeHeader(getName()); // Changed by bela Jan 29 2003: not needed (see above)
+ try {
+ up_prot.up(new Event(Event.MSG, msg_to_deliver));
+ }
+ catch(Throwable t) {
+ log.error("couldn't deliver message " + msg_to_deliver, t);
+ }
+ }
+ }
+ }
+ finally {
+ // processing is always set in win.remove(processing) above and never here ! This code is just a
+ // 2nd line of defense should there be an exception before win.remove(processing) sets processing
+ if(!released_processing)
+ processing.set(false);
+ }
+ }
+
/**
* Retransmits messsages first_seqno to last_seqno from original_sender from xmit_table to xmit_requester,
@@ -1274,5 +1370,17 @@ public boolean visit(long seqno, Message element, int row, int column) {
}
+ protected static class MySeqnoExtractor implements Table.SeqnoExtractor<Message> {
+ protected final short id;
+
+ public MySeqnoExtractor(short id) {
+ this.id=id;
+ }
+
+ public long getSeqnoFrom(Message msg) {
+ NakAckHeader2 hdr=(NakAckHeader2)msg.getHeader(id);
+ return hdr.getSeqno();
+ }
+ }
}
View
66 src/org/jgroups/util/Table.java
@@ -82,6 +82,10 @@
boolean visit(long seqno, T element, int row, int column);
}
+ public interface SeqnoExtractor<T> {
+ long getSeqnoFrom(T element);
+ }
+
public Table() {
@@ -172,25 +176,53 @@ public void setHighestDelivered(long seqno) {
public boolean add(long seqno, T element) {
lock.lock();
try {
- if(seqno <= hd)
- return false;
+ return _add(seqno, element);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
- int row_index=computeRow(seqno);
- if(row_index >= matrix.length) {
- resize(seqno);
- row_index=computeRow(seqno);
- }
- T[] row=getRow(row_index);
- int index=computeIndex(seqno);
- T existing_element=row[index];
- if(existing_element == null) {
- row[index]=element;
- size++;
- if(seqno > hr)
- hr=seqno;
- return true;
- }
+
+ protected boolean _add(long seqno, T element) {
+ if(seqno <= hd)
return false;
+
+ int row_index=computeRow(seqno);
+ if(row_index >= matrix.length) {
+ resize(seqno);
+ row_index=computeRow(seqno);
+ }
+ T[] row=getRow(row_index);
+ int index=computeIndex(seqno);
+ T existing_element=row[index];
+ if(existing_element == null) {
+ row[index]=element;
+ size++;
+ if(seqno > hr)
+ hr=seqno;
+ return true;
+ }
+ return false;
+
+ }
+
+
+ public boolean add(List<T> elements, SeqnoExtractor<T> extractor) {
+ if(elements == null || extractor == null)
+ return false;
+ boolean retval=false;
+ int num=0;
+ lock.lock();
+ try {
+ for(T element: elements) {
+ if(_add(extractor.getSeqnoFrom(element), element)) {
+ retval=true;
+ num++;
+ }
+ }
+ // System.out.println("added " + num + " elements");
+ return retval;
}
finally {
lock.unlock();
Please sign in to comment.
Something went wrong with that request. Please try again.