Skip to content

Commit

Permalink
Use View.getCoord() to determine coordinator (https://issues.jboss.or…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jun 7, 2017
1 parent c6cd765 commit 2c0ee9b
Show file tree
Hide file tree
Showing 15 changed files with 21 additions and 36 deletions.
2 changes: 1 addition & 1 deletion conf/JGROUPS_VERSION.properties
@@ -1,5 +1,5 @@


## Defines the JGroups version ## Defines the JGroups version
## This file should be the *only place* to change when changing the version number ## This file should be the *only place* to change when changing the version number
jgroups.version=4.0.3.Final jgroups.version=4.0.4-SNAPSHOT
jgroups.codename=Schiener Berg jgroups.codename=Schiener Berg
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -5,7 +5,7 @@
<artifactId>jgroups</artifactId> <artifactId>jgroups</artifactId>
<packaging>bundle</packaging> <packaging>bundle</packaging>
<name>JGroups</name> <name>JGroups</name>
<version>4.0.3.Final</version> <version>4.0.4-SNAPSHOT</version>
<url>http://www.jgroups.org</url> <url>http://www.jgroups.org</url>


<properties> <properties>
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/demos/ReplicatedHashMapDemo.java
Expand Up @@ -218,7 +218,7 @@ public void viewChange(View view, java.util.List<Address> new_mbrs, java.util.Li
} }


private void _setTitle() { private void _setTitle() {
int num=map.getChannel().getView().getMembers().size(); int num=map.getChannel().getView().size();
setTitle("ReplicatedHashMapDemo: " + num + " server(s)"); setTitle("ReplicatedHashMapDemo: " + num + " server(s)");
} }


Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/CENTRAL_EXECUTOR.java
Expand Up @@ -65,7 +65,7 @@ public String getBackups() {
public void handleView(View view) { public void handleView(View view) {
Address oldCoord = coord; Address oldCoord = coord;
if(view.size() > 0) { if(view.size() > 0) {
coord=view.getMembers().iterator().next(); coord=view.getCoord();
is_coord=coord.equals(local_addr); is_coord=coord.equals(local_addr);
if(log.isDebugEnabled()) if(log.isDebugEnabled())
log.debug("local_addr=" + local_addr + ", coord=" + coord + ", is_coord=" + is_coord); log.debug("local_addr=" + local_addr + ", coord=" + coord + ", is_coord=" + is_coord);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/CENTRAL_LOCK.java
Expand Up @@ -120,7 +120,7 @@ public void handleView(View view) {
super.handleView(view); super.handleView(view);
Address old_coord=coord; Address old_coord=coord;
if(view.size() > 0) { if(view.size() > 0) {
coord=view.getMembers().iterator().next(); coord=view.getCoord();
is_coord=coord.equals(local_addr); is_coord=coord.equals(local_addr);
if(log.isDebugEnabled()) if(log.isDebugEnabled())
log.debug("local_addr=" + local_addr + ", coord=" + coord + ", is_coord=" + is_coord); log.debug("local_addr=" + local_addr + ", coord=" + coord + ", is_coord=" + is_coord);
Expand Down
3 changes: 1 addition & 2 deletions src/org/jgroups/protocols/FORWARD_TO_COORD.java
Expand Up @@ -6,7 +6,6 @@
import org.jgroups.stack.Protocol; import org.jgroups.stack.Protocol;
import org.jgroups.util.Bits; import org.jgroups.util.Bits;
import org.jgroups.util.ForwardQueue; import org.jgroups.util.ForwardQueue;
import org.jgroups.util.Util;


import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
Expand Down Expand Up @@ -156,7 +155,7 @@ public Object up(Message msg) {




protected void handleViewChange(View view) { protected void handleViewChange(View view) {
Address new_coord=Util.getCoordinator(view); Address new_coord=view.getCoord();
boolean coord_changed=!Objects.equals(coord, new_coord); boolean coord_changed=!Objects.equals(coord, new_coord);


if(coord_changed || received_not_coord) { if(coord_changed || received_not_coord) {
Expand Down
3 changes: 1 addition & 2 deletions src/org/jgroups/protocols/MERGE3.java
Expand Up @@ -248,8 +248,7 @@ public Object down(Event evt) {
if(only_coords_run_consistency_checker == false) if(only_coords_run_consistency_checker == false)
startViewConsistencyChecker(); startViewConsistencyChecker();


List<Address> mbrs=view.getMembers(); Address coord=view.getCoord();
Address coord=mbrs.isEmpty()? null : mbrs.get(0);
if(Objects.equals(coord, local_addr)) { if(Objects.equals(coord, local_addr)) {
is_coord=true; is_coord=true;
if(only_coords_run_consistency_checker) if(only_coords_run_consistency_checker)
Expand Down
5 changes: 2 additions & 3 deletions src/org/jgroups/protocols/RELAY.java
Expand Up @@ -15,7 +15,6 @@
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors;


/** /**
* Simple relaying protocol: RELAY is added to the top of the stack, creates a channel to a bridge cluster, * Simple relaying protocol: RELAY is added to the top of the stack, creates a channel to a bridge cluster,
Expand Down Expand Up @@ -541,7 +540,7 @@ public void receive(Message msg) {
msg.getOffset(), msg.getLength()); msg.getOffset(), msg.getLength());
// replace addrs with proxies // replace addrs with proxies
if(data.remote_view != null) { if(data.remote_view != null) {
List<Address> mbrs=data.remote_view.getMembers().stream().collect(Collectors.toCollection(LinkedList::new)); List<Address> mbrs=new LinkedList<>(data.remote_view.getMembers());
data.remote_view=new View(data.remote_view.getViewId(), mbrs); data.remote_view=new View(data.remote_view.getViewId(), mbrs);
} }
boolean merge=remote_view == null; boolean merge=remote_view == null;
Expand Down Expand Up @@ -571,7 +570,7 @@ public void viewAccepted(View view) {
switch(view.size()) { switch(view.size()) {
case 1: case 1:
// the remote cluster disappeared, remove all of its addresses from the view // the remote cluster disappeared, remove all of its addresses from the view
if(prev_members > 1 && view.getMembers().iterator().next().equals(bridge.getAddress())) { if(prev_members > 1 && view.getCoord().equals(bridge.getAddress())) {
remote_view=null; remote_view=null;
View new_global_view=generateGlobalView(local_view, null); View new_global_view=generateGlobalView(local_view, null);
sendViewOnLocalCluster(null, new_global_view, false, null); sendViewOnLocalCluster(null, new_global_view, false, null);
Expand Down
7 changes: 2 additions & 5 deletions src/org/jgroups/protocols/SEQUENCER.java
Expand Up @@ -294,11 +294,8 @@ protected void flush(final Address new_coord) throws InterruptedException {
// If we're becoming coordinator, we need to handle TMP_VIEW as // If we're becoming coordinator, we need to handle TMP_VIEW as
// an immediate change of view. See JGRP-1452. // an immediate change of view. See JGRP-1452.
private void handleTmpView(View v) { private void handleTmpView(View v) {
List<Address> mbrs=v.getMembers(); Address new_coord=v.getCoord();
if(mbrs.isEmpty()) return; if(new_coord != null && !new_coord.equals(coord) && local_addr != null && local_addr.equals(new_coord))

Address new_coord=mbrs.get(0);
if(!new_coord.equals(coord) && local_addr != null && local_addr.equals(new_coord))
handleViewChange(v); handleViewChange(v);
} }


Expand Down
7 changes: 2 additions & 5 deletions src/org/jgroups/protocols/SEQUENCER2.java
Expand Up @@ -307,11 +307,8 @@ protected void handleViewChange(View v) {
// If we're becoming coordinator, we need to handle TMP_VIEW as // If we're becoming coordinator, we need to handle TMP_VIEW as
// an immediate change of view. See JGRP-1452. // an immediate change of view. See JGRP-1452.
private void handleTmpView(View v) { private void handleTmpView(View v) {
List<Address> mbrs=v.getMembers(); Address new_coord=v.getCoord();
if(mbrs.isEmpty()) return; if(new_coord != null && !new_coord.equals(coord) && local_addr != null && local_addr.equals(new_coord))

Address new_coord=mbrs.get(0);
if(!new_coord.equals(coord) && local_addr != null && local_addr.equals(new_coord))
handleViewChange(v); handleViewChange(v);
} }


Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/pbcast/STABLE.java
Expand Up @@ -361,7 +361,7 @@ protected void handleViewChange(View v) {
lock.lock(); lock.lock();
try { try {
this.view=v; this.view=v;
coordinator=v.getMembers().get(0); coordinator=v.getCoord();
resetDigest(); resetDigest();
if(!initialized) if(!initialized)
initialized=true; initialized=true;
Expand Down
6 changes: 3 additions & 3 deletions src/org/jgroups/protocols/relay/RELAY2.java
Expand Up @@ -725,9 +725,9 @@ protected List<Address> determineSiteMasters(View view) {
} }


if(retval.isEmpty()) { if(retval.isEmpty()) {
Address tmp=Util.getCoordinator(view); Address coord=view.getCoord();
if(tmp != null) if(coord != null)
retval.add(Util.getCoordinator(view)); retval.add(coord);
} }
return retval; return retval;
} }
Expand Down
10 changes: 2 additions & 8 deletions src/org/jgroups/util/Util.java
Expand Up @@ -4109,16 +4109,10 @@ public static boolean isCoordinator(JChannel ch) {
public static boolean isCoordinator(View view,Address local_addr) { public static boolean isCoordinator(View view,Address local_addr) {
if(view == null || local_addr == null) if(view == null || local_addr == null)
return false; return false;
List<Address> mbrs=view.getMembers(); Address coord=view.getCoord();
return !(mbrs == null || mbrs.isEmpty()) && local_addr.equals(mbrs.iterator().next()); return coord != null && local_addr.equals(coord);
} }


public static Address getCoordinator(View view) {
if(view == null)
return null;
List<Address> mbrs=view.getMembers();
return !mbrs.isEmpty()? mbrs.get(0) : null;
}


public static MBeanServer getMBeanServer() { public static MBeanServer getMBeanServer() {
List<MBeanServer> servers=MBeanServerFactory.findMBeanServer(null); List<MBeanServer> servers=MBeanServerFactory.findMBeanServer(null);
Expand Down
2 changes: 1 addition & 1 deletion tests/perf/org/jgroups/tests/perf/MPerf.java
Expand Up @@ -92,7 +92,7 @@ public void start(String props, String name) throws Exception {
JmxConfigurator.registerChannel(channel, Util.getMBeanServer(), "jgroups", "mperf", true); JmxConfigurator.registerChannel(channel, Util.getMBeanServer(), "jgroups", "mperf", true);


// send a CONFIG_REQ to the current coordinator, so we can get the current config // send a CONFIG_REQ to the current coordinator, so we can get the current config
Address coord=channel.getView().getMembers().get(0); Address coord=channel.getView().getCoord();
if(coord != null && !local_addr.equals(coord)) if(coord != null && !local_addr.equals(coord))
send(coord,null,MPerfHeader.CONFIG_REQ, Message.Flag.RSVP); send(coord,null,MPerfHeader.CONFIG_REQ, Message.Flag.RSVP);
} }
Expand Down
2 changes: 1 addition & 1 deletion tests/perf/org/jgroups/tests/perf/MPerfRpc.java
Expand Up @@ -120,7 +120,7 @@ public void start(String props, String name) throws Exception {
JmxConfigurator.registerChannel(channel, Util.getMBeanServer(), "jgroups", "mperf", true); JmxConfigurator.registerChannel(channel, Util.getMBeanServer(), "jgroups", "mperf", true);


// send a CONFIG_REQ to the current coordinator, so we can get the current config // send a CONFIG_REQ to the current coordinator, so we can get the current config
Address coord=channel.getView().getMembers().get(0); Address coord=channel.getView().getCoord();
if(coord != null && !local_addr.equals(coord)) if(coord != null && !local_addr.equals(coord))
invokeRpc(configReq, coord, RequestOptions.ASYNC().flags(Message.Flag.RSVP), local_addr); invokeRpc(configReq, coord, RequestOptions.ASYNC().flags(Message.Flag.RSVP), local_addr);
} }
Expand Down

0 comments on commit 2c0ee9b

Please sign in to comment.