Skip to content

Commit

Permalink
returning the entire cache in TCPPING when return_entire_cache="true" (
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Mar 6, 2009
1 parent 11dba4f commit ebe04a1
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 20 deletions.
10 changes: 6 additions & 4 deletions src/org/jgroups/Event.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// $Id: Event.java,v 1.63.4.6 2009/02/26 15:46:02 belaban Exp $
// $Id: Event.java,v 1.63.4.7 2009/03/06 12:53:30 belaban Exp $

package org.jgroups;

Expand Down Expand Up @@ -49,9 +49,10 @@ public class Event {
public static final int DISABLE_UNICASTS_TO = 81; // arg = Address (member)
public static final int PREPARE_VIEW = 86; // arg = View
public static final int GET_PHYSICAL_ADDRESS = 87; // arg = Address --> PhysicalAddress
public static final int SET_PHYSICAL_ADDRESS = 88; // arg = Tuple<Address,PhysicalAddress>
public static final int REMOVE_ADDRESS = 89; // arg = Address
public static final int GET_LOCAL_ADDRESS = 90; // arg = null --> UUID (local_addr)
public static final int GET_LOGICAL_PHYSICAL_MAPPINGS = 88; // arg = null --> Map<Address,PhysicalAddress>
public static final int SET_PHYSICAL_ADDRESS = 89; // arg = Tuple<Address,PhysicalAddress>
public static final int REMOVE_ADDRESS = 90; // arg = Address
public static final int GET_LOCAL_ADDRESS = 91; // arg = null --> UUID (local_addr)

public static final int USER_DEFINED = 1000; // arg = <user def., e.g. evt type + data>

Expand Down Expand Up @@ -135,6 +136,7 @@ public static String type2String(int t) {
case DISABLE_UNICASTS_TO: return "DISABLE_UNICASTS_TO";
case PREPARE_VIEW: return "PREPARE_VIEW";
case GET_PHYSICAL_ADDRESS: return "GET_PHYSICAL_ADDRESS";
case GET_LOGICAL_PHYSICAL_MAPPINGS: return "GET_LOGICAL_PHYSICAL_MAPPINGS";
case SET_PHYSICAL_ADDRESS: return "SET_PHYSICAL_ADDRESS";
case REMOVE_ADDRESS: return "REMOVE_ADDRESS";
case GET_LOCAL_ADDRESS: return "GET_LOCAL_ADDRESS";
Expand Down
52 changes: 38 additions & 14 deletions src/org/jgroups/protocols/Discovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* </ul>
*
* @author Bela Ban
* @version $Id: Discovery.java,v 1.52.4.8 2009/02/27 12:43:20 belaban Exp $
* @version $Id: Discovery.java,v 1.52.4.9 2009/03/06 12:53:10 belaban Exp $
*/
@MBean
public abstract class Discovery extends Protocol {
Expand Down Expand Up @@ -68,6 +68,10 @@ public abstract class Discovery extends Protocol {
@ManagedAttribute(description="Number of discovery requests to be sent (min=1), " + "distributed over timeout ms", writable=true)
int num_ping_requests=2;

@Property(description="Whether or not to return the entire logical-physical address cache mappings on a " +
"discovery request, or not. Default is false, except for TCPPING")
boolean return_entire_cache=false;


/* --------------------------------------------- JMX ------------------------------------------------------ */

Expand Down Expand Up @@ -271,16 +275,23 @@ public Object up(Event evt) {
coord=!members.isEmpty()? members.firstElement() : local_addr;
}

PhysicalAddress physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
PingData ping_rsp=new PingData(local_addr, coord, is_server, org.jgroups.util.UUID.get(local_addr),
Arrays.asList(physical_addr));
rsp_msg=new Message(msg.getSrc(), null, null);
rsp_msg.setFlag(Message.OOB);
rsp_hdr=new PingHeader(PingHeader.GET_MBRS_RSP, ping_rsp);
rsp_msg.putHeader(getName(), rsp_hdr);
if(log.isTraceEnabled())
log.trace("received GET_MBRS_REQ from " + msg.getSrc() + ", sending response " + rsp_hdr);
down_prot.down(new Event(Event.MSG, rsp_msg));
if(return_entire_cache) {
Map<Address,PhysicalAddress> cache=(Map<Address,PhysicalAddress>)down(new Event(Event.GET_LOGICAL_PHYSICAL_MAPPINGS));
if(cache != null) {
Address src=msg.getSrc();
for(Map.Entry<Address,PhysicalAddress> entry: cache.entrySet()) {
Address logical_addr=entry.getKey();
PhysicalAddress physical_addr=entry.getValue();
sendDiscoveryResponse(logical_addr, Arrays.asList(physical_addr), coord, is_server,
UUID.get(logical_addr), src);
}
}
}
else {
PhysicalAddress physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
sendDiscoveryResponse(local_addr, Arrays.asList(physical_addr), coord, is_server,
org.jgroups.util.UUID.get(local_addr), msg.getSrc());
}
return null;

case PingHeader.GET_MBRS_RSP: // add response to vector and notify waiting thread
Expand All @@ -295,7 +306,8 @@ public Object up(Event evt) {
if(logical_addr == null)
logical_addr=msg.getSrc();
List<PhysicalAddress> physical_addrs=rsp.getPhysicalAddrs();
physical_addr=physical_addrs != null && !physical_addrs.isEmpty()? physical_addrs.get(0) : null;
PhysicalAddress physical_addr=physical_addrs != null && !physical_addrs.isEmpty()?
physical_addrs.get(0) : null;
if(logical_addr != null && physical_addr != null)
down(new Event(Event.SET_PHYSICAL_ADDRESS, new Tuple<Address,PhysicalAddress>(logical_addr, physical_addr)));
if(logical_addr != null && rsp.getLogicalName() != null)
Expand Down Expand Up @@ -401,10 +413,22 @@ protected final View makeView(Vector mbrs) {
return new View(coord, id, mbrs);
}


private void sendDiscoveryResponse(Address logical_addr, List<PhysicalAddress> physical_addrs,
Address coord, boolean is_server, String logical_name, Address sender) {
PingData ping_rsp=new PingData(logical_addr, coord, is_server, logical_name, physical_addrs);
Message rsp_msg=new Message(sender, null, null);
rsp_msg.setFlag(Message.OOB);
PingHeader rsp_hdr=new PingHeader(PingHeader.GET_MBRS_RSP, ping_rsp);
rsp_msg.putHeader(getName(), rsp_hdr);
if(log.isTraceEnabled())
log.trace("received GET_MBRS_REQ from " + sender + ", sending response " + rsp_hdr);
down_prot.down(new Event(Event.MSG, rsp_msg));
}


class PingSenderTask {
private Future<?> senderFuture;
private Future<?> senderFuture;

public PingSenderTask() {}

Expand Down Expand Up @@ -436,7 +460,7 @@ public synchronized void stop() {

private static class Responses {
final Promise<JoinRsp> promise;
final List<PingData> ping_rsps=new LinkedList<PingData>();
final List<PingData> ping_rsps=new LinkedList<PingData>();
final int num_expected_rsps;
final int num_expected_srv_rsps;
final boolean break_on_coord_rsp;
Expand Down
5 changes: 3 additions & 2 deletions src/org/jgroups/protocols/TCPPING.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* membership.
*
* @author Bela Ban
* @version $Id: TCPPING.java,v 1.41.4.1 2009/02/27 12:43:21 belaban Exp $
* @version $Id: TCPPING.java,v 1.41.4.2 2009/03/06 12:53:43 belaban Exp $
*/
public class TCPPING extends Discovery {

Expand All @@ -56,7 +56,8 @@ public class TCPPING extends Discovery {
private List<IpAddress> initial_hosts;


public TCPPING() {
public TCPPING() {
return_entire_cache=true;
}

public String getName() {
Expand Down

0 comments on commit ebe04a1

Please sign in to comment.