Skip to content

Commit

Permalink
- Multiple ViewIds from the same member are collapsed, and only the l…
Browse files Browse the repository at this point in the history
…atest ViewId is used. This prevents long subviews in a MergeView (https://issues.jboss.org/browse/JGRP-1876)
  • Loading branch information
belaban committed Feb 20, 2015
1 parent 20cb3e8 commit 62ed194
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 56 deletions.
60 changes: 39 additions & 21 deletions src/org/jgroups/protocols/MERGE3.java
Expand Up @@ -13,8 +13,6 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -66,8 +64,8 @@ public class MERGE3 extends Protocol {

protected Future<?> view_consistency_checker;

// hashmap to keep track of view-id sent in INFO messages
protected final ConcurrentMap<ViewId,Set<Address>> views=new ConcurrentHashMap<>(view != null? view.size() : 16);
// hashmap to keep track of view-id sent in INFO messages. Keys=senders, values = ViewId sent
protected final Map<Address,ViewId> views=new HashMap<>();

protected final ResponseCollector<View> view_rsps=new ResponseCollector<>();

Expand Down Expand Up @@ -99,14 +97,14 @@ public synchronized boolean isInfoSenderRunning() {
@ManagedOperation(description="Lists the contents of the cached views")
public String dumpViews() {
StringBuilder sb=new StringBuilder();
for(Map.Entry<ViewId,Set<Address>> entry: views.entrySet())
for(Map.Entry<ViewId,Set<Address>> entry: convertViews().entrySet())
sb.append(entry.getKey()).append(": [")
.append(Util.printListWithDelimiter(entry.getValue(), ", ", Util.MAX_LIST_PRINT_SIZE)).append("]\n");
return sb.toString();
}

@ManagedOperation(description="Clears the views cache")
public void clearViews() {views.clear();}
public void clearViews() {synchronized(views) {views.clear();}}


@ManagedOperation(description="Send INFO")
Expand Down Expand Up @@ -327,20 +325,39 @@ protected void addInfo(Address sender, ViewId view_id, String logical_name, Phys
UUID.add(sender, logical_name);
if(physical_addr != null)
down(new Event(Event.SET_PHYSICAL_ADDRESS, new Tuple<>(sender, physical_addr)));
Set<Address> existing=views.get(view_id);
if(existing == null) {
existing=new ConcurrentSkipListSet<>();
Set<Address> tmp=views.putIfAbsent(view_id, existing);
if(tmp != null)
existing=tmp;
synchronized(views) {
ViewId existing=views.get(sender);
if(existing == null || existing.compareTo(view_id) < 0)
views.put(sender, view_id);
}
if(sender != null)
existing.add(sender);
}

// remove sender from all other sets (old info)
for(Set<Address> set: views.values())
if(set != existing)
set.remove(sender);
protected Map<ViewId,Set<Address>> convertViews() {
Map<ViewId,Set<Address>> retval=new HashMap<>();
synchronized(views) {
for(Map.Entry<Address,ViewId> entry : views.entrySet()) {
Address key=entry.getKey();
ViewId view_id=entry.getValue();
Set<Address> existing=retval.get(view_id);
if(existing == null)
retval.put(view_id, existing=new ConcurrentSkipListSet<>());
existing.add(key);
}
}
return retval;
}

protected boolean differentViewIds() {
ViewId first=null;
synchronized(views) {
for(ViewId view_id : views.values()) {
if(first == null)
first=view_id;
else if(!first.equals(view_id))
return true;
}
}
return false;
}

protected class InfoSender implements TimeScheduler.Task {
Expand Down Expand Up @@ -395,7 +412,7 @@ public void run() {
try {
MergeHeader hdr=createInfo();
addInfo(local_addr, hdr.view_id, hdr.logical_name, hdr.physical_addr);
if(views.size() <= 1) {
if(!differentViewIds()) {
log.trace("%s: found no inconsistent views: %s", local_addr, dumpViews());
return;
}
Expand All @@ -412,7 +429,8 @@ protected void _run() {
// Only add view creators which *are* actually in the set as well, e.g.
// A|4: {A,B,C} and
// B|4: {D} would only add A to the coords list. A is a real coordinator
for(Map.Entry<ViewId,Set<Address>> entry: views.entrySet()) {
Map<ViewId,Set<Address>> converted_views=convertViews();
for(Map.Entry<ViewId,Set<Address>> entry: converted_views.entrySet()) {
Address coord=entry.getKey().getCreator();
Set<Address> members=entry.getValue();
if(members != null && members.contains(coord))
Expand All @@ -428,7 +446,7 @@ protected void _run() {
log.debug("I (%s) will be the merge leader", local_addr);

// add merge participants
for(Set<Address> set: views.values()) {
for(Set<Address> set: converted_views.values()) {
if(!set.isEmpty())
coords.add(set.iterator().next());
}
Expand Down
1 change: 1 addition & 0 deletions src/org/jgroups/protocols/pbcast/GMS.java
Expand Up @@ -177,6 +177,7 @@ public GMS() {
}

public ViewId getViewId() {return view != null? view.getViewId() : null;}
public View view() {return view;}

/** Returns the current view and digest. Try to find a matching digest twice (if not found on the first try) */
public Tuple<View,Digest> getViewAndDigest() {
Expand Down
148 changes: 113 additions & 35 deletions tests/junit-functional/org/jgroups/tests/MergeTest4.java
@@ -1,12 +1,12 @@
package org.jgroups.tests;

import org.jgroups.*;
import org.jgroups.protocols.SHARED_LOOPBACK;
import org.jgroups.protocols.SHARED_LOOPBACK_PING;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.MutableDigest;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -76,7 +76,7 @@ public void testMergeWithAsymetricViewsCoordIsolated() {
gms=(GMS)merge_leader.getProtocolStack().findProtocol(GMS.class);
gms.up(new Event(Event.MERGE, merge_views));

Util.waitUntilAllChannelsHaveSameSize(10000000, 1000, a, b, c, d);
Util.waitUntilAllChannelsHaveSameSize(10000, 1000, a, b, c, d);

System.out.println("Views are:");
for(JChannel ch: Arrays.asList(a,b,c,d)) {
Expand All @@ -94,29 +94,92 @@ public void testMergeWithAsymetricViewsCoordIsolated() {
}

/**
* Tests a merge between ViewIds of the same coord, e.g. A|3, A|4, A|5
* Tests a merge between ViewIds of the same coord, e.g. A|6, A|7, A|8, A|9
*/
public void testViewsBySameCoord() {
Map<Address,View> merge_views=new HashMap<>(4);
View v1=View.create(a.getAddress(), 3, a.getAddress(),b.getAddress(),c.getAddress(),d.getAddress()); // {A,B,C,D}
View v2=View.create(a.getAddress(), 4, a.getAddress(),b.getAddress(),c.getAddress()); // {A,B,C}
View v3=View.create(a.getAddress(), 5, a.getAddress(),b.getAddress()); // {A,B}
View v4=View.create(a.getAddress(), 6, a.getAddress()); // {A}
merge_views.put(a.getAddress(), v1);
merge_views.put(b.getAddress(), v2);
merge_views.put(c.getAddress(), v3);
merge_views.put(d.getAddress(), v4);

Util.close(b,c,d);

GMS gms=(GMS)a.getProtocolStack().findProtocol(GMS.class);
gms.up(new Event(Event.MERGE, merge_views));
View v1=View.create(a.getAddress(), 6, a.getAddress(),b.getAddress(),c.getAddress(),d.getAddress()); // {A,B,C,D}
View v2=View.create(a.getAddress(), 7, a.getAddress(),b.getAddress(),c.getAddress()); // {A,B,C}
View v3=View.create(a.getAddress(), 8, a.getAddress(),b.getAddress()); // {A,B}
View v4=View.create(a.getAddress(), 9, a.getAddress()); // {A}

Util.close(b,c,d); // not interested in those...

MERGE3 merge=(MERGE3)a.getProtocolStack().findProtocol(MERGE3.class);
for(View v: Arrays.asList(v1,v2,v4,v3)) {
MERGE3.MergeHeader hdr=MERGE3.MergeHeader.createInfo(v.getViewId(), null, null);
Message msg=new Message(null, a.getAddress(), null).putHeader(merge.getId(), hdr);
merge.up(new Event(Event.MSG, msg));
}

merge.checkInconsistencies(); // no merge will happen

Util.waitUntilAllChannelsHaveSameSize(10000, 500, a);
System.out.println("A's view: " + a.getView());
assert a.getView().size() == 1;
assert a.getView().containsMember(a.getAddress());
}


/**
* Tests A|6, A|7, A|8 and B|7, B|8, B|9 -> we should have a subviews in MergeView consisting of
* only 2 elements: A|5 and B|5
*/
public void testMultipleViewsBySameMembers() throws Exception {
View a1=View.create(a.getAddress(), 6, a.getAddress(),b.getAddress(),c.getAddress(),d.getAddress()); // {A,B,C,D}
View a2=View.create(a.getAddress(), 7, a.getAddress(),b.getAddress(),c.getAddress()); // {A,B,C}
View a3=View.create(a.getAddress(), 8, a.getAddress(),b.getAddress()); // {A,B}
View a4=View.create(a.getAddress(), 9, a.getAddress()); // {A}

View b1=View.create(b.getAddress(), 7, b.getAddress(), c.getAddress(), d.getAddress());
View b2=View.create(b.getAddress(), 8, b.getAddress(), c.getAddress());
View b3=View.create(b.getAddress(), 9, b.getAddress());

Util.close(c,d); // not interested in those...

// A and B cannot communicate:
discard(true, a,b);

// inject view A|6={A} into A and B|5={B} into B
injectView(a4, a);
injectView(b3, b);

assert a.getView().equals(a4);
assert b.getView().equals(b3);

List<Event> merge_events=new ArrayList<>();
short merge_id=ClassConfigurator.getProtocolId(MERGE3.class);

for(View v: Arrays.asList(a3,a4,a2,a1)) {
MERGE3.MergeHeader hdr=MERGE3.MergeHeader.createInfo(v.getViewId(), null, null);
Message msg=new Message(null, a.getAddress(), null).putHeader(merge_id, hdr);
merge_events.add(new Event(Event.MSG, msg));
}
for(View v: Arrays.asList(b2,b3,b1)) {
MERGE3.MergeHeader hdr=MERGE3.MergeHeader.createInfo(v.getViewId(), null, null);
Message msg=new Message(null, b.getAddress(), null).putHeader(merge_id, hdr);
merge_events.add(new Event(Event.MSG, msg));
}

// A and B can communicate again
discard(false, a,b);

injectMergeEvents(merge_events, a,b);
checkInconsistencies(a,b); // merge will happen between A and B
Util.waitUntilAllChannelsHaveSameSize(10000, 500, a,b);
System.out.println("A's view: " + a.getView() + "\nB's view: " + b.getView());
assert a.getView().size() == 2;
assert a.getView().containsMember(a.getAddress());
assert a.getView().containsMember(b.getAddress());
assert a.getView().equals(b.getView());
for(View merge_view: Arrays.asList(getViewFromGMS(a), getViewFromGMS(b))) {
System.out.println(merge_view);
assert merge_view instanceof MergeView;
List<View> subviews=((MergeView)merge_view).getSubgroups();
assert subviews.size() == 2;
}
}


protected static boolean contains(View view, Address ... members) {
List<Address> mbrs=view.getMembers();
for(Address member: members)
Expand All @@ -139,6 +202,7 @@ protected static class MyReceiver extends ReceiverAdapter {
protected JChannel createChannel(String name) throws Exception {
JChannel retval=new JChannel(new SHARED_LOOPBACK(),
new SHARED_LOOPBACK_PING(),
new MERGE3().setValue("min_interval", 300000).setValue("max_interval", 600000),
new NAKACK2().setValue("use_mcast_xmit",false)
.setValue("log_discard_msgs",false).setValue("log_not_found_msgs",false),
new UNICAST3(),
Expand All @@ -163,28 +227,42 @@ protected JChannel findChannel(Address mbr) {
return null;
}

protected void createPartition(JChannel ... channels) {
long view_id=1; // find the highest view-id +1
for(JChannel ch: channels)
view_id=Math.max(ch.getView().getViewId().getId(), view_id);
view_id++;

List<Address> members=getMembers(channels);
Collections.sort(members);
Address coord=members.get(0);
View view=new View(coord, view_id, members);
MutableDigest digest=new MutableDigest(view.getMembersRaw());

protected void injectMergeEvents(List<Event> events, JChannel ... channels) {
for(JChannel ch: channels) {
NAKACK2 nakack=(NAKACK2)ch.getProtocolStack().findProtocol(NAKACK2.class);
digest.merge(nakack.getDigest(ch.getAddress()));
MERGE3 merge=(MERGE3)ch.getProtocolStack().findProtocol(MERGE3.class);
for(Event evt: events)
merge.up(evt);
}
}

protected void discard(boolean flag, JChannel ... channels) throws Exception {
for(JChannel ch: channels) {
GMS gms=(GMS)ch.getProtocolStack().findProtocol(GMS.class);
System.out.println("Injecting view " + view + " into " + ch.getAddress());
gms.installView(view, digest);
ProtocolStack stack=ch.getProtocolStack();
DISCARD discard=(DISCARD)stack.findProtocol(DISCARD.class);
if(discard == null)
stack.insertProtocol(discard=new DISCARD(), ProtocolStack.ABOVE, stack.getTransport().getClass());
discard.setDiscardAll(flag);
}
}

protected void injectView(View view, JChannel ch) {
GMS gms=(GMS)ch.getProtocolStack().findProtocol(GMS.class);
gms.installView(view);
}

protected void checkInconsistencies(JChannel ... channels) {
for(JChannel ch: channels) {
MERGE3 merge=(MERGE3)ch.getProtocolStack().findProtocol(MERGE3.class);
merge.checkInconsistencies();
}
}

protected View getViewFromGMS(JChannel ch) {
GMS gms=(GMS)ch.getProtocolStack().findProtocol(GMS.class);
return gms.view();
}

protected List<Address> getMembers(JChannel ... channels) {
List<Address> members=new ArrayList<>(channels.length);
for(JChannel ch: channels)
Expand Down

0 comments on commit 62ed194

Please sign in to comment.