Skip to content

Commit

Permalink
- First batch of classes converted to use down(Message) [https://issu…
Browse files Browse the repository at this point in the history
…es.jboss.org/browse/JGRP-2067]

- Protocols now use down_prot.down(msg) instead of down_prot.down(new Event(Event.MSG, msg))
- Converted up_prot.up(new Event(Event.MSG, msg) to up_prot.up(msg)
- Removed Event.MSG
- Catching exceptions when calling ThreadPoolExecutor.execute()
  • Loading branch information
belaban committed Jul 26, 2016
1 parent 6cb915b commit 47bd9b7
Show file tree
Hide file tree
Showing 111 changed files with 2,552 additions and 2,796 deletions.
5 changes: 1 addition & 4 deletions src/org/jgroups/Event.java
@@ -1,13 +1,13 @@
package org.jgroups; package org.jgroups;



/** /**
* Event is a JGroups internal class used for inter-stack and intra-stack communication. * Event is a JGroups internal class used for inter-stack and intra-stack communication.
* *
* @since 2.0 * @since 2.0
* @author Bela Ban * @author Bela Ban
*/ */
public class Event { public class Event {
public static final int MSG = 1; // arg = Message
public static final int CONNECT = 2; // arg = cluster name (string) public static final int CONNECT = 2; // arg = cluster name (string)
public static final int DISCONNECT = 4; // arg = member address (Address) public static final int DISCONNECT = 4; // arg = member address (Address)
public static final int VIEW_CHANGE = 6; // arg = View (or MergeView in case of merge) public static final int VIEW_CHANGE = 6; // arg = View (or MergeView in case of merge)
Expand Down Expand Up @@ -97,7 +97,6 @@ public Event(int type, Object arg) {


public static String type2String(int t) { public static String type2String(int t) {
switch(t) { switch(t) {
case MSG: return "MSG";
case CONNECT: return "CONNECT"; case CONNECT: return "CONNECT";
case DISCONNECT: return "DISCONNECT"; case DISCONNECT: return "DISCONNECT";
case VIEW_CHANGE: return "VIEW_CHANGE"; case VIEW_CHANGE: return "VIEW_CHANGE";
Expand Down Expand Up @@ -162,8 +161,6 @@ public static String type2String(int t) {
public String toString() { public String toString() {
StringBuilder ret=new StringBuilder(64); StringBuilder ret=new StringBuilder(64);
ret.append(type2String(type)).append(", arg=").append(arg); ret.append(type2String(type)).append(", arg=").append(arg);
if(type == MSG)
ret.append(" (headers=").append(((Message)arg).printHeaders()).append(")");
return ret.toString(); return ret.toString();
} }


Expand Down
48 changes: 28 additions & 20 deletions src/org/jgroups/JChannel.java
Expand Up @@ -489,7 +489,7 @@ public JChannel send(Message msg) throws Exception {
if(msg == null) if(msg == null)
throw new NullPointerException("msg is null"); throw new NullPointerException("msg is null");
checkClosedOrNotConnected(); checkClosedOrNotConnected();
down(new Event(Event.MSG, msg)); down(msg);
return this; return this;
} }


Expand Down Expand Up @@ -641,11 +641,16 @@ public JChannel stopFlush(List<Address> flushParticipants) {
*/ */
public Object down(Event evt) { public Object down(Event evt) {
if(evt == null) return null; if(evt == null) return null;
if(stats && evt.getType() == Event.MSG) { return prot_stack.down(evt);
}

public Object down(Message msg) {
if(msg == null) return null;
if(stats) {
sent_msgs++; sent_msgs++;
sent_bytes+=((Message)evt.getArg()).getLength(); sent_bytes+=msg.getLength();
} }
return prot_stack.down(evt); return prot_stack.down(msg);
} }




Expand All @@ -656,19 +661,6 @@ public Object down(Event evt) {
*/ */
public Object up(Event evt) { public Object up(Event evt) {
switch(evt.getType()) { switch(evt.getType()) {

case Event.MSG:
Message msg=evt.getArg();
if(stats) {
received_msgs++;
received_bytes+=msg.getLength();
}

// discard local messages (sent by myself to me)
if(discard_own_messages && local_addr != null && msg.getSrc() != null && local_addr.equals(msg.getSrc()))
return null;
break;

case Event.VIEW_CHANGE: case Event.VIEW_CHANGE:
View tmp=evt.getArg(); View tmp=evt.getArg();
if(tmp instanceof MergeView) if(tmp instanceof MergeView)
Expand Down Expand Up @@ -776,6 +768,25 @@ public Object up(Event evt) {
return null; return null;
} }


public Object up(Message msg) {
if(stats) {
received_msgs++;
received_bytes+=msg.getLength();
}

// discard local messages (sent by myself to me)
if(discard_own_messages && local_addr != null && msg.getSrc() != null && local_addr.equals(msg.getSrc()))
return null;

// If UpHandler is installed, pass all events to it and return (UpHandler is e.g. a building block)
if(up_handler != null)
return up_handler.up(msg);

if(receiver != null)
receiver.receive(msg);
return null;
}



/** Callback invoked by the protocol stack to deliver a message batch */ /** Callback invoked by the protocol stack to deliver a message batch */
public JChannel up(MessageBatch batch) { public JChannel up(MessageBatch batch) {
Expand Down Expand Up @@ -908,9 +919,6 @@ protected JChannel getState(Address target, long timeout, Callable<Boolean> flus


protected Object invokeCallback(int type, Object arg) { protected Object invokeCallback(int type, Object arg) {
switch(type) { switch(type) {
case Event.MSG:
receiver.receive((Message)arg);
break;
case Event.VIEW_CHANGE: case Event.VIEW_CHANGE:
receiver.viewAccepted((View)arg); receiver.viewAccepted((View)arg);
break; break;
Expand Down
14 changes: 5 additions & 9 deletions src/org/jgroups/UpHandler.java
Expand Up @@ -4,30 +4,26 @@
import org.jgroups.util.MessageBatch; import org.jgroups.util.MessageBatch;


/** /**
* Provides a hook to hijack over all events received by a certain channel which has installed this * Provides a hook to hijack all events received by a certain channel which has installed this UpHandler.<p>
* UpHandler. * Client usually never need to implement this interface and it is mostly used by JGroups building blocks.
* <p>
* Client usually never need to implement this interface and it is mostly used by JGroups building
* blocks.
*
* @since 2.0 * @since 2.0
* @author Bela Ban * @author Bela Ban
*/ */
public interface UpHandler { public interface UpHandler {


/** /**
* Invoked for all channel events except connection management and state transfer. * Invoked for all channel events except connection management and state transfer.
* @param evt
*/ */
Object up(Event evt); Object up(Event evt);


Object up(Message msg);

default void up(MessageBatch batch) { default void up(MessageBatch batch) {
for(Message msg: batch) { for(Message msg: batch) {
try { try {
up(new Event(Event.MSG, msg)); up(msg);
} }
catch(Throwable t) { catch(Throwable t) {

} }
} }
} }
Expand Down
55 changes: 25 additions & 30 deletions src/org/jgroups/auth/DemoToken.java
Expand Up @@ -52,7 +52,7 @@ public boolean authenticate(AuthToken token, Message msg) {
pending_requests.put(sender, entry); // here we'd have to check if a latch already exists... pending_requests.put(sender, entry); // here we'd have to check if a latch already exists...
if(log.isTraceEnabled()) if(log.isTraceEnabled())
log.trace(auth.getAddress() + ": sending challenge to " + sender); log.trace(auth.getAddress() + ": sending challenge to " + sender);
auth.getDownProtocol().down(new Event(Event.MSG, challenge)); auth.getDownProtocol().down(challenge);
try { try {
entry.latch.await(block_time, TimeUnit.MILLISECONDS); entry.latch.await(block_time, TimeUnit.MILLISECONDS);
pending_requests.remove(sender); pending_requests.remove(sender);
Expand All @@ -71,38 +71,33 @@ public void readFrom(DataInput in) throws Exception {}
public int size() {return 0;} public int size() {return 0;}




public boolean handleUpEvent(Event evt) { public boolean handleUpMessage(Message msg) {
switch(evt.getType()) { DemoHeader hdr=msg.getHeader(ID);
case Event.MSG: if(hdr == null)
Message msg=(Message)evt.getArg(); return true;
DemoHeader hdr=(DemoHeader)msg.getHeader(ID); switch(hdr.type) {
if(hdr == null) case DemoHeader.CHALLENGE:
if(log.isTraceEnabled())
log.trace(auth.getAddress() + ": received CHALLENGE from " + msg.getSrc());
long hash=hash(encrypt(hdr.payload));
Message response=new Message(msg.getSrc()).setFlag(Message.Flag.OOB);
response.putHeader(ID, new DemoHeader(hash));
if(log.isTraceEnabled())
log.trace(auth.getAddress() + ": sending RESPONSE to " + msg.getSrc());
auth.getDownProtocol().down(response);
break;
case DemoHeader.RESPONSE:
if(log.isTraceEnabled())
log.trace(auth.getAddress() + ": received RESPONSE from " + msg.getSrc());
Entry entry=pending_requests.get(msg.getSrc());
if(entry == null) {
// error message
break; break;
switch(hdr.type) {
case DemoHeader.CHALLENGE:
if(log.isTraceEnabled())
log.trace(auth.getAddress() + ": received CHALLENGE from " + msg.getSrc());
long hash=hash(encrypt(hdr.payload));
Message response=new Message(msg.getSrc()).setFlag(Message.Flag.OOB);
response.putHeader(ID, new DemoHeader(hash));
if(log.isTraceEnabled())
log.trace(auth.getAddress() + ": sending RESPONSE to " + msg.getSrc());
auth.getDownProtocol().down(new Event(Event.MSG, response));
break;
case DemoHeader.RESPONSE:
if(log.isTraceEnabled())
log.trace(auth.getAddress() + ": received RESPONSE from " + msg.getSrc());
Entry entry=pending_requests.get(msg.getSrc());
if(entry == null) {
// error message
break;
}
entry.setResponse(hdr.hash);
break;
} }
return false; // don't pass up entry.setResponse(hdr.hash);
break;
} }
return true; return false; // don't pass up
} }




Expand Down
15 changes: 12 additions & 3 deletions src/org/jgroups/blocks/MessageDispatcher.java
Expand Up @@ -574,6 +574,12 @@ public Object up(Event evt) {
return null; return null;
} }


public Object up(Message msg) {
if(corr != null)
corr.receiveMessage(msg);
return null;
}

public void up(MessageBatch batch) { public void up(MessageBatch batch) {
if(corr == null) if(corr == null)
return; return;
Expand All @@ -582,17 +588,20 @@ public void up(MessageBatch batch) {


@Override @Override
public Object down(Event evt) { public Object down(Event evt) {
return channel != null? channel.down(evt) : null;
}

public Object down(Message msg) {
if(channel != null) { if(channel != null) {
if(evt.getType() == Event.MSG && !(channel.isConnected() || channel.isConnecting())) { if(!(channel.isConnected() || channel.isConnecting())) {
// return null; // return null;
throw new IllegalStateException("channel is not connected"); throw new IllegalStateException("channel is not connected");
} }
return channel.down(evt); return channel.down(msg);
} }
return null; return null;
} }



/* ----------------------- End of Protocol Interface ------------------------ */ /* ----------------------- End of Protocol Interface ------------------------ */


} }
Expand Down
14 changes: 5 additions & 9 deletions src/org/jgroups/blocks/RequestCorrelator.java
Expand Up @@ -156,7 +156,7 @@ public void sendRequest(Collection<Address> dest_mbrs, Buffer data, Request req,


if(opts.anycasting()) { if(opts.anycasting()) {
if(opts.useAnycastAddresses()) { if(opts.useAnycastAddresses()) {
transport.down(new Event(Event.MSG, msg.dest(new AnycastAddress(dest_mbrs)))); transport.down(msg.dest(new AnycastAddress(dest_mbrs)));
} }
else { else {
boolean first=true; boolean first=true;
Expand All @@ -165,12 +165,12 @@ public void sendRequest(Collection<Address> dest_mbrs, Buffer data, Request req,
first=false; first=false;
if(!mbr.equals(local_addr) && copy.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK)) if(!mbr.equals(local_addr) && copy.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK))
copy.clearTransientFlag(Message.TransientFlag.DONT_LOOPBACK); copy.clearTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
transport.down(new Event(Event.MSG, copy)); transport.down(copy);
} }
} }
} }
else else
transport.down(new Event(Event.MSG, msg)); transport.down(msg);
} }


/** Sends a request to a single destination */ /** Sends a request to a single destination */
Expand Down Expand Up @@ -198,7 +198,7 @@ public void sendUnicastRequest(Address dest, Buffer data, Request req, RequestOp
} }
else // async RPC else // async RPC
rpc_stats.add(RpcStats.Type.UNICAST, dest, false, 0); rpc_stats.add(RpcStats.Type.UNICAST, dest, false, 0);
transport.down(new Event(Event.MSG, msg)); transport.down(msg);
} }




Expand Down Expand Up @@ -234,10 +234,6 @@ public boolean receive(Event evt) {
setLocalAddress(evt.getArg()); setLocalAddress(evt.getArg());
break; break;


case Event.MSG:
if(receiveMessage(evt.getArg()))
return true; // message was consumed, don't pass it up
break;
case Event.SITE_UNREACHABLE: case Event.SITE_UNREACHABLE:
SiteMaster site_master=evt.getArg(); SiteMaster site_master=evt.getArg();
String site=site_master.getSite(); String site=site_master.getSite();
Expand Down Expand Up @@ -452,7 +448,7 @@ protected void sendResponse(Message rsp, long req_id, boolean is_exception) {
rsp.putHeader(corr_id, rsp_hdr); rsp.putHeader(corr_id, rsp_hdr);
if(log.isTraceEnabled()) if(log.isTraceEnabled())
log.trace("sending rsp for %d to %s", req_id, rsp.getDest()); log.trace("sending rsp for %d to %s", req_id, rsp.getDest());
transport.down(new Event(Event.MSG, rsp)); transport.down(rsp);
} }


protected static Buffer replyToBuffer(Object obj, Marshaller marshaller) throws Exception { protected static Buffer replyToBuffer(Object obj, Marshaller marshaller) throws Exception {
Expand Down
11 changes: 5 additions & 6 deletions src/org/jgroups/fork/ForkChannel.java
Expand Up @@ -197,17 +197,16 @@ public void close() {
((ForkProtocolStack)prot_stack).remove(fork_channel_id); ((ForkProtocolStack)prot_stack).remove(fork_channel_id);
if(state == State.CLOSED) if(state == State.CLOSED)
return; return;
disconnect(); // leave group if connected disconnect(); // leave cluster if connected
prot_stack.destroy(); prot_stack.destroy();
state=State.CLOSED; state=State.CLOSED;
notifyChannelClosed(this); notifyChannelClosed(this);
} }


@Override @Override
public Object down(Event evt) { public Object down(Message msg) {
if(evt.getType() == Event.MSG) setHeader(msg);
setHeader(evt.getArg()); return super.down(msg);
return super.down(evt);
} }


@Override @Override
Expand All @@ -220,7 +219,7 @@ public ForkChannel send(Message msg) throws Exception {
hdr=new FORK.ForkHeader(null, fork_channel_id); hdr=new FORK.ForkHeader(null, fork_channel_id);
msg.putHeader(FORK.ID, hdr); msg.putHeader(FORK.ID, hdr);
} }
prot_stack.down(new Event(Event.MSG, msg)); prot_stack.down(msg);
return this; return this;
} }


Expand Down
17 changes: 9 additions & 8 deletions src/org/jgroups/fork/ForkProtocol.java
Expand Up @@ -27,14 +27,6 @@ public Map<String,Object> dumpStats() {


public Object down(Event evt) { public Object down(Event evt) {
switch(evt.getType()) { switch(evt.getType()) {
case Event.MSG:
Message msg=(Message)evt.getArg();
FORK.ForkHeader hdr=(FORK.ForkHeader)msg.getHeader(FORK.ID);
if(hdr == null)
msg.putHeader(FORK.ID, hdr=new FORK.ForkHeader(fork_stack_id, null));
else
hdr.setForkStackId(fork_stack_id);
break;
case Event.SET_LOCAL_ADDRESS: case Event.SET_LOCAL_ADDRESS:
case Event.VIEW_CHANGE: case Event.VIEW_CHANGE:
case Event.CONNECT: case Event.CONNECT:
Expand All @@ -46,4 +38,13 @@ public Object down(Event evt) {
} }
return down_prot.down(evt); return down_prot.down(evt);
} }

public Object down(Message msg) {
FORK.ForkHeader hdr=msg.getHeader(FORK.ID);
if(hdr == null)
msg.putHeader(FORK.ID, hdr=new FORK.ForkHeader(fork_stack_id, null));
else
hdr.setForkStackId(fork_stack_id);
return down_prot.down(msg);
}
} }

0 comments on commit 47bd9b7

Please sign in to comment.