Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Implemented support for running MERGE3 over TCP and discovery protoco…

…ls TCPPING, MPING, TCPGOSSIP etc
  • Loading branch information...
commit 42e9fd6baee31022a76976bf005f0d1c95279199 1 parent 959f9f3
@belaban authored
Showing with 41 additions and 3 deletions.
  1. +41 −3 src/org/jgroups/protocols/MERGE3.java
View
44 src/org/jgroups/protocols/MERGE3.java
@@ -71,6 +71,10 @@
protected final ResponseCollector<View> view_rsps=new ResponseCollector<View>();
+ protected boolean transport_supports_multicasting=true;
+
+ protected String cluster_name;
+
@ManagedAttribute(description="Whether or not the current member is the coordinator")
@@ -121,6 +125,7 @@ public void init() throws Exception {
}
if(max_interval <= 0)
throw new Exception("max_interval must be > 0");
+ transport_supports_multicasting=getTransport().supportsMulticasting();
}
public void stop() {
@@ -193,6 +198,13 @@ protected synchronized void stopViewConsistencyChecker() {
public Object down(Event evt) {
switch(evt.getType()) {
+ case Event.CONNECT:
+ case Event.CONNECT_USE_FLUSH:
+ case Event.CONNECT_WITH_STATE_TRANSFER:
+ case Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH:
+ cluster_name=(String)evt.getArg();
+ break;
+
case Event.DISCONNECT:
stopViewConsistencyChecker();
stopInfoSender();
@@ -307,9 +319,35 @@ public void run() {
String logical_name=UUID.get(local_addr);
ViewId view_id=view.getViewId();
MergeHeader hdr=MergeHeader.createInfo(view_id, logical_name, Arrays.asList(physical_addr));
- Message msg=new Message();
- msg.putHeader(getId(), hdr);
- down_prot.down(new Event(Event.MSG, msg));
+
+ if(transport_supports_multicasting) {
+ Message msg=new Message();
+ msg.putHeader(getId(), hdr);
+ down_prot.down(new Event(Event.MSG, msg));
+ return;
+ }
+
+ Discovery discovery_protocol=(Discovery)stack.findProtocol(Discovery.class);
+ if(discovery_protocol == null) {
+ log.warn("no discovery protocol found, cannot ask for physical addresses to send INFO message");
+ return;
+ }
+ Collection<PhysicalAddress> physical_addrs=discovery_protocol.fetchClusterMembers(cluster_name);
+ if(physical_addrs == null)
+ physical_addrs=(Collection<PhysicalAddress>)down_prot.down(new Event(Event.GET_PHYSICAL_ADDRESSES));
+
+ if(physical_addrs == null || physical_addrs.isEmpty()) {
+ log.trace("discovery protocol " + discovery_protocol.getName() + " returned 0 physical addresses");
+ return;
+ }
+ if(log.isTraceEnabled())
+ log.trace("discovery protocol " + discovery_protocol.getName() + " returned " + physical_addrs.size() +
+ " physical addresses: " + Util.printListWithDelimiter(physical_addrs, ", ", 10));
+ for(Address addr: physical_addrs) {
+ Message info=new Message(addr);
+ info.putHeader(getId(), hdr);
+ down_prot.down(new Event(Event.MSG, info));
+ }
}
public long nextInterval() {
Please sign in to comment.
Something went wrong with that request. Please try again.