Skip to content

Commit

Permalink
- Added Discovery.max_rank_to_reply: if TP.use_ip_addrs is true, then…
Browse files Browse the repository at this point in the history
… only members with a rank smaller than max_rank_to_reply respond to initial membership discovery requests; others drop it (https://issues.jboss.org/browse/JGRP-2098)

- Added field only_coords_send_discovery_rsps to Discovery
- Added View.newMembers(View old, View new_view)
- Added initial_discovery to PingHeader to let responders decide whether or not to reply
- Added View.sameViews() plus unit test
- Merger now rejects a merge if all views passed to it are equal (same ViewId)
  • Loading branch information
belaban committed Sep 29, 2016
1 parent bcacbdc commit 3274627
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 161 deletions.
6 changes: 3 additions & 3 deletions conf/tcp-nio.xml
Expand Up @@ -14,9 +14,9 @@
max_bundle_size="64K" max_bundle_size="64K"
sock_conn_timeout="300" sock_conn_timeout="300"


thread_pool.min_threads="2" thread_pool.min_threads="0"
thread_pool.max_threads="8" thread_pool.max_threads="20"
thread_pool.keep_alive_time="5000"/> thread_pool.keep_alive_time="30000"/>


<TCPPING async_discovery="true" <TCPPING async_discovery="true"
initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}" initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}"
Expand Down
6 changes: 3 additions & 3 deletions conf/tcp.xml
Expand Up @@ -14,9 +14,9 @@
max_bundle_size="64K" max_bundle_size="64K"
sock_conn_timeout="300" sock_conn_timeout="300"


thread_pool.min_threads="2" thread_pool.min_threads="0"
thread_pool.max_threads="8" thread_pool.max_threads="20"
thread_pool.keep_alive_time="5000"/> thread_pool.keep_alive_time="30000"/>


<TCPPING async_discovery="true" <TCPPING async_discovery="true"
initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}" initial_hosts="${jgroups.tcpping.initial_hosts:localhost[7800],localhost[7801]}"
Expand Down
6 changes: 3 additions & 3 deletions conf/udp.xml
Expand Up @@ -20,9 +20,9 @@
enable_diagnostics="true" enable_diagnostics="true"
thread_naming_pattern="cl" thread_naming_pattern="cl"


thread_pool.min_threads="2" thread_pool.min_threads="0"
thread_pool.max_threads="8" thread_pool.max_threads="20"
thread_pool.keep_alive_time="5000"/> thread_pool.keep_alive_time="30000"/>


<PING /> <PING />
<MERGE3 max_interval="30000" <MERGE3 max_interval="30000"
Expand Down
20 changes: 20 additions & 0 deletions src/org/jgroups/View.java
Expand Up @@ -11,6 +11,7 @@
import java.io.DataOutput; import java.io.DataOutput;
import java.util.*; import java.util.*;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Stream;


/** /**
* A view is a local representation of the current membership of a group. Only one view is installed * A view is a local representation of the current membership of a group. Only one view is installed
Expand Down Expand Up @@ -221,6 +222,14 @@ public static List<Address> leftMembers(View one, View two) {
return retval; return retval;
} }


public static List<Address> newMembers(View old, View new_view) {
if(old == null || new_view == null)
return null;
List<Address> retval=new ArrayList<>(new_view.getMembers());
retval.removeAll(old.getMembers());
return retval;
}

/** /**
* Returns the difference between 2 views from and to. It is assumed that view 'from' is logically prior to view 'to'. * Returns the difference between 2 views from and to. It is assumed that view 'from' is logically prior to view 'to'.
* @param from The first view * @param from The first view
Expand Down Expand Up @@ -268,6 +277,17 @@ public static Address[][] diff(final View from, final View to) {
return new Address[][]{joined != null? joined : new Address[]{}, left != null? left : new Address[]{}}; return new Address[][]{joined != null? joined : new Address[]{}, left != null? left : new Address[]{}};
} }


/** Returns true if all views are the same. Uses the view IDs for comparison */
public static boolean sameViews(View ... views) {
ViewId first_view_id=views[0].getViewId();
return Stream.of(views).allMatch(v -> v.getViewId().equals(first_view_id));
}

public static boolean sameViews(Collection<View> views) {
ViewId first_view_id=views.iterator().next().getViewId();
return views.stream().allMatch(v -> v.getViewId().equals(first_view_id));
}

public Iterator<Address> iterator() { public Iterator<Address> iterator() {
return new ArrayIterator(this.members); return new ArrayIterator(this.members);
} }
Expand Down
84 changes: 33 additions & 51 deletions src/org/jgroups/protocols/Discovery.java
Expand Up @@ -40,20 +40,17 @@ public abstract class Discovery extends Protocol {


@Property(description="Whether or not to return the entire logical-physical address cache mappings on a " + @Property(description="Whether or not to return the entire logical-physical address cache mappings on a " +
"discovery request, or not.") "discovery request, or not.")
protected boolean return_entire_cache=false; protected boolean return_entire_cache;


@Property(description="If greater than 0, we'll wait a random number of milliseconds in range [0..stagger_timeout] " + @Property(description="If greater than 0, we'll wait a random number of milliseconds in range [0..stagger_timeout] " +
"before sending a discovery response. This prevents traffic spikes in large clusters when everyone sends their " + "before sending a discovery response. This prevents traffic spikes in large clusters when everyone sends their " +
"discovery response at the same time") "discovery response at the same time")
protected long stagger_timeout; protected long stagger_timeout;


@Property(description="Always sends a discovery response, no matter what")
protected boolean force_sending_discovery_rsps=true;



@Property(description="If a persistent disk cache (PDC) is present, combine the discovery results with the " + @Property(description="If a persistent disk cache (PDC) is present, combine the discovery results with the " +
"contents of the disk cache before returning the results") "contents of the disk cache before returning the results")
protected boolean use_disk_cache=false; protected boolean use_disk_cache;


@Property(description="Max size of the member list shipped with a discovery request. If we have more, the " + @Property(description="Max size of the member list shipped with a discovery request. If we have more, the " +
"mbrs field in the discovery request header is nulled and members return the entire membership, " + "mbrs field in the discovery request header is nulled and members return the entire membership, " +
Expand All @@ -65,57 +62,57 @@ public abstract class Discovery extends Protocol {


@Property(description="If true then the discovery is done on a separate timer thread. Should be set to true when " + @Property(description="If true then the discovery is done on a separate timer thread. Should be set to true when " +
"discovery is blocking and/or takes more than a few milliseconds") "discovery is blocking and/or takes more than a few milliseconds")
protected boolean async_discovery=false; protected boolean async_discovery;


@Property(description="If enabled, use a separate thread for every discovery request. Can be used with or without " + @Property(description="If enabled, use a separate thread for every discovery request. Can be used with or without " +
"async_discovery") "async_discovery")
protected boolean async_discovery_use_separate_thread_per_request; protected boolean async_discovery_use_separate_thread_per_request;


@Property(description="When a new node joins, and we have a static discovery protocol (TCPPING), then send the " + @Property(description="When a new node joins, and we have a static discovery protocol (TCPPING), then send the " +
"contents of the discovery cache to new and existing members if true (and we're the coord). Addresses JGRP-1903") "contents of the discovery cache to new and existing members if true (and we're the coord). Addresses JGRP-1903")
protected boolean send_cache_on_join=false; protected boolean send_cache_on_join;



@ManagedOperation(description="Sets force_sending_discovery_rsps") @Property(description="The max rank of this member to respond to discovery requests, e.g. if " +
public void setForceSendingDiscoveryRsps(boolean flag) { "max_rank_to_reply=2 in {A,B,C,D,E}, only A (rank 1) and B (rank 2) will reply. A value <= 0 means " +
force_sending_discovery_rsps=flag; "everybody will reply. This attribute is ignored if TP.use_ip_addrs is false.")
} protected int max_rank_to_reply;


/* --------------------------------------------- JMX ------------------------------------------------------ */ /* --------------------------------------------- JMX ------------------------------------------------------ */



@ManagedAttribute(description="Total number of discovery requests sent ") @ManagedAttribute(description="Total number of discovery requests sent ")
protected int num_discovery_requests; protected int num_discovery_requests;


/* --------------------------------------------- Fields ------------------------------------------------------ */ /* --------------------------------------------- Fields ------------------------------------------------------ */


protected volatile boolean is_server=false; protected volatile boolean is_server;
protected volatile boolean is_leaving=false; protected volatile boolean is_leaving;
protected TimeScheduler timer; protected TimeScheduler timer;
protected View view; protected volatile View view;
protected final List<Address> members=new ArrayList<>(11);
@ManagedAttribute(description="Whether this member is the current coordinator") @ManagedAttribute(description="Whether this member is the current coordinator")
protected boolean is_coord=false; protected volatile boolean is_coord;
protected Address local_addr=null; protected volatile Address local_addr;
protected Address current_coord; protected volatile Address current_coord;
protected String cluster_name; protected String cluster_name;
protected final Map<Long,Responses> ping_responses=new HashMap<>(); protected final Map<Long,Responses> ping_responses=new HashMap<>();
@ManagedAttribute(description="Whether the transport supports multicasting") @ManagedAttribute(description="Whether the transport supports multicasting")
protected boolean transport_supports_multicasting=true; protected boolean transport_supports_multicasting=true;
protected boolean use_ip_addrs; // caches TP.use_ip_addrs
@ManagedAttribute(description="True if sending a message can block at the transport level") @ManagedAttribute(description="True if sending a message can block at the transport level")
protected boolean sends_can_block=true; protected boolean sends_can_block=true;
protected static final byte[] WHITESPACE=" \t".getBytes(); protected static final byte[] WHITESPACE=" \t".getBytes();






public void init() throws Exception { public void init() throws Exception {
timer=getTransport().getTimer(); TP tp=getTransport();
timer=tp.getTimer();
if(timer == null) if(timer == null)
throw new Exception("timer cannot be retrieved from protocol stack"); throw new Exception("timer cannot be retrieved from protocol stack");
if(stagger_timeout < 0) if(stagger_timeout < 0)
throw new IllegalArgumentException("stagger_timeout cannot be negative"); throw new IllegalArgumentException("stagger_timeout cannot be negative");
transport_supports_multicasting=getTransport().supportsMulticasting(); transport_supports_multicasting=tp.supportsMulticasting();
sends_can_block=getTransport() instanceof TCP; // UDP and TCP_NIO2 won't block sends_can_block=getTransport() instanceof TCP; // UDP and TCP_NIO2 won't block
use_ip_addrs=tp.getUseIpAddresses();
} }


public abstract boolean isDynamic(); public abstract boolean isDynamic();
Expand All @@ -138,8 +135,6 @@ public void discoveryRequestReceived(Address sender, String logical_name, Physic
public Discovery returnEntireCache(boolean flag) {return_entire_cache=flag; return this;} public Discovery returnEntireCache(boolean flag) {return_entire_cache=flag; return this;}
public long staggerTimeout() {return stagger_timeout;} public long staggerTimeout() {return stagger_timeout;}
public Discovery staggerTimeout(long timeout) {stagger_timeout=timeout; return this;} public Discovery staggerTimeout(long timeout) {stagger_timeout=timeout; return this;}
public boolean forceDiscoveryResponses() {return force_sending_discovery_rsps;}
public Discovery forceDiscoveryResponses(boolean f) {force_sending_discovery_rsps=f; return this;}
public boolean useDiskCache() {return use_disk_cache;} public boolean useDiskCache() {return use_disk_cache;}
public Discovery useDiskCache(boolean flag) {use_disk_cache=flag; return this;} public Discovery useDiskCache(boolean flag) {use_disk_cache=flag; return this;}
public Discovery discoveryRspExpiryTime(long t) {this.discovery_rsp_expiry_time=t; return this;} public Discovery discoveryRspExpiryTime(long t) {this.discovery_rsp_expiry_time=t; return this;}
Expand All @@ -163,10 +158,7 @@ protected boolean isMergeRunning() {


@ManagedOperation(description="Sends information about my cache to everyone but myself") @ManagedOperation(description="Sends information about my cache to everyone but myself")
public void sendCacheInformation() { public void sendCacheInformation() {
List<Address> current_members=null; List<Address> current_members=new ArrayList<>(view.getMembers());
synchronized(members) {
current_members=new ArrayList<>(members);
}
disseminateDiscoveryInformation(current_members, null, current_members); disseminateDiscoveryInformation(current_members, null, current_members);
} }


Expand Down Expand Up @@ -292,7 +284,7 @@ public Object up(Message msg) {
} }


// add physical address and logical name of the discovery sender (if available) to the cache // add physical address and logical name of the discovery sender (if available) to the cache
if(data != null) { if(data != null) { // null if use_ip_addrs is true
addDiscoveryResponseToCaches(logical_addr, data.getLogicalName(), data.getPhysicalAddr()); addDiscoveryResponseToCaches(logical_addr, data.getLogicalName(), data.getPhysicalAddr());
discoveryRequestReceived(msg.getSrc(), data.getLogicalName(), data.getPhysicalAddr()); discoveryRequestReceived(msg.getSrc(), data.getLogicalName(), data.getPhysicalAddr());
addResponse(data, false); addResponse(data, false);
Expand All @@ -304,9 +296,9 @@ public Object up(Message msg) {
for(Map.Entry<Address,PhysicalAddress> entry: cache.entrySet()) { for(Map.Entry<Address,PhysicalAddress> entry: cache.entrySet()) {
Address addr=entry.getKey(); Address addr=entry.getKey();
// JGRP-1492: only return our own address, and addresses in view. // JGRP-1492: only return our own address, and addresses in view.
if(addr.equals(local_addr) || members.contains(addr)) { if(addr.equals(local_addr) || view.containsMember(addr)) {
PhysicalAddress physical_addr=entry.getValue(); PhysicalAddress physical_addr=entry.getValue();
sendDiscoveryResponse(addr, physical_addr, NameCache.get(addr), msg.getSrc(), isCoord(addr)); sendDiscoveryResponse(addr, physical_addr, NameCache.get(addr), msg.getSrc(), is_coord);

This comment has been minimized.

Copy link
@mkobayas

mkobayas May 30, 2017

Why did you change isCoord(addr) to is_coord ?
If this node is a coordinator, this change will cause all PingData responses to be recognized as Coord and return_entire_cache will not work properly.

This comment has been minimized.

Copy link
@belaban

belaban Jun 6, 2017

Author Owner

You're right, this is a regression. I created https://issues.jboss.org/browse/JGRP-2192 to fix it.

This comment has been minimized.

Copy link
@belaban

belaban Jun 6, 2017

Author Owner

Fixed and committed to master (will be 4.0.4)

} }
} }
} }
Expand All @@ -315,11 +307,11 @@ public Object up(Message msg) {


// Only send a response if hdr.mbrs is not empty and contains myself. Otherwise always send my info // Only send a response if hdr.mbrs is not empty and contains myself. Otherwise always send my info
Collection<? extends Address> mbrs=data != null? data.mbrs() : null; Collection<? extends Address> mbrs=data != null? data.mbrs() : null;
boolean send_response=mbrs == null || mbrs.contains(local_addr); boolean drop_because_of_rank=use_ip_addrs && max_rank_to_reply > 0 && hdr.initialDiscovery() && Util.getRank(view, local_addr) > max_rank_to_reply;
if(send_response) { if(drop_because_of_rank || (mbrs != null && !mbrs.contains(local_addr)))
PhysicalAddress physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr)); return null;
sendDiscoveryResponse(local_addr, physical_addr, NameCache.get(local_addr), msg.getSrc(), is_coord); PhysicalAddress physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
} sendDiscoveryResponse(local_addr, physical_addr, NameCache.get(local_addr), msg.getSrc(), is_coord);
return null; return null;


case PingHeader.GET_MBRS_RSP: case PingHeader.GET_MBRS_RSP:
Expand Down Expand Up @@ -353,29 +345,19 @@ public Object down(Event evt) {
return findMembers(null, true, false); // triggered by JOIN process (ClientGmsImpl) return findMembers(null, true, false); // triggered by JOIN process (ClientGmsImpl)


case Event.FIND_MBRS: case Event.FIND_MBRS:
return findMembers(evt.getArg(), false, false); // triggered by MERGE2/MERGE3 return findMembers(evt.getArg(), false, false); // triggered by MERGE3


// case Event.TMP_VIEW:
case Event.VIEW_CHANGE: case Event.VIEW_CHANGE:
List<Address> tmp;
View old_view=view; View old_view=view;
view=evt.getArg(); view=evt.getArg();
if((tmp=view.getMembers()) != null) { current_coord=view.getCoord();
synchronized(members) {
members.clear();
members.addAll(tmp);
}
}
current_coord=!members.isEmpty()? members.get(0) : null;
is_coord=current_coord != null && local_addr != null && current_coord.equals(local_addr); is_coord=current_coord != null && local_addr != null && current_coord.equals(local_addr);
Object retval=down_prot.down(evt); Object retval=down_prot.down(evt);
if(send_cache_on_join && !isDynamic() && is_coord) { if(send_cache_on_join && !isDynamic() && is_coord) {
List<Address> curr_mbrs, left_mbrs, new_mbrs; List<Address> curr_mbrs, left_mbrs, new_mbrs;
synchronized(members) { curr_mbrs=new ArrayList<>(view.getMembers());
curr_mbrs=new ArrayList<>(members); left_mbrs=View.leftMembers(old_view, view);
left_mbrs=old_view != null? Util.leftMembers(old_view.getMembers(), members) : null; new_mbrs=View.newMembers(old_view, view);
new_mbrs=old_view != null? Util.newMembers(old_view.getMembers(), members) : null;
}
startCacheDissemination(curr_mbrs, left_mbrs, new_mbrs); // separate task startCacheDissemination(curr_mbrs, left_mbrs, new_mbrs); // separate task
} }
return retval; return retval;
Expand Down
23 changes: 14 additions & 9 deletions src/org/jgroups/protocols/PING.java
Expand Up @@ -29,7 +29,7 @@ public boolean isDynamic() {


public void findMembers(List<Address> members, boolean initial_discovery, Responses responses) { public void findMembers(List<Address> members, boolean initial_discovery, Responses responses) {
try { try {
sendDiscoveryRequest(cluster_name, members); sendDiscoveryRequest(cluster_name, members, initial_discovery);
} }
catch(InterruptedIOException | InterruptedException ie) { catch(InterruptedIOException | InterruptedException ie) {
; ;
Expand All @@ -40,22 +40,27 @@ public void findMembers(List<Address> members, boolean initial_discovery, Respon
} }




protected void sendDiscoveryRequest(String cluster_name, List<Address> members_to_find) throws Exception { protected void sendDiscoveryRequest(String cluster_name, List<Address> members_to_find, boolean initial_discovery) throws Exception {
PhysicalAddress physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr)); PingData data=null;


// https://issues.jboss.org/browse/JGRP-1670 if(!use_ip_addrs || !initial_discovery) {
PingData data=new PingData(local_addr, false, NameCache.get(local_addr), physical_addr); PhysicalAddress physical_addr=(PhysicalAddress)down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
PingHeader hdr=new PingHeader(PingHeader.GET_MBRS_REQ).clusterName(cluster_name);


if(members_to_find != null && members_to_find.size() <= max_members_in_discovery_request) // https://issues.jboss.org/browse/JGRP-1670
data.mbrs(members_to_find); data=new PingData(local_addr, false, NameCache.get(local_addr), physical_addr);
if(members_to_find != null && members_to_find.size() <= max_members_in_discovery_request)
data.mbrs(members_to_find);
}


// message needs to have DONT_BUNDLE flag: if A sends message M to B, and we need to fetch B's physical // message needs to have DONT_BUNDLE flag: if A sends message M to B, and we need to fetch B's physical
// address, then the bundler thread blocks until the discovery request has returned. However, we cannot send // address, then the bundler thread blocks until the discovery request has returned. However, we cannot send
// the discovery *request* until the bundler thread has returned from sending M // the discovery *request* until the bundler thread has returned from sending M
Message msg=new Message(null).putHeader(getId(),hdr).setBuffer(marshal(data)) PingHeader hdr=new PingHeader(PingHeader.GET_MBRS_REQ).clusterName(cluster_name).initialDiscovery(initial_discovery);
Message msg=new Message(null).putHeader(getId(),hdr)
.setFlag(Message.Flag.INTERNAL,Message.Flag.DONT_BUNDLE,Message.Flag.OOB) .setFlag(Message.Flag.INTERNAL,Message.Flag.DONT_BUNDLE,Message.Flag.OOB)
.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK); .setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
if(data != null)
msg.setBuffer(marshal(data));
sendMcastDiscoveryRequest(msg); sendMcastDiscoveryRequest(msg);
} }


Expand Down
40 changes: 20 additions & 20 deletions src/org/jgroups/protocols/PingHeader.java
Expand Up @@ -11,41 +11,39 @@




/** /**
* Used to send discovery requests and responses
* @author Bela Ban * @author Bela Ban
*/ */
public class PingHeader extends Header { public class PingHeader extends Header {
public static final byte GET_MBRS_REQ=1; public static final byte GET_MBRS_REQ=1;
public static final byte GET_MBRS_RSP=2; public static final byte GET_MBRS_RSP=2;


protected byte type=0; protected byte type;
protected String cluster_name; protected String cluster_name;
protected boolean initial_discovery;




public PingHeader() { public PingHeader() {
} }


public PingHeader(byte type) {this.type=type;} public PingHeader(byte type) {this.type=type;}
public byte type() {return type;} public byte type() {return type;}
public PingHeader clusterName(String name) {this.cluster_name=name; return this;} public PingHeader clusterName(String name) {this.cluster_name=name; return this;}

public boolean initialDiscovery() {return initial_discovery;}
public short getMagicId() {return 53;} public PingHeader initialDiscovery(boolean b) {this.initial_discovery=b; return this;}
public short getMagicId() {return 53;}


public Supplier<? extends Header> create() {return PingHeader::new;} public Supplier<? extends Header> create() {return PingHeader::new;}


public int size() { public int size() {
int retval=Global.BYTE_SIZE *2; // type and cluster_name presence int retval=Global.BYTE_SIZE *3; // type, cluster_name presence and initial_discovery
if(cluster_name != null) if(cluster_name != null)
retval += cluster_name.length() +2; retval += cluster_name.length() +2;
return retval; return retval;
} }


public String toString() { public String toString() {
StringBuilder sb=new StringBuilder(); return String.format("[%s cluster=%s initial_discovery=%b]", type2Str(type), cluster_name, initial_discovery);
sb.append("[type=" + type2Str(type));
if(cluster_name != null)
sb.append(", cluster=").append(cluster_name);
sb.append(']');
return sb.toString();
} }


static String type2Str(byte t) { static String type2Str(byte t) {
Expand All @@ -57,13 +55,15 @@ static String type2Str(byte t) {
} }




public void writeTo(DataOutput outstream) throws Exception { public void writeTo(DataOutput out) throws Exception {
outstream.writeByte(type); out.writeByte(type);
Bits.writeString(cluster_name,outstream); Bits.writeString(cluster_name,out);
out.writeBoolean(initial_discovery);
} }


public void readFrom(DataInput instream) throws Exception { public void readFrom(DataInput in) throws Exception {
type=instream.readByte(); type=in.readByte();
cluster_name=Bits.readString(instream); cluster_name=Bits.readString(in);
initial_discovery=in.readBoolean();
} }
} }

0 comments on commit 3274627

Please sign in to comment.