Skip to content

Commit

Permalink
LOCAL_PING, new discovery protocol for members in the same process (h…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Aug 15, 2019
1 parent d9c34d6 commit ee17c84
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 58 deletions.
1 change: 1 addition & 0 deletions conf/jg-protocol-ids.xml
Expand Up @@ -69,6 +69,7 @@
<class id="65" name="org.jgroups.protocols.DH_KEY_EXCHANGE"/>
<class id="66" name="org.jgroups.protocols.MULTI_PING"/>
<class id="67" name="org.jgroups.protocols.CENTRAL_LOCK2"/>
<class id="68" name="org.jgroups.protocols.LOCAL_PING"/>

<!-- IDs reserved for building blocks -->
<class id="200" name="org.jgroups.blocks.RequestCorrelator"/> <!-- ID should be the same as Global.BLOCKS_START_ID -->
Expand Down
11 changes: 6 additions & 5 deletions src/org/jgroups/protocols/Discovery.java
Expand Up @@ -100,6 +100,7 @@ public abstract class Discovery extends Protocol {
protected volatile Address local_addr;
protected volatile Address current_coord;
protected String cluster_name;
protected TP transport;
protected final Map<Long,Responses> ping_responses=new HashMap<>();
protected final Set<Future<?>> discovery_req_futures=new ConcurrentSkipListSet<>();
@ManagedAttribute(description="Whether the transport supports multicasting")
Expand All @@ -113,19 +114,19 @@ public abstract class Discovery extends Protocol {


public void init() throws Exception {
TP tp=getTransport();
transport=getTransport();
if(stagger_timeout < 0)
throw new IllegalArgumentException("stagger_timeout cannot be negative");
if(num_discovery_runs < 1)
throw new IllegalArgumentException("num_discovery_runs must be >= 1");
transport_supports_multicasting=tp.supportsMulticasting();
sends_can_block=getTransport() instanceof TCP; // UDP and TCP_NIO2 won't block
use_ip_addrs=tp.getUseIpAddresses();
transport_supports_multicasting=transport.supportsMulticasting();
sends_can_block=transport instanceof TCP; // UDP and TCP_NIO2 won't block
use_ip_addrs=transport.getUseIpAddresses();
}

public void start() throws Exception {
super.start();
timer=getTransport().getTimer();
timer=transport.getTimer();
if(timer == null)
throw new Exception("timer cannot be retrieved from protocol stack");
}
Expand Down
107 changes: 107 additions & 0 deletions src/org/jgroups/protocols/LOCAL_PING.java
@@ -0,0 +1,107 @@
package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.PhysicalAddress;
import org.jgroups.util.NameCache;
import org.jgroups.util.Responses;
import org.jgroups.util.Tuple;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;


/**
* Discovery protocol for finding members in the local process only. Doesn't send discovery requests and responses, but
* fetches discovery information directly from a hashmap. Used mainly by unit test.
* @author Bela Ban
* @since 4.1.3
*/
public class LOCAL_PING extends Discovery {
/** Map of cluster names and address-protocol mappings. Used for routing messages to all or single members */
protected static final Map<String,List<PingData>> discovery=new ConcurrentHashMap<>();
protected static final Function<String,List<PingData>> func=k -> new ArrayList<>();

public boolean isDynamic() {
return true;
}

public void stop() {
super.stop();
}

@Override
public void findMembers(List<Address> members, boolean initial_discovery, Responses responses) {
num_discovery_requests++;
List<PingData> list=discovery.get(cluster_name);
if(list != null && !list.isEmpty()) {
synchronized(this) {
if(list.stream().noneMatch(PingData::isCoord))
list.get(0).coord(true);
list.stream().filter(el -> !el.sender.equals(local_addr))
.forEach(d -> {
addAddressToLocalCache(d.sender, d.physical_addr);
responses.addResponse(d, false);
});
}
}
responses.done(); // so waitFor() doesn't block at all
}

public Object down(Event evt) {
if(evt.type() == Event.VIEW_CHANGE && cluster_name != null) {
// synchronize TP.logical_addr_cache with discovery cache
List<PingData> data=discovery.get(cluster_name);
if(data != null && !data.isEmpty()) {
for(PingData d: data) {
Address sender=d.getAddress();
if(down_prot.down(new Event(Event.GET_PHYSICAL_ADDRESS, sender)) == null)
down_prot.down(new Event(Event.ADD_PHYSICAL_ADDRESS, new Tuple<>(sender, d.getPhysicalAddr())));
}
}
}
return super.down(evt);
}

public void handleConnect() {
if(cluster_name == null || local_addr == null)
throw new IllegalStateException("cluster name and local address cannot be null");
String logical_name=NameCache.get(local_addr);
PhysicalAddress physical_addr=(PhysicalAddress)down_prot.down(new Event(Event.GET_PHYSICAL_ADDRESS, local_addr));
if(physical_addr != null) {
PingData data=new PingData(local_addr, is_server, logical_name, physical_addr);
final List<PingData> list=discovery.computeIfAbsent(cluster_name, func);
synchronized(this) {
if(list.isEmpty())
data.coord(true); // first element is coordinator
list.add(data);
}
}
}

public void handleDisconnect() {
if(local_addr == null || cluster_name == null)
return;
List<PingData> list=discovery.get(cluster_name);
if(list != null) {
synchronized(this) {
list.removeIf(p -> local_addr.equals(p.getAddress()));
if(list.isEmpty())
discovery.remove(cluster_name);
}
}
}

public String toString() {
return String.format("%s(%s)", LOCAL_PING.class.getSimpleName(), local_addr);
}

protected void addAddressToLocalCache(Address addr, PhysicalAddress phys_addr) {
Tuple<Address,PhysicalAddress> tuple=new Tuple(addr, phys_addr);
down_prot.down(new Event(Event.ADD_PHYSICAL_ADDRESS, tuple));
}

}
8 changes: 3 additions & 5 deletions tests/junit-functional/org/jgroups/tests/MergeTest2.java
Expand Up @@ -50,8 +50,7 @@ void setUp() throws Exception {


protected JChannel createChannel(String name) throws Exception {
SHARED_LOOPBACK shared_loopback=new SHARED_LOOPBACK();
shared_loopback.setDiagnosticsHandler(handler);
SHARED_LOOPBACK shared_loopback=new SHARED_LOOPBACK().setDiagnosticsHandler(handler);

JChannel retval=new JChannel(shared_loopback,
new DISCARD().setValue("discard_all",true),
Expand Down Expand Up @@ -90,8 +89,7 @@ void tearDown() throws Exception {

public void testMergeWithMissingMergeResponse() {
JChannel merge_leader=findMergeLeader(a,b,c,d);
List<Address> non_faulty_members=new ArrayList<>();
non_faulty_members.addAll(Arrays.asList(a.getAddress(),b.getAddress(),c.getAddress(),d.getAddress()));
List<Address> non_faulty_members=new ArrayList<>(Arrays.asList(a.getAddress(), b.getAddress(), c.getAddress(), d.getAddress()));
List<Address> tmp=new ArrayList<>(non_faulty_members);
tmp.remove(merge_leader.getAddress());
Address faulty_member=Util.pickRandomElement(tmp);
Expand Down Expand Up @@ -143,7 +141,7 @@ public void testMergeWithMissingMergeResponse() {
}
}

protected JChannel findMergeLeader(JChannel ... channels) {
protected static JChannel findMergeLeader(JChannel... channels) {
Set<Address> tmp=new TreeSet<>();
for(JChannel ch: channels)
tmp.add(ch.getAddress());
Expand Down
28 changes: 14 additions & 14 deletions tests/junit-functional/org/jgroups/tests/MergeTest3.java
Expand Up @@ -82,9 +82,9 @@ public void testMergeWithMissingMergeResponse() {
setMergeIdIn(busy_second, busy_merge_id);
for(JChannel ch: new JChannel[]{a,b,c,d,e,f}) { // excluding faulty member, as it still discards messages
assert ch.getView().size() == 3;
GMS gms=(GMS)ch.getProtocolStack().findProtocol(GMS.class);
GMS gms=ch.getProtocolStack().findProtocol(GMS.class);
gms.setJoinTimeout(3000);
DISCARD discard=(DISCARD)ch.getProtocolStack().findProtocol(DISCARD.class);
DISCARD discard=ch.getProtocolStack().findProtocol(DISCARD.class);
discard.setDiscardAll(false);
}

Expand All @@ -93,7 +93,7 @@ public void testMergeWithMissingMergeResponse() {
merge_views.put(first_coord, findChannel(first_coord).getView());
merge_views.put(second_coord, findChannel(second_coord).getView());

GMS gms=(GMS)merge_leader.getProtocolStack().findProtocol(GMS.class);
GMS gms=merge_leader.getProtocolStack().findProtocol(GMS.class);
gms.up(new Event(Event.MERGE, merge_views));

for(int i=0; i < 20; i++) {
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testMergeWithMissingMergeResponse() {

System.out.println("merge event is " + merge_views);

gms=(GMS)merge_leader.getProtocolStack().findProtocol(GMS.class);
gms=merge_leader.getProtocolStack().findProtocol(GMS.class);
gms.up(new Event(Event.MERGE, merge_views));

for(int i=0; i < 20; i++) {
Expand All @@ -154,12 +154,12 @@ public void testMergeWithMissingMergeResponse() {
Util.sleep(1000);
}
for(JChannel ch: new JChannel[]{a,b,c,d,e,f}) {
if(ch.getAddress().equals(busy_first) || ch.getAddress().equals(busy_second))
assert ch.getView().size() == 6 : ch.getAddress() + "'s view: " + ch.getView();
assert !ch.getAddress().equals(busy_first) && !ch.getAddress().equals(busy_second)
|| ch.getView().size() == 6 : ch.getAddress() + "'s view: " + ch.getView();
}
}

protected JChannel createChannel(String name) throws Exception {
protected static JChannel createChannel(String name) throws Exception {
JChannel retval=new JChannel(new SHARED_LOOPBACK(),
new DISCARD().setValue("discard_all",true),
new SHARED_LOOPBACK_PING(),
Expand All @@ -181,12 +181,12 @@ protected JChannel createChannel(String name) throws Exception {
}

protected void setMergeIdIn(Address mbr, MergeId busy_merge_id) {
GMS gms=(GMS)findChannel(mbr).getProtocolStack().findProtocol(GMS.class);
GMS gms=findChannel(mbr).getProtocolStack().findProtocol(GMS.class);
gms.getMerger().setMergeId(null, busy_merge_id);
}

protected void cancelMerge(Address mbr) {
GMS gms=(GMS)findChannel(mbr).getProtocolStack().findProtocol(GMS.class);
GMS gms=findChannel(mbr).getProtocolStack().findProtocol(GMS.class);
gms.cancelMerge();
}

Expand All @@ -210,31 +210,31 @@ protected void createPartition(JChannel ... channels) {
View view=new View(coord, view_id, members);
MutableDigest digest=new MutableDigest(view.getMembersRaw());
for(JChannel ch: channels) {
NAKACK2 nakack=(NAKACK2)ch.getProtocolStack().findProtocol(NAKACK2.class);
NAKACK2 nakack=ch.getProtocolStack().findProtocol(NAKACK2.class);
digest.merge(nakack.getDigest(ch.getAddress()));
}
for(JChannel ch: channels) {
GMS gms=(GMS)ch.getProtocolStack().findProtocol(GMS.class);
GMS gms=ch.getProtocolStack().findProtocol(GMS.class);
gms.installView(view, digest);
}
}

protected List<Address> getMembers(JChannel ... channels) {
protected static List<Address> getMembers(JChannel... channels) {
List<Address> members=new ArrayList<>(channels.length);
for(JChannel ch: channels)
members.add(ch.getAddress());
return members;
}

protected Address determineCoordinator(JChannel ... channels) {
protected static Address determineCoordinator(JChannel... channels) {
List<Address> list=new ArrayList<>(channels.length);
for(JChannel ch: channels)
list.add(ch.getAddress());
Collections.sort(list);
return list.get(0);
}

protected JChannel findMergeLeader(JChannel ... channels) {
protected static JChannel findMergeLeader(JChannel... channels) {
Set<Address> tmp=new TreeSet<>();
for(JChannel ch: channels)
tmp.add(ch.getAddress());
Expand Down

0 comments on commit ee17c84

Please sign in to comment.