Skip to content

Commit

Permalink
RouterStubManager: stubs now have identity (instead of using InetSock…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Feb 6, 2013
1 parent b956831 commit 337dfbc
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 39 deletions.
5 changes: 3 additions & 2 deletions src/org/jgroups/protocols/TCPGOSSIP.java
Expand Up @@ -182,8 +182,9 @@ public void addInitialHost(String hostname, int port) {

@ManagedOperation
public boolean removeInitialHost(String hostname, int port) {
InetSocketAddress isa = new InetSocketAddress(hostname, port);
RouterStub unregisterStub = stubManager.unregisterStub(isa);
InetSocketAddress isa = new InetSocketAddress(hostname, port);
RouterStub stub=new RouterStub(isa);
RouterStub unregisterStub = stubManager.unregisterStub(stub);
if(unregisterStub != null) {
stubManager.stopReconnecting(unregisterStub);
unregisterStub.destroy();
Expand Down
4 changes: 0 additions & 4 deletions src/org/jgroups/protocols/UNICAST.java
Expand Up @@ -507,10 +507,6 @@ public Object down(Event evt) {
}


protected void send(Message msg, Event evt) {
down_prot.down(evt);
num_msgs_sent++;
}

/**
* Removes and resets from connection table (which is already locked). Returns true if member was found,
Expand Down
30 changes: 24 additions & 6 deletions src/org/jgroups/stack/RouterStub.java
Expand Up @@ -11,10 +11,7 @@

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -23,7 +20,7 @@
* Client stub that talks to a remote GossipRouter
* @author Bela Ban
*/
public class RouterStub {
public class RouterStub implements Comparable<RouterStub> {

public static enum ConnectionStatus {INITIAL, CONNECTION_BROKEN, CONNECTION_ESTABLISHED, CONNECTED,DISCONNECTED};

Expand Down Expand Up @@ -74,7 +71,11 @@ public RouterStub(String routerHost, int routerPort, InetAddress bindAddress, Co
bind_addr=bindAddress;
conn_listener=l;
}


public RouterStub(InetSocketAddress addr) {
this(addr.getHostName(), addr.getPort(), null, null);
}

public void setReceiver(StubReceiver receiver) {
this.receiver = receiver;
}
Expand All @@ -91,6 +92,23 @@ public void setTcpNoDelay(boolean tcp_nodelay) {
this.tcp_nodelay=tcp_nodelay;
}

// Note that this would fail to return 0 if we had a dotted decimal and a symbolic addr resolving to the same host !
public int compareTo(RouterStub o) {
int rc=router_host.compareTo(o.router_host);
if(rc != 0)
return rc;
return router_port < o.router_port? -1 : router_port > o.router_port? 1 : 0;
}

public boolean equals(Object obj) {
RouterStub o=(RouterStub)obj;
return compareTo(o) == 0;
}

public int hashCode() {
return router_host.hashCode() + router_port;
}

public void interrupt() {
StubReceiver tmp=receiver;
if(tmp != null) {
Expand Down
55 changes: 28 additions & 27 deletions src/org/jgroups/stack/RouterStubManager.java
Expand Up @@ -30,15 +30,19 @@
import org.jgroups.util.TimeScheduler;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
* Manages a list of RouterStubs (e.g. health checking, reconnecting etc.
* @author Vladimir Blagojevic
* @author Bela Ban
*/
public class RouterStubManager implements RouterStub.ConnectionListener {

@GuardedBy("reconnectorLock")
private final ConcurrentMap<InetSocketAddress, Future<?>> futures=new ConcurrentHashMap<InetSocketAddress, Future<?>>();
private final ConcurrentMap<RouterStub,Future<?>> futures=new ConcurrentHashMap<RouterStub,Future<?>>();
private final List<RouterStub> stubs;

private final Protocol owner;
Expand All @@ -51,7 +55,7 @@ public class RouterStubManager implements RouterStub.ConnectionListener {

public RouterStubManager(Protocol owner, String channelName, Address logicalAddress, long interval) {
this.owner = owner;
this.stubs = new CopyOnWriteArrayList<RouterStub>();
this.stubs = new CopyOnWriteArrayList<RouterStub>();
this.log = LogFactory.getLog(owner.getClass());
this.timer = owner.getTransport().getTimer();
this.channelName = channelName;
Expand All @@ -69,35 +73,34 @@ public List<RouterStub> getStubs(){

public RouterStub createAndRegisterStub(String routerHost, int routerPort, InetAddress bindAddress) {
RouterStub s = new RouterStub(routerHost,routerPort,bindAddress,this);
unregisterAndDestroyStub(s.getGossipRouterAddress());
unregisterAndDestroyStub(s);
stubs.add(s);
return s;
}

public void registerStub(RouterStub s) {
unregisterAndDestroyStub(s.getGossipRouterAddress());
unregisterAndDestroyStub(s);
stubs.add(s);
}

public boolean unregisterStub(final RouterStub s) {
return stubs.remove(s);
}

public RouterStub unregisterStub(final InetSocketAddress address) {
if(address == null)
throw new IllegalArgumentException("Cannot remove null address");
for (RouterStub s : stubs) {
if (s.getGossipRouterAddress().equals(address)) {
stubs.remove(address);
return s;
public RouterStub unregisterStub(final RouterStub stub) {
if(stub == null)
throw new IllegalArgumentException("Cannot remove null stub");
RouterStub found=null;
for (RouterStub s: stubs) {
if (s.equals(stub)) {
found=s;
break;
}
}
return null;
if(found != null)
stubs.remove(found);
return found;
}

public boolean unregisterAndDestroyStub(final InetSocketAddress address) {
RouterStub unregisteredStub = unregisterStub(address);
if(unregisteredStub !=null) {
public boolean unregisterAndDestroyStub(final RouterStub stub) {
RouterStub unregisteredStub = unregisterStub(stub);
if(unregisteredStub != null) {
unregisteredStub.destroy();
return true;
}
Expand All @@ -122,8 +125,7 @@ public void destroyStubs() {
}

public void startReconnecting(final RouterStub stub) {
InetSocketAddress routerAddress = stub.getGossipRouterAddress();
Future<?> f = futures.remove(routerAddress);
Future<?> f = futures.remove(stub);
if (f != null)
f.cancel(true);

Expand All @@ -148,15 +150,14 @@ public String toString() {
}
};
f = timer.scheduleWithFixedDelay(reconnector, 0, interval, TimeUnit.MILLISECONDS);
futures.putIfAbsent(stub.getGossipRouterAddress(), f);
futures.putIfAbsent(stub, f);
}

public void stopReconnecting(final RouterStub stub) {
InetSocketAddress routerAddress = stub.getGossipRouterAddress();
Future<?> f = futures.get(stub.getGossipRouterAddress());
Future<?> f = futures.get(stub);
if (f != null) {
f.cancel(true);
futures.remove(routerAddress);
futures.remove(stub);
}

final Runnable pinger = new Runnable() {
Expand All @@ -176,7 +177,7 @@ public String toString() {
}
};
f = timer.scheduleWithFixedDelay(pinger, 1000, interval, TimeUnit.MILLISECONDS);
futures.putIfAbsent(stub.getGossipRouterAddress(),f);
futures.putIfAbsent(stub, f);
}


Expand Down
76 changes: 76 additions & 0 deletions tests/junit-functional/org/jgroups/tests/RouterStubTest.java
@@ -0,0 +1,76 @@
package org.jgroups.tests;

import org.jgroups.Global;
import org.jgroups.stack.RouterStub;
import org.testng.annotations.Test;

import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

/**
* @author Bela Ban
* @since 3.3
*/
@Test(groups=Global.FUNCTIONAL,sequential=false)
public class RouterStubTest {

public void testEquality() {
RouterStub stub1=new RouterStub("192.168.1.5", 5000, null, null);
RouterStub stub2=new RouterStub("192.168.1.5", 5000, null, null);
assert stub1.equals(stub2);
assert stub1.hashCode() == stub2.hashCode();
}

public void testInequality() {
RouterStub stub1=new RouterStub("192.168.1.5", 5000, null, null);
RouterStub stub2=new RouterStub("192.168.1.5", 5001, null, null);
assert !stub1.equals(stub2);
assert stub1.hashCode() != stub2.hashCode();
}

public void testCompareTo() {
RouterStub stub1=new RouterStub("192.168.1.5", 5000, null, null);
RouterStub stub2=new RouterStub("192.168.1.5", 5001, null, null);
assert stub1.compareTo(stub2) == -1;
}

public void testHashCode() {
RouterStub stub1=new RouterStub("192.168.1.5", 5000, null, null);
RouterStub stub2=new RouterStub("192.168.1.5", 5001, null, null);
RouterStub stub3=new RouterStub("192.168.1.4", 5000, null, null);
RouterStub stub4=new RouterStub("192.168.1.5", 5000, null, null);
Map<RouterStub,Integer> stubs=new HashMap<RouterStub,Integer>();

stubs.put(stub1, 1);
stubs.put(stub2, 2);
stubs.put(stub3, 3);
System.out.println("stubs = " + stubs);
assert stubs.size() == 3;
stubs.put(stub4, 4);
System.out.println("stubs = " + stubs);
assert stubs.size() == 3;
assert stubs.get(stub1) == 4;
assert stubs.get(stub2) == 2;
assert stubs.get(stub3) == 3;
}


public void testList() {
Set<RouterStub> stubs=new CopyOnWriteArraySet<RouterStub>();
RouterStub stub1=new RouterStub("192.168.1.5", 5000, null, null);
RouterStub stub2=new RouterStub("192.168.1.5", 5001, null, null);
RouterStub stub3=new RouterStub("192.168.1.4", 5000, null, null);
RouterStub stub4=new RouterStub("192.168.1.5", 5000, null, null);

for(RouterStub stub: Arrays.asList(stub1, stub2, stub3, stub4))
stubs.add(stub);
System.out.println("stubs = " + stubs);
assert stubs.size() == 3;

boolean obj=stubs.remove(stub3);
assert obj;
assert stubs.size() == 2;
stubs.add(stub3);
assert stubs.size() == 3;
}
}

0 comments on commit 337dfbc

Please sign in to comment.