Skip to content

Commit

Permalink
- Added IpAddressUUID: extends IpAddress and adds randomness to it (h…
Browse files Browse the repository at this point in the history
…ttps://issues.jboss.org/browse/JGRP-2080)

- Added TP.use_ip_addrs (https://issues.jboss.org/browse/JGRP-2080)
- Added field 'sender' to GossipData
- Added dump_msgs to GossipRouter (dumps msgs to stdout after routing them)
- Sending address of the original sender in GossipData (RouterStub, TUNNEL)
- Added marshalling of IpAddressUUID in Util
- RELAY2 now throws an exception if TP.use_ip_addrs is enabled
- IpAddressUUID now prints the logical name if available
- Changed SET_PHYSICAL_ADDR -> ADD_PHYSICAL_ADDR
  • Loading branch information
belaban committed Sep 1, 2016
1 parent 20e9924 commit 323bfe6
Show file tree
Hide file tree
Showing 29 changed files with 551 additions and 275 deletions.
43 changes: 22 additions & 21 deletions conf/jg-magic-map.xml
Expand Up @@ -4,27 +4,28 @@

<!-- Generic classes -->
<class id="1" name="org.jgroups.stack.IpAddress"/>
<class id="2" name="org.jgroups.Message"/>
<class id="3" name="org.jgroups.View"/>
<class id="4" name="org.jgroups.ViewId"/>
<class id="5" name="org.jgroups.MergeView"/>
<class id="6" name="org.jgroups.protocols.PingData"/>
<class id="7" name="org.jgroups.protocols.pbcast.JoinRsp"/>
<class id="8" name="org.jgroups.util.Digest"/>
<class id="9" name="org.jgroups.util.UUID"/>
<class id="10" name="org.jgroups.util.ExtendedUUID"/>
<class id="11" name="org.jgroups.protocols.relay.SiteUUID"/>
<class id="12" name="org.jgroups.protocols.relay.SiteMaster"/>
<class id="13" name="org.jgroups.protocols.Executing$Request"/>
<class id="14" name="org.jgroups.blocks.executor.ExecutionService$RunnableAdapter"/>
<class id="15" name="org.jgroups.blocks.executor.Executions$StreamableCallable"/>
<class id="16" name="org.jgroups.util.SeqnoList"/>
<class id="17" name="org.jgroups.AnycastAddress"/>
<class id="18" name="org.jgroups.auth.SimpleToken"/>
<class id="19" name="org.jgroups.auth.FixedMembershipToken"/>
<class id="20" name="org.jgroups.auth.MD5Token"/>
<class id="21" name="org.jgroups.auth.X509Token"/>
<class id="22" name="org.jgroups.blocks.MethodCall"/>
<class id="2" name="org.jgroups.stack.IpAddressUUID"/>
<class id="3" name="org.jgroups.Message"/>
<class id="4" name="org.jgroups.View"/>
<class id="5" name="org.jgroups.ViewId"/>
<class id="6" name="org.jgroups.MergeView"/>
<class id="7" name="org.jgroups.protocols.PingData"/>
<class id="8" name="org.jgroups.protocols.pbcast.JoinRsp"/>
<class id="9" name="org.jgroups.util.Digest"/>
<class id="10" name="org.jgroups.util.UUID"/>
<class id="11" name="org.jgroups.util.ExtendedUUID"/>
<class id="12" name="org.jgroups.protocols.relay.SiteUUID"/>
<class id="13" name="org.jgroups.protocols.relay.SiteMaster"/>
<class id="14" name="org.jgroups.protocols.Executing$Request"/>
<class id="15" name="org.jgroups.blocks.executor.ExecutionService$RunnableAdapter"/>
<class id="16" name="org.jgroups.blocks.executor.Executions$StreamableCallable"/>
<class id="17" name="org.jgroups.util.SeqnoList"/>
<class id="18" name="org.jgroups.AnycastAddress"/>
<class id="19" name="org.jgroups.auth.SimpleToken"/>
<class id="20" name="org.jgroups.auth.FixedMembershipToken"/>
<class id="21" name="org.jgroups.auth.MD5Token"/>
<class id="22" name="org.jgroups.auth.X509Token"/>
<class id="23" name="org.jgroups.blocks.MethodCall"/>

<!-- Headers -->
<class id="50" name="org.jgroups.protocols.FD$FdHeader"/>
Expand Down
11 changes: 6 additions & 5 deletions src/org/jgroups/Address.java
Expand Up @@ -18,11 +18,12 @@
*/
public interface Address extends Streamable, Comparable<Address> {
// flags used for marshalling
byte NULL = 1 << 0;
byte UUID_ADDR = 1 << 1;
byte SITE_UUID = 1 << 2;
byte SITE_MASTER = 1 << 3;
byte IP_ADDR = 1 << 4;
byte NULL = 1 << 0;
byte UUID_ADDR = 1 << 1;
byte SITE_UUID = 1 << 2;
byte SITE_MASTER = 1 << 3;
byte IP_ADDR = 1 << 4;
byte IP_ADDR_UUID = 1 << 5;


/** Returns serialized size of this address */
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/Event.java
Expand Up @@ -43,7 +43,7 @@ public class Event {
public static final int CONNECT_WITH_STATE_TRANSFER = 80; // arg = cluster name (string)
public static final int GET_PHYSICAL_ADDRESS = 87; // arg = Address --> PhysicalAddress
public static final int GET_LOGICAL_PHYSICAL_MAPPINGS = 88; // arg = boolean --> Map<Address,PhysicalAddress>
public static final int SET_PHYSICAL_ADDRESS = 89; // arg = Tuple<Address,PhysicalAddress> --> boolean
public static final int ADD_PHYSICAL_ADDRESS = 89; // arg = Tuple<Address,PhysicalAddress> --> boolean
public static final int REMOVE_ADDRESS = 90; // arg = Address
public static final int GET_LOCAL_ADDRESS = 91; // arg = null --> UUID (local_addr)
public static final int CONNECT_USE_FLUSH = 92;
Expand Down Expand Up @@ -133,7 +133,7 @@ public static String type2String(int t) {
case CONNECT_WITH_STATE_TRANSFER: return "CONNECT_WITH_STATE_TRANSFER";
case GET_PHYSICAL_ADDRESS: return "GET_PHYSICAL_ADDRESS";
case GET_LOGICAL_PHYSICAL_MAPPINGS: return "GET_LOGICAL_PHYSICAL_MAPPINGS";
case SET_PHYSICAL_ADDRESS: return "SET_PHYSICAL_ADDRESS";
case ADD_PHYSICAL_ADDRESS: return "ADD_PHYSICAL_ADDRESS";
case REMOVE_ADDRESS: return "REMOVE_ADDRESS";
case GET_LOCAL_ADDRESS: return "GET_LOCAL_ADDRESS";
case CONNECT_USE_FLUSH: return "CONNECT_USE_FLUSH";
Expand Down
1 change: 0 additions & 1 deletion src/org/jgroups/Global.java
Expand Up @@ -47,7 +47,6 @@ public final class Global {

public static final String MAGIC_NUMBER_FILE="jgroups.conf.magic_number_file";
public static final String PROTOCOL_ID_FILE="jgroups.conf.protocol_id_file";
public static final String RESOLVE_DNS="jgroups.resolve_dns";
public static final String PRINT_UUIDS="jgroups.print_uuids";
public static final String UUID_CACHE_MAX_ELEMENTS="jgroups.uuid_cache.max_elements";
public static final String UUID_CACHE_MAX_AGE="jgroups.uuid_cache.max_age";
Expand Down
13 changes: 11 additions & 2 deletions src/org/jgroups/JChannel.java
Expand Up @@ -754,6 +754,15 @@ public Object up(Event evt) {
case Event.GET_LOCAL_ADDRESS:
return local_addr;

case Event.SET_LOCAL_ADDRESS:
Address tmp_addr=evt.arg();
if(tmp_addr != null) {
this.local_addr=tmp_addr;
if(name != null && !name.isEmpty())
UUID.add(local_addr, name);
}
break;

default:
break;
}
Expand Down Expand Up @@ -868,7 +877,7 @@ protected JChannel _connect(Event connect_event) throws Exception {
stopStack(true, false);
state=State.OPEN;
init();
throw new Exception("connecting to channel \"" + connect_event.getArg() + "\" failed", t);
throw new Exception("connecting to channel " + connect_event.getArg() + " failed", t);
}
}

Expand Down Expand Up @@ -991,7 +1000,7 @@ protected JChannel startStack(String cluster_name) throws Exception {
}

/**
* Generates new UUID and sets local address. Sends down a REMOVE_ADDRESS (if existing address was present) and
* Generates and sets local_addr. Sends down a REMOVE_ADDRESS (if existing address was present) and
* a SET_LOCAL_ADDRESS
*/
protected JChannel setAddress() {
Expand Down
1 change: 1 addition & 0 deletions src/org/jgroups/PhysicalAddress.java
Expand Up @@ -10,4 +10,5 @@
* @author Bela Ban
*/
public interface PhysicalAddress extends Address {
String printIpAddress();
}
8 changes: 7 additions & 1 deletion src/org/jgroups/blocks/LazyRemovalCache.java
Expand Up @@ -5,6 +5,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -274,7 +275,11 @@ public V getVal() {
}

public String toString() {
StringBuilder sb=new StringBuilder(val.toString()).append(" (");
return toString(null);
}

public String toString(Function<V,String> print_val) {
StringBuilder sb=new StringBuilder(print_val != null? print_val.apply(val) : val.toString()).append(" (");
long age=TimeUnit.MILLISECONDS.convert(System.nanoTime() - timestamp, TimeUnit.NANOSECONDS);
if(age < 1000)
sb.append(age).append(" ms");
Expand All @@ -283,5 +288,6 @@ public String toString() {
sb.append(" old").append((removable? ", removable" : "")).append(")");
return sb.toString();
}

}
}
2 changes: 1 addition & 1 deletion src/org/jgroups/blocks/cs/TcpConnection.java
Expand Up @@ -227,7 +227,7 @@ protected Address readPeerAddress(Socket client_sock) throws Exception {
throw new IOException("packet from " + client_sock.getInetAddress() + ":" + client_sock.getPort() +
" has different version (" + Version.print(version) +
") from ours (" + Version.printVersion() + "); discarding it");
short addr_len=in.readShort(); // only needed by NioConnection
in.readShort(); // address length is only needed by NioConnection

Address client_peer_addr=new IpAddress();
client_peer_addr.readFrom(in);
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/Discovery.java
Expand Up @@ -512,7 +512,7 @@ protected boolean addDiscoveryResponseToCaches(Address mbr, String logical_name,
if(logical_name != null)
UUID.add(mbr, logical_name);
if(physical_addr != null)
return (Boolean)down(new Event(Event.SET_PHYSICAL_ADDRESS, new Tuple<>(mbr, physical_addr)));
return (Boolean)down(new Event(Event.ADD_PHYSICAL_ADDRESS, new Tuple<>(mbr, physical_addr)));
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/MERGE3.java
Expand Up @@ -322,7 +322,7 @@ protected void addInfo(Address sender, ViewId view_id, String logical_name, Phys
if(logical_name != null && sender instanceof UUID)
UUID.add(sender, logical_name);
if(physical_addr != null)
down(new Event(Event.SET_PHYSICAL_ADDRESS, new Tuple<>(sender, physical_addr)));
down(new Event(Event.ADD_PHYSICAL_ADDRESS, new Tuple<>(sender, physical_addr)));
synchronized(views) {
ViewId existing=views.get(sender);
if(existing == null || existing.compareTo(view_id) < 0)
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/PDC.java
Expand Up @@ -82,7 +82,7 @@ public Object down(Event evt) {
new_map.putAll(cache);
return new_map;

case Event.SET_PHYSICAL_ADDRESS:
case Event.ADD_PHYSICAL_ADDRESS:
Tuple<Address,PhysicalAddress> new_val=(Tuple<Address, PhysicalAddress>)evt.getArg();
if(new_val != null) {
cache.put(new_val.getVal1(), new_val.getVal2());
Expand Down
31 changes: 16 additions & 15 deletions src/org/jgroups/protocols/TCPPING.java
Expand Up @@ -41,9 +41,6 @@ public class TCPPING extends Discovery {
protected int max_dynamic_hosts=2000;
/* --------------------------------------------- Fields ------------------------------------------------------ */

/**
* List of PhysicalAddresses
*/

/** https://jira.jboss.org/jira/browse/JGRP-989 */
protected BoundedList<PhysicalAddress> dynamic_hosts;
Expand Down Expand Up @@ -110,8 +107,8 @@ public Object down(Event evt) {
}
}
break;
case Event.SET_PHYSICAL_ADDRESS:
Tuple<Address,PhysicalAddress> tuple=(Tuple<Address,PhysicalAddress>)evt.getArg();
case Event.ADD_PHYSICAL_ADDRESS:
Tuple<Address,PhysicalAddress> tuple=evt.getArg();
PhysicalAddress physical_addr=tuple.getVal2();
if(physical_addr != null && !initial_hosts.contains(physical_addr))
dynamic_hosts.addIfAbsent(physical_addr);
Expand Down Expand Up @@ -155,16 +152,20 @@ public void findMembers(List<Address> members, boolean initial_discovery, Respon
final Message msg=new Message(addr).setFlag(Message.Flag.INTERNAL, Message.Flag.DONT_BUNDLE, Message.Flag.OOB)
.putHeader(this.id,hdr).setBuffer(marshal(data));

if(async_discovery_use_separate_thread_per_request) {
timer.execute(() -> {
log.trace("%s: sending discovery request to %s", local_addr, msg.getDest());
down_prot.down(msg);
});
}
else {
log.trace("%s: sending discovery request to %s", local_addr, msg.getDest());
down_prot.down(msg);
}
if(async_discovery_use_separate_thread_per_request)
timer.execute(() -> sendDiscoveryRequest(msg));
else
sendDiscoveryRequest(msg);
}
}

protected void sendDiscoveryRequest(Message req) {
try {
log.trace("%s: sending discovery request to %s", local_addr, req.getDest());
down_prot.down(req);
}
catch(Throwable t) {
log.trace("sending discovery request to %s failed: %s", req.dest(), t);
}
}
}
Expand Down

0 comments on commit 323bfe6

Please sign in to comment.