Skip to content

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also .

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also .
...
  • 5 commits
  • 5 files changed
  • 0 commit comments
  • 1 contributor
View
23 src/org/jgroups/protocols/SHARED_LOOPBACK.java
@@ -6,6 +6,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
/**
@@ -17,12 +18,9 @@
private PhysicalAddress physical_addr=null;
/** Map of cluster names and address-protocol mappings. Used for routing messages to all or single members */
- private static final Map<String,Map<Address,SHARED_LOOPBACK>> routing_table=new ConcurrentHashMap<String,Map<Address,SHARED_LOOPBACK>>();
+ private static final ConcurrentMap<String,Map<Address,SHARED_LOOPBACK>> routing_table=new ConcurrentHashMap<String,Map<Address,SHARED_LOOPBACK>>();
- public SHARED_LOOPBACK() {
- }
-
public boolean supportsMulticasting() {
return false;
}
@@ -108,25 +106,28 @@ public Object down(Event evt) {
case Event.SET_LOCAL_ADDRESS:
local_addr=(Address)evt.getArg();
break;
-
- case Event.DISCONNECT:
- unregister(channel_name, local_addr);
- break;
}
return retval;
}
- private static void register(String channel_name, Address local_addr, SHARED_LOOPBACK shared_loopback) {
+ public void stop() {
+ super.stop();
+ // unregister(channel_name, local_addr);
+ }
+
+ protected static void register(String channel_name, Address local_addr, SHARED_LOOPBACK shared_loopback) {
Map<Address,SHARED_LOOPBACK> map=routing_table.get(channel_name);
if(map == null) {
map=new ConcurrentHashMap<Address,SHARED_LOOPBACK>();
- routing_table.put(channel_name, map);
+ Map<Address,SHARED_LOOPBACK> tmp=routing_table.putIfAbsent(channel_name,map);
+ if(tmp != null)
+ map=tmp;
}
map.put(local_addr, shared_loopback);
}
- private static void unregister(String channel_name, Address local_addr) {
+ protected static void unregister(String channel_name, Address local_addr) {
Map<Address,SHARED_LOOPBACK> map=routing_table.get(channel_name);
if(map != null) {
map.remove(local_addr);
View
9 src/org/jgroups/protocols/TP.java
@@ -589,6 +589,15 @@ public void setTimerThreadFactory(ThreadFactory factory) {
public TimeScheduler getTimer() {return timer;}
+ /**
+ * Sets a new timer. This should be done before the transport is initialized; be very careful, as replacing a
+ * running timer with tasks in it can wreak havoc !
+ * @param timer
+ */
+ public void setTimer(TimeScheduler timer) {
+ this.timer=timer;
+ }
+
public ThreadFactory getThreadFactory() {
return global_thread_factory;
}
View
7 src/org/jgroups/protocols/pbcast/GMS.java
@@ -93,6 +93,9 @@
@Property(description="Logs failures for collecting all view acks if true")
boolean log_collect_msgs=true;
+ @Property(description="Logs warnings for reception of views less than the current, and for views which don't include self")
+ boolean log_view_warnings=true;
+
/* --------------------------------------------- JMX ---------------------------------------------- */
@@ -572,7 +575,7 @@ public void installView(View new_view, Digest digest) {
if(view_id != null) {
rc=vid.compareTo(view_id);
if(rc <= 0) {
- if(log.isWarnEnabled() && rc < 0) // only scream if view is smaller, silently discard same views
+ if(log.isWarnEnabled() && rc < 0 && log_view_warnings) // only scream if view is smaller, silently discard same views
log.warn(local_addr + ": received view < current view;" +
" discarding it (current vid: " + view_id + ", new vid: " + vid + ')');
return;
@@ -600,7 +603,7 @@ public void installView(View new_view, Digest digest) {
/* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
This ensures that messages sent in view V1 are only received by members of V1 */
if(checkSelfInclusion(mbrs) == false) {
- if(log.isWarnEnabled()) log.warn(local_addr + ": not member of view " + new_view + "; discarding it");
+ if(log.isWarnEnabled() && log_view_warnings) log.warn(local_addr + ": not member of view " + new_view + "; discarding it");
return;
}
View
13 src/org/jgroups/util/TimeScheduler2.java
@@ -3,7 +3,6 @@
import org.jgroups.Global;
-import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.GuardedBy;
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
@@ -392,7 +391,7 @@ protected void stopRunner() {
private boolean completed=false; // set to true when the task has been executed
- public Entry(Runnable task) {
+ private Entry(Runnable task) {
last=this.task=new MyTask(task);
}
@@ -490,7 +489,7 @@ public String dump() {
protected volatile boolean done=false;
protected MyTask next;
- public MyTask(Runnable task) {
+ protected MyTask(Runnable task) {
this.task=task;
}
@@ -548,7 +547,7 @@ public String toString() {
protected volatile boolean cancelled=false;
- public RecurringTask(Runnable task) {
+ private RecurringTask(Runnable task) {
this.task=task;
}
@@ -633,7 +632,7 @@ public String toString() {
private class FixedIntervalTask<V> extends RecurringTask<V> {
final long interval;
- public FixedIntervalTask(Runnable task, long interval) {
+ private FixedIntervalTask(Runnable task, long interval) {
super(task);
this.interval=interval;
}
@@ -648,7 +647,7 @@ protected long nextInterval() {
final long first_execution;
int num_executions=0;
- public FixedRateTask(Runnable task, long interval) {
+ private FixedRateTask(Runnable task, long interval) {
super(task);
this.interval=interval;
this.first_execution=System.currentTimeMillis();
@@ -665,7 +664,7 @@ protected long nextInterval() {
private class DynamicIntervalTask<V> extends RecurringTask<V> {
- public DynamicIntervalTask(Task task) {
+ private DynamicIntervalTask(Task task) {
super(task);
}
View
124 tests/junit/org/jgroups/tests/LargeMergeTest.java
@@ -0,0 +1,124 @@
+package org.jgroups.tests;
+
+import org.jgroups.Global;
+import org.jgroups.JChannel;
+import org.jgroups.protocols.*;
+import org.jgroups.protocols.pbcast.GMS;
+import org.jgroups.protocols.pbcast.NAKACK;
+import org.jgroups.protocols.pbcast.STABLE;
+import org.jgroups.util.DefaultThreadFactory;
+import org.jgroups.util.TimeScheduler;
+import org.jgroups.util.TimeScheduler2;
+import org.jgroups.util.Util;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Tests merging with a large number of members
+ * @author Bela Ban
+ */
+@Test(groups=Global.FUNCTIONAL,sequential=true)
+public class LargeMergeTest {
+ static final int NUM=40; // number of members
+
+ protected final JChannel[] channels=new JChannel[NUM];
+
+
+ @BeforeMethod
+ void setUp() throws Exception {
+
+ ThreadGroup test_group=new ThreadGroup("LargeMergeTest");
+ TimeScheduler timer=new TimeScheduler2(new DefaultThreadFactory(test_group, "merge-", true, true),
+ 5,10,
+ 3000, 1000);
+ System.out.print("Connecting channels: ");
+ for(int i=0; i < NUM; i++) {
+ SHARED_LOOPBACK shared_loopback=(SHARED_LOOPBACK)new SHARED_LOOPBACK().setValue("enable_bundling", false);
+ shared_loopback.setValue("enable_diagnostics",false);
+ shared_loopback.setValue("timer_min_threads",1).setValue("timer_max_threads", 2);
+ shared_loopback.setTimer(timer);
+
+ channels[i]=Util.createChannel(shared_loopback,
+ new DISCARD().setValue("discard_all", true),
+ new PING().setValue("timeout", 100),
+ new MERGE2().setValue("min_interval", 2000).setValue("max_interval", 10000),
+ // new FD_ALL(),
+ new NAKACK().setValue("use_mcast_xmit", false)
+ .setValue("log_discard_msgs", false).setValue("log_not_found_msgs", false),
+ new UNICAST(),
+ new STABLE().setValue("max_bytes", 50000),
+ new GMS().setValue("print_local_addr", false)
+ .setValue("leave_timeout", 100)
+ .setValue("log_view_warnings", false));
+ channels[i].setName(String.valueOf((i + 1)));
+ channels[i].connect("LargeMergeTest");
+ System.out.print(i + 1 + " ");
+ }
+ System.out.println("");
+ }
+
+ @AfterMethod
+ void tearDown() throws Exception {
+ for(int i=NUM-1; i >= 0; i--)
+ Util.close(channels[i]);
+ }
+
+
+
+ public void testClusterFormationAfterMerge() {
+ System.out.println("\nEnabling message traffic between members to start the merge");
+ for(JChannel ch: channels) {
+ DISCARD discard=(DISCARD)ch.getProtocolStack().findProtocol(DISCARD.class);
+ discard.setDiscardAll(false);
+ }
+
+ boolean merge_completed=true;
+ for(int i=0; i < NUM; i++) {
+ merge_completed=true;
+ System.out.println();
+
+ Map<Integer,Integer> votes=new HashMap<Integer,Integer>();
+
+ for(JChannel ch: channels) {
+ int size=ch.getView().size();
+
+ Integer val=votes.get(size);
+ if(val == null)
+ votes.put(size, 1);
+ else
+ votes.put(size, val.intValue() +1);
+ if(size != NUM)
+ merge_completed=false;
+ }
+
+ if(i > 0) {
+ for(Map.Entry<Integer,Integer> entry: votes.entrySet()) {
+ System.out.println("==> " + entry.getValue() + " members have a view of " + entry.getKey());
+ }
+ }
+
+ if(merge_completed)
+ break;
+ Util.sleep(5000);
+ }
+
+ if(!merge_completed) {
+ System.out.println("\nFinal cluster:");
+ for(JChannel ch: channels) {
+ int size=ch.getView().size();
+ System.out.println(ch.getAddress() + ": " + size + " members - " + (size == NUM? "OK" : "FAIL"));
+ }
+ }
+ for(JChannel ch: channels) {
+ int size=ch.getView().size();
+ assert size == NUM : "Channel has " + size + " members, but should have " + NUM;
+ }
+ }
+
+
+}

No commit comments for this range

Something went wrong with that request. Please try again.