Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Mar 5, 2010
1 parent 5e5d496 commit a779288
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
14 changes: 8 additions & 6 deletions src/org/jgroups/mux/Multiplexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.jgroups.logging.LogFactory;
import org.jgroups.*;
import org.jgroups.TimeoutException;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.annotations.Experimental;
import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.stack.StateTransferInfo;
Expand Down Expand Up @@ -36,7 +37,7 @@
* @author Bela Ban, Vladimir Blagojevic
* @see MuxChannel
* @see Channel
* @version $Id: Multiplexer.java,v 1.112 2009/10/14 09:41:23 belaban Exp $
* @version $Id: Multiplexer.java,v 1.113 2010/03/05 08:59:12 belaban Exp $
*/
@Experimental(comment="because of impedance mismatches between a MuxChannel and JChannel, this might get deprecated " +
"in the future. The replacement would be a shared transport (see the documentation for details)")
Expand All @@ -46,7 +47,7 @@ public class Multiplexer implements UpHandler {
private static final Log log=LogFactory.getLog(Multiplexer.class);
private static final String SEPARATOR="::";
private static final short SEPARATOR_LEN=(short)SEPARATOR.length();
private static final String NAME="MUX";
private static final short ID=ClassConfigurator.getProtocolId(Multiplexer.class);

/**
* Map<String,MuxChannel>. Maintains the mapping between service IDs and
Expand Down Expand Up @@ -290,7 +291,7 @@ public Object up(final Event evt) {
switch(evt.getType()) {
case Event.MSG:
final Message msg=(Message)evt.getArg();
final MuxHeader hdr=(MuxHeader)msg.getHeader(NAME);
final MuxHeader hdr=(MuxHeader)msg.getHeader(ID);
if(hdr == null) {
log.error("MuxHeader not present - discarding message " + msg);
return null;
Expand Down Expand Up @@ -620,13 +621,14 @@ private void sendServiceMessage(boolean synchronous,
}

Message service_msg=new Message();
service_msg.putHeader(NAME, new MuxHeader(new ServiceInfo(type, service, host, payload)));
service_msg.putHeader(ID, new MuxHeader(new ServiceInfo(type, service, host, payload)));

if(oob)
service_msg.setFlag(Message.OOB);

if(channel.flushSupported())
service_msg.putHeader("FLUSH", new FLUSH.FlushHeader(FLUSH.FlushHeader.FLUSH_BYPASS));
service_msg.putHeader(ClassConfigurator.getProtocolId(FLUSH.class),
new FLUSH.FlushHeader(FLUSH.FlushHeader.FLUSH_BYPASS));

if(synchronous) {
//for synchronous invocation we need to collect acks
Expand Down Expand Up @@ -814,7 +816,7 @@ private void ackServiceMessage(ServiceInfo info, Address ackTarget) throws Chann

ServiceInfo si=new ServiceInfo(ServiceInfo.ACK, info.service, info.host, null);
MuxHeader hdr=new MuxHeader(si);
ack.putHeader(NAME, hdr);
ack.putHeader(ID, hdr);

if(channel.isConnected())
channel.send(ack);
Expand Down
11 changes: 6 additions & 5 deletions src/org/jgroups/mux/MuxChannel.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.jgroups.mux;

import org.jgroups.*;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.annotations.Experimental;
import org.jgroups.stack.ProtocolStack;

Expand All @@ -27,7 +28,7 @@
* @see JChannelFactory#createMultiplexerChannel(String, String)
* @see Multiplexer
* @since 2.4
* @version $Id: MuxChannel.java,v 1.56 2009/06/17 16:20:10 belaban Exp $
* @version $Id: MuxChannel.java,v 1.57 2010/03/05 08:59:12 belaban Exp $
*/
@Experimental(comment="because of impedance mismatches between a MuxChannel and JChannel, this might get deprecated " +
"in the future. The replacement would be a shared transport (see the documentation for details)")
Expand All @@ -37,7 +38,7 @@ public class MuxChannel extends JChannel {
/*
* Header identifier
*/
private static final String name="MUX";
private static final short ID=ClassConfigurator.getProtocolId(MuxChannel.class);

/*
* MuxChannel service ID
Expand Down Expand Up @@ -344,7 +345,7 @@ public synchronized void shutdown() {
}

public void send(Message msg) throws ChannelNotConnectedException,ChannelClosedException {
msg.putHeader(name, hdr);
msg.putHeader(ID, hdr);
mux.getChannel().send(msg);
if(stats) {
sent_msgs++;
Expand All @@ -360,15 +361,15 @@ public void send(Address dst, Address src, Serializable obj) throws ChannelNotCo
public void down(Event evt) {
if(evt.getType() == Event.MSG) {
Message msg=(Message)evt.getArg();
msg.putHeader(name, hdr);
msg.putHeader(ID, hdr);
}
mux.getChannel().down(evt);
}

public Object downcall(Event evt) {
if(evt.getType() == Event.MSG) {
Message msg=(Message)evt.getArg();
msg.putHeader(name, hdr);
msg.putHeader(ID, hdr);
}
return mux.getChannel().downcall(evt);
}
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/stack/ProtocolStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* stacks, and to destroy them again when not needed anymore
*
* @author Bela Ban
* @version $Id: ProtocolStack.java,v 1.97 2009/12/11 13:19:49 belaban Exp $
* @version $Id: ProtocolStack.java,v 1.98 2010/03/05 08:59:43 belaban Exp $
*/
public class ProtocolStack extends Protocol implements Transport {
public static final int ABOVE = 1; // used by insertProtocol()
Expand Down Expand Up @@ -718,7 +718,7 @@ public void startStack(String cluster_name, Address local_addr) throws Exception
}

if(above_prot != null) {
TP.ProtocolAdapter ad=new TP.ProtocolAdapter(cluster_name, local_addr, prot.getName(),
TP.ProtocolAdapter ad=new TP.ProtocolAdapter(cluster_name, local_addr, prot.getId(),
above_prot, prot,
transport.getThreadNamingPattern());
ad.setProtocolStack(above_prot.getProtocolStack());
Expand Down

0 comments on commit a779288

Please sign in to comment.