Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Feb 16, 2011
1 parent 0cbdfdc commit 6f01d91
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions src/org/jgroups/protocols/RELAY.java
Expand Up @@ -123,7 +123,11 @@ public Object down(Event evt) {


case Event.MSG: case Event.MSG:
Message msg=(Message)evt.getArg(); Message msg=(Message)evt.getArg();
if(msg.getDest() instanceof ProxyAddress) { Address dest=msg.getDest();

// forward non local destinations to the coordinator, to relay to the remote cluster
// if(dest instanceof ProxyAddress || !local_view.containsMember(dest)) {
if(remote_view != null && remote_view.containsMember(dest)) {
forwardToCoord(msg); forwardToCoord(msg);
return null; return null;
} }
Expand All @@ -133,18 +137,6 @@ public Object down(Event evt) {
handleView((View)evt.getArg()); handleView((View)evt.getArg());
break; break;


case Event.CONNECT:
case Event.CONNECT_USE_FLUSH:
case Event.CONNECT_WITH_STATE_TRANSFER:
case Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH:
Object retval=down_prot.down(evt);
if(coord != null) {
Message broadcast_view_req=new Message(coord, null, null);
broadcast_view_req.putHeader(id, RelayHeader.create(RelayHeader.Type.BROADCAST_VIEW));
down_prot.down(new Event(Event.MSG, broadcast_view_req));
}
return retval;

case Event.DISCONNECT: case Event.DISCONNECT:
Util.close(bridge); Util.close(bridge);
break; break;
Expand Down Expand Up @@ -245,8 +237,6 @@ protected void handleView(final View view) {




if(is_coord) { if(is_coord) {
sendViewOnLocalCluster(null, remote_view, generateGlobalView(view, remote_view), true);

if(create_bridge) { if(create_bridge) {
createBridge(); createBridge();
Message msg=new Message(); Message msg=new Message();
Expand All @@ -259,6 +249,13 @@ protected void handleView(final View view) {
} }


sendViewToRemote(ViewData.create(view, null), false); sendViewToRemote(ViewData.create(view, null), false);

if(create_bridge && bridge.getView().size() > 1) {
;
}
else {
sendViewOnLocalCluster(null, remote_view, generateGlobalView(view, remote_view), true);
}
} }
} }


Expand Down Expand Up @@ -306,8 +303,13 @@ protected void forwardToCoord(Message msg) {
Message tmp=msg.copy(true, Global.BLOCKS_START_ID); // // we only copy headers from building blocks Message tmp=msg.copy(true, Global.BLOCKS_START_ID); // // we only copy headers from building blocks
if(tmp.getSrc() == null) if(tmp.getSrc() == null)
tmp.setSrc(local_addr); tmp.setSrc(local_addr);
ProxyAddress dst=(ProxyAddress)tmp.getDest();
tmp.setDest(dst.getOriginalAddress()); Address dest=tmp.getDest();
if(dest instanceof ProxyAddress) {
ProxyAddress dst=(ProxyAddress)tmp.getDest();
tmp.setDest(dst.getOriginalAddress());
}

try { try {
byte[] buf=Util.streamableToByteBuffer(tmp); byte[] buf=Util.streamableToByteBuffer(tmp);
if(coord != null) { if(coord != null) {
Expand All @@ -325,10 +327,10 @@ protected void forwardToCoord(Message msg) {


protected void sendViewToRemote(ViewData view_data, boolean use_seperate_thread) { protected void sendViewToRemote(ViewData view_data, boolean use_seperate_thread) {
try { try {
byte[] buf=Util.streamableToByteBuffer(view_data);
final Message msg=new Message(null, null, buf);
msg.putHeader(id, RelayHeader.create(RelayHeader.Type.VIEW));
if(bridge != null && bridge.isConnected()) { if(bridge != null && bridge.isConnected()) {
byte[] buf=Util.streamableToByteBuffer(view_data);
final Message msg=new Message(null, null, buf);
msg.putHeader(id, RelayHeader.create(RelayHeader.Type.VIEW));
if(use_seperate_thread) { if(use_seperate_thread) {
timer.execute(new Runnable() { timer.execute(new Runnable() {
public void run() { public void run() {
Expand Down

0 comments on commit 6f01d91

Please sign in to comment.