Skip to content

Commit

Permalink
- Implemented sticky site masters (https://issues.jboss.org/browse/JG…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Oct 25, 2016
1 parent 3173bc7 commit 77a2e82
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 134 deletions.
96 changes: 61 additions & 35 deletions src/org/jgroups/protocols/relay/RELAY2.java
Expand Up @@ -52,7 +52,7 @@ public class RELAY2 extends Protocol {


@Property(description="Whether or not we generate our own addresses in which we use can_become_site_master. " + @Property(description="Whether or not we generate our own addresses in which we use can_become_site_master. " +
"If this property is false, can_become_site_master is ignored") "If this property is false, can_become_site_master is ignored")
protected boolean enable_address_tagging=false; protected boolean enable_address_tagging;


@Property(description="Whether or not to relay multicast (dest=null) messages") @Property(description="Whether or not to relay multicast (dest=null) messages")
protected boolean relay_multicasts=true; protected boolean relay_multicasts=true;
Expand All @@ -63,7 +63,10 @@ public class RELAY2 extends Protocol {


@Property(description="If true, logs a warning if the FORWARD_TO_COORD protocol is not found. This property might " + @Property(description="If true, logs a warning if the FORWARD_TO_COORD protocol is not found. This property might " +
"get deprecated soon") "get deprecated soon")
protected boolean warn_when_ftc_missing=false; protected boolean warn_when_ftc_missing;

@Property(description="Fully qualified name of a class implementing SiteMasterPicker")
protected String site_master_picker_impl;




/* --------------------------------------------- Fields ------------------------------------------------ */ /* --------------------------------------------- Fields ------------------------------------------------ */
Expand All @@ -74,11 +77,13 @@ public class RELAY2 extends Protocol {
protected RelayConfig.SiteConfig site_config; protected RelayConfig.SiteConfig site_config;


@ManagedAttribute(description="Whether this member is a site master") @ManagedAttribute(description="Whether this member is a site master")
protected volatile boolean is_site_master=false; protected volatile boolean is_site_master;


// A list of site masters in this (local) site // A list of site masters in this (local) site
protected volatile List<Address> site_masters; protected volatile List<Address> site_masters;


protected SiteMasterPicker site_master_picker;

protected volatile Relayer relayer; protected volatile Relayer relayer;


protected TimeScheduler timer; protected TimeScheduler timer;
Expand All @@ -93,7 +98,7 @@ public class RELAY2 extends Protocol {


@Property(description="If true, a site master forwards messages received from other sites to randomly chosen " + @Property(description="If true, a site master forwards messages received from other sites to randomly chosen " +
"members of the local site for load balancing, reducing work for itself") "members of the local site for load balancing, reducing work for itself")
protected boolean can_forward_local_cluster=false; protected boolean can_forward_local_cluster;


// protocol IDs above RELAY2 // protocol IDs above RELAY2
protected short[] prots_above; protected short[] prots_above;
Expand Down Expand Up @@ -125,24 +130,25 @@ public class RELAY2 extends Protocol {




// Fluent configuration // Fluent configuration
public RELAY2 site(String site_name) {site=site_name; return this;} public RELAY2 site(String site_name) {site=site_name; return this;}
public RELAY2 config(String cfg) {config=cfg; return this;} public RELAY2 config(String cfg) {config=cfg; return this;}
public RELAY2 canBecomeSiteMaster(boolean flag) {can_become_site_master=flag; return this;} public RELAY2 canBecomeSiteMaster(boolean flag) {can_become_site_master=flag; return this;}
public RELAY2 enableAddressTagging(boolean flag) {enable_address_tagging=flag; return this;} public RELAY2 enableAddressTagging(boolean flag) {enable_address_tagging=flag; return this;}
public RELAY2 relayMulticasts(boolean flag) {relay_multicasts=flag; return this;} public RELAY2 relayMulticasts(boolean flag) {relay_multicasts=flag; return this;}
public RELAY2 asyncRelayCreation(boolean flag) {async_relay_creation=flag; return this;} public RELAY2 asyncRelayCreation(boolean flag) {async_relay_creation=flag; return this;}

public RELAY2 siteMasterPicker(SiteMasterPicker s) {if(s != null) this.site_master_picker=s; return this;}
public String site() {return site;}
public List<String> siteNames() {return getSites();} public String site() {return site;}
public String config() {return config;} public List<String> siteNames() {return getSites();}
public boolean canBecomeSiteMaster() {return can_become_site_master;} public String config() {return config;}
public boolean enableAddressTagging() {return enable_address_tagging;} public boolean canBecomeSiteMaster() {return can_become_site_master;}
public boolean relayMulticasts() {return relay_multicasts;} public boolean enableAddressTagging() {return enable_address_tagging;}
public boolean asyncRelayCreation() {return async_relay_creation;} public boolean relayMulticasts() {return relay_multicasts;}
public Address getLocalAddress() {return local_addr;} public boolean asyncRelayCreation() {return async_relay_creation;}
public TimeScheduler getTimer() {return timer;} public Address getLocalAddress() {return local_addr;}
public void incrementRelayed() {relayed.incrementAndGet();} public TimeScheduler getTimer() {return timer;}
public void addToRelayedTime(long delta) {relayed_time.addAndGet(delta);} public void incrementRelayed() {relayed.incrementAndGet();}
public void addToRelayedTime(long delta) {relayed_time.addAndGet(delta);}




public RouteStatusListener getRouteStatusListener() {return route_status_listener;} public RouteStatusListener getRouteStatusListener() {return route_status_listener;}
Expand Down Expand Up @@ -230,6 +236,18 @@ public List<String> getSites() {
public void init() throws Exception { public void init() throws Exception {
super.init(); super.init();
configure(); configure();

if(site_master_picker == null) {
site_master_picker=new SiteMasterPicker() {
public Address pickSiteMaster(List<Address> site_masters, Address original_sender) {
return Util.pickRandomElement(site_masters);
}

public Route pickRoute(String site, List<Route> routes, Address original_sender) {
return Util.pickRandomElement(routes);
}
};
}
} }


public void configure() throws Exception { public void configure() throws Exception {
Expand All @@ -244,6 +262,12 @@ public void configure() throws Exception {
log.warn("max_size_masters was " + max_site_masters + ", changed to 1"); log.warn("max_size_masters was " + max_site_masters + ", changed to 1");
max_site_masters=1; max_site_masters=1;
} }

if(site_master_picker_impl != null) {
Class<SiteMasterPicker> clazz=Util.loadClass(site_master_picker_impl, (Class)null);
this.site_master_picker=clazz.newInstance();
}

if(config != null) if(config != null)
parseSiteConfiguration(sites); parseSiteConfiguration(sites);


Expand Down Expand Up @@ -316,7 +340,7 @@ public String printRoutes() {
*/ */
public JChannel getBridge(String site_name) { public JChannel getBridge(String site_name) {
Relayer tmp=relayer; Relayer tmp=relayer;
Relayer.Route route=tmp != null? tmp.getRoute(site_name): null; Route route=tmp != null? tmp.getRoute(site_name): null;
return route != null? route.bridge() : null; return route != null? route.bridge() : null;
} }


Expand All @@ -325,7 +349,7 @@ public JChannel getBridge(String site_name) {
* @param site_name The site name, e.g. "SFO" * @param site_name The site name, e.g. "SFO"
* @return The route to the given site, or null if no route was found or we're not the coordinator * @return The route to the given site, or null if no route was found or we're not the coordinator
*/ */
public Relayer.Route getRoute(String site_name) { public Route getRoute(String site_name) {
Relayer tmp=relayer; Relayer tmp=relayer;
return tmp != null? tmp.getRoute(site_name): null; return tmp != null? tmp.getRoute(site_name): null;
} }
Expand Down Expand Up @@ -373,10 +397,10 @@ public Object down(Message msg) {
return null; return null;
} }


// forward to the coordinator unless we're the coord (then route the message directly) // forward to the site master unless we're the site master (then route the message directly)
if(!is_site_master) { if(!is_site_master) {
long start=stats? System.nanoTime() : 0; long start=stats? System.nanoTime() : 0;
Address site_master=pickSiteMaster(); Address site_master=pickSiteMaster(sender);
if(site_master == null) if(site_master == null)
throw new IllegalStateException("site master is null"); throw new IllegalStateException("site master is null");
forwardTo(site_master, target, sender, msg, max_site_masters == 1); forwardTo(site_master, target, sender, msg, max_site_masters == 1);
Expand Down Expand Up @@ -527,24 +551,23 @@ protected void route(SiteAddress dest, SiteAddress sender, Message msg) {
return; return;
} }


Relayer.Route route=tmp.getRoute(target_site); Route route=tmp.getRoute(target_site, sender);
if(route == null) { if(route == null) {
log.error(local_addr + ": no route to " + target_site + ": dropping message"); log.error(local_addr + ": no route to " + target_site + ": dropping message");
sendSiteUnreachableTo(sender, target_site); sendSiteUnreachableTo(sender, target_site);
} }
else { else
route.send(dest,sender,msg); route.send(dest,sender,msg);
}
} }




/** Sends the message via all bridges excluding the excluded_sites bridges */ /** Sends the message via all bridges excluding the excluded_sites bridges */
protected void sendToBridges(Address sender, final Message msg, String ... excluded_sites) { protected void sendToBridges(Address sender, final Message msg, String ... excluded_sites) {
Relayer tmp=relayer; Relayer tmp=relayer;
List<Relayer.Route> routes=tmp != null? tmp.getRoutes(excluded_sites) : null; List<Route> routes=tmp != null? tmp.getRoutes(excluded_sites) : null;
if(routes == null) if(routes == null)
return; return;
for(Relayer.Route route: routes) { for(Route route: routes) {
if(log.isTraceEnabled()) if(log.isTraceEnabled())
log.trace(local_addr + ": relaying multicast message from " + sender + " via route " + route); log.trace(local_addr + ": relaying multicast message from " + sender + " via route " + route);
try { try {
Expand Down Expand Up @@ -589,7 +612,7 @@ protected void deliverLocally(SiteAddress dest, SiteAddress sender, Message msg)
boolean send_to_coord=false; boolean send_to_coord=false;
if(dest instanceof SiteUUID) { if(dest instanceof SiteUUID) {
if(dest instanceof SiteMaster) { if(dest instanceof SiteMaster) {
local_dest=pickSiteMaster(); local_dest=pickSiteMaster(sender);
if(local_dest == null) if(local_dest == null)
throw new IllegalStateException("site master was null"); throw new IllegalStateException("site master was null");
send_to_coord=true; send_to_coord=true;
Expand Down Expand Up @@ -709,9 +732,12 @@ protected List<Address> determineSiteMasters(View view) {
return retval; return retval;
} }


/** Returns a random site master from site_masters */ /** Returns a site master from site_masters */
protected Address pickSiteMaster() { protected Address pickSiteMaster(Address sender) {
return Util.pickRandomElement(site_masters); List<Address> masters=site_masters;
if(masters.size() == 1)
return masters.get(0);
return site_master_picker.pickSiteMaster(masters, sender);
} }




Expand Down
76 changes: 11 additions & 65 deletions src/org/jgroups/protocols/relay/Relayer.java
Expand Up @@ -114,15 +114,16 @@ public synchronized String printRoutes() {
} }




protected Route getRoute(String site) { return getRoute(site, null);}


/** protected synchronized Route getRoute(String site, Address sender) {
* Grabs a random route
* @param site
* @return
*/
protected synchronized Route getRoute(String site) {
List<Route> list=routes.get(site); List<Route> list=routes.get(site);
return list == null? null : Util.pickRandomElement(list); if(list == null)
return null;
if(list.size() == 1)
return list.get(0);

return relay.site_master_picker.pickRoute(site, list, sender);
} }


protected List<String> getSiteNames() { protected List<String> getSiteNames() {
Expand Down Expand Up @@ -165,62 +166,6 @@ protected static boolean isExcluded(Route route, String... excluded_sites) {






/**
* Includes information about the site master of the route and the channel to be used
*/
public class Route implements Comparable<Route> {
/** SiteUUID: address of the site master */
protected final Address site_master;
protected final JChannel bridge;

public Route(Address site_master, JChannel bridge) {
this.site_master=site_master;
this.bridge=bridge;
}

public JChannel bridge() {return bridge;}
public Address siteMaster() {return site_master;}

public void send(Address final_destination, Address original_sender, final Message msg) {
if(log.isTraceEnabled())
log.trace("routing message to " + final_destination + " via " + site_master);
long start=stats? System.nanoTime() : 0;
try {
Message copy=createMessage(site_master, final_destination, original_sender, msg);
bridge.send(copy);
if(stats) {
relay.addToRelayedTime(System.nanoTime() - start);
relay.incrementRelayed();
}
}
catch(Exception e) {
log.error(Util.getMessage("FailureRelayingMessage"), e);
}
}

public int compareTo(Route o) {
return site_master.compareTo(o.siteMaster());
}

public boolean equals(Object obj) {
return compareTo((Route)obj) == 0;
}

public int hashCode() {
return site_master.hashCode();
}

public String toString() {
return (site_master != null? site_master.toString() : "");
}

protected Message createMessage(Address target, Address final_destination, Address original_sender, final Message msg) {
Message copy=relay.copy(msg).dest(target).src(null);
RELAY2.Relay2Header hdr=new RELAY2.Relay2Header(RELAY2.Relay2Header.DATA, final_destination, original_sender);
copy.putHeader(relay.getId(), hdr);
return copy;
}
}




protected class Bridge extends ReceiverAdapter { protected class Bridge extends ReceiverAdapter {
Expand All @@ -247,7 +192,7 @@ protected void stop() {
} }


public void receive(Message msg) { public void receive(Message msg) {
RELAY2.Relay2Header hdr=(RELAY2.Relay2Header)msg.getHeader(relay.getId()); RELAY2.Relay2Header hdr=msg.getHeader(relay.getId());
if(hdr == null) { if(hdr == null) {
log.warn("received a message without a relay header; discarding it"); log.warn("received a message without a relay header; discarding it");
return; return;
Expand Down Expand Up @@ -290,7 +235,8 @@ public void viewAccepted(View new_view) {
} }


// Add routes that aren't yet in the routing table: // Add routes that aren't yet in the routing table:
val.stream().filter(addr -> !contains(list, addr)).forEach(addr -> list.add(new Route(addr, channel))); val.stream().filter(addr -> !contains(list, addr))
.forEach(addr -> list.add(new Route(addr, channel, relay, log).stats(stats)));


if(list.isEmpty()) { if(list.isEmpty()) {
routes.remove(key); routes.remove(key);
Expand Down
75 changes: 75 additions & 0 deletions src/org/jgroups/protocols/relay/Route.java
@@ -0,0 +1,75 @@
package org.jgroups.protocols.relay;


import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.logging.Log;
import org.jgroups.util.Util;


/**
* Class which maintains the destination address for sending messages to a given site, and the bridge channel to do so.
* @author Bela Ban
* @since 3.x
*/
public class Route implements Comparable<Route> {
/** SiteUUID: address of the site master */
protected final Address site_master;
protected final JChannel bridge;
protected final RELAY2 relay;
protected final Log log;
protected boolean stats=true;

public Route(Address site_master, JChannel bridge, RELAY2 relay, Log log) {
this.site_master=site_master;
this.bridge=bridge;
this.relay=relay;
this.log=log;
}

public JChannel bridge() {return bridge;}
public Address siteMaster() {return site_master;}
public boolean stats() {return stats;}
public Route stats(boolean f) {stats=f; return this;}

public void send(Address final_destination, Address original_sender, final Message msg) {
if(log.isTraceEnabled())
log.trace("routing message to " + final_destination + " via " + site_master);
long start=stats? System.nanoTime() : 0;
try {
Message copy=createMessage(site_master, final_destination, original_sender, msg);
bridge.send(copy);
if(stats) {
relay.addToRelayedTime(System.nanoTime() - start);
relay.incrementRelayed();
}
}
catch(Exception e) {
log.error(Util.getMessage("FailureRelayingMessage"), e);
}
}

public int compareTo(Route o) {
return site_master.compareTo(o.siteMaster());
}

public boolean equals(Object obj) {
return compareTo((Route)obj) == 0;
}

public int hashCode() {
return site_master.hashCode();
}

public String toString() {
return (site_master != null? site_master.toString() : "");
}

protected Message createMessage(Address target, Address final_destination, Address original_sender, final Message msg) {
Message copy=relay.copy(msg).dest(target).src(null);
RELAY2.Relay2Header hdr=new RELAY2.Relay2Header(RELAY2.Relay2Header.DATA, final_destination, original_sender);
copy.putHeader(relay.getId(), hdr);
return copy;
}
}

0 comments on commit 77a2e82

Please sign in to comment.