Skip to content

Commit

Permalink
Improving testsuite
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Aug 12, 2011
1 parent 8245e0a commit e53bacd
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 69 deletions.
19 changes: 18 additions & 1 deletion src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1137,8 +1137,18 @@ protected void finalize() throws Throwable

super.finalize();
}

public void cleanup()
{
doClose(false);
}

public void close()
{
doClose(true);
}

protected void doClose(final boolean sendClose)
{
if (closed)
{
Expand Down Expand Up @@ -1176,7 +1186,14 @@ public void close()

for (ClientSessionFactory factory : clonedFactory)
{
factory.close();
if (sendClose)
{
factory.close();
}
else
{
factory.cleanup();
}
}

factories.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface ServerLocatorInternal extends ServerLocator
void setNodeID(String nodeID);

String getNodeID();

void cleanup();

ClientSessionFactory connect() throws Exception;

Expand Down
160 changes: 92 additions & 68 deletions src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class ClusterManagerImpl implements ClusterManager

private volatile ServerLocatorInternal backupServerLocator;

private final Set<ServerLocator> clusterLocators = new ConcurrentHashSet<ServerLocator>();
private final Set<ServerLocatorInternal> clusterLocators = new ConcurrentHashSet<ServerLocatorInternal>();

private final Executor executor;

Expand Down Expand Up @@ -143,21 +143,21 @@ public ClusterManagerImpl(final ExecutorFactory executorFactory,

this.clustered = clustered;
}

public String describe()
{
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);

out.println("Information on " + this);
out.println("*******************************************************");
out.println("Topology: " + topology.describe("Toopology on " + this));

for (ClusterConnection conn : this.clusterConnections.values())
{
out.println(conn.describe());
}

out.println("*******************************************************");

return str.toString();
Expand All @@ -167,7 +167,7 @@ public String toString()
{
return "ClusterManagerImpl[server=" + server + "]@" + System.identityHashCode(this);
}

public synchronized void start() throws Exception
{
if (started)
Expand Down Expand Up @@ -204,56 +204,60 @@ public synchronized void start() throws Exception
started = true;
}

public synchronized void stop() throws Exception
public void stop() throws Exception
{
if (!started)
{
return;
}

if (clustered)
synchronized (this)
{
for (BroadcastGroup group : broadcastGroups.values())
if (!started)
{
group.stop();
managementService.unregisterBroadcastGroup(group.getName());
return;
}

broadcastGroups.clear();

for (ClusterConnection clusterConnection : clusterConnections.values())
if (clustered)
{
clusterConnection.stop();
managementService.unregisterCluster(clusterConnection.getName().toString());
}
for (BroadcastGroup group : broadcastGroups.values())
{
group.stop();
managementService.unregisterBroadcastGroup(group.getName());
}

}
broadcastGroups.clear();

for (Bridge bridge : bridges.values())
{
bridge.stop();
managementService.unregisterBridge(bridge.getName().toString());
}
for (ClusterConnection clusterConnection : clusterConnections.values())
{
clusterConnection.stop();
managementService.unregisterCluster(clusterConnection.getName().toString());
}

bridges.clear();
}

if (backupServerLocator != null)
{
backupServerLocator.close();
backupServerLocator = null;
for (Bridge bridge : bridges.values())
{
bridge.stop();
managementService.unregisterBridge(bridge.getName().toString());
}

bridges.clear();

if (backupServerLocator != null)
{
backupServerLocator.close();
backupServerLocator = null;
}
}

executor.execute(new Runnable()
for (ServerLocatorInternal clusterLocator : clusterLocators)
{
public void run()
try
{
for (ServerLocator clusterLocator : clusterLocators)
{
clusterLocator.close();
}
clusterLocators.clear();
clusterLocator.close();
}
});
catch (Exception e)
{
log.warn("Error closing serverLocator=" + clusterLocator + ", message=" + e.getMessage(), e);
}
}
clusterLocators.clear();
started = false;

clusterConnections.clear();
Expand All @@ -265,8 +269,8 @@ public void notifyNodeDown(String nodeID)
{
return;
}
log.debug(this + "::removing nodeID=" + nodeID, new Exception ("trace"));

log.debug(this + "::removing nodeID=" + nodeID, new Exception("trace"));

topology.removeMember(nodeID);

Expand All @@ -284,22 +288,32 @@ public void notifyNodeUp(final String nodeID,

TopologyMember member = new TopologyMember(connectorPair);
boolean updated = topology.addMember(nodeID, member, last);

if (!updated)
{
if (log.isDebugEnabled())
{
log.debug(this + " ignored notifyNodeUp on nodeID=" + nodeID + " pair=" + connectorPair + " as the topology already knew about it");
log.debug(this + " ignored notifyNodeUp on nodeID=" +
nodeID +
" pair=" +
connectorPair +
" as the topology already knew about it");
}
return;
}

if (log.isDebugEnabled())
{
log.debug(this + " received notifyNodeUp nodeID=" + nodeID + " connectorPair=" + connectorPair +
", nodeAnnounce=" + nodeAnnounce + ", last=" + last);
log.debug(this + " received notifyNodeUp nodeID=" +
nodeID +
" connectorPair=" +
connectorPair +
", nodeAnnounce=" +
nodeAnnounce +
", last=" +
last);
}

// if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
// connections.
if (nodeAnnounce)
Expand All @@ -312,8 +326,14 @@ public void notifyNodeUp(final String nodeID,
{
if (log.isTraceEnabled())
{
log.trace(this + " information clusterConnection=" + clusterConnection +
" nodeID=" + nodeID + " connectorPair=" + connectorPair + " last=" + last);
log.trace(this + " information clusterConnection=" +
clusterConnection +
" nodeID=" +
nodeID +
" connectorPair=" +
connectorPair +
" last=" +
last);
}
clusterConnection.nodeUP(nodeID, connectorPair, last);
}
Expand Down Expand Up @@ -350,17 +370,17 @@ public void addClusterTopologyListener(final ClusterTopologyListener listener, f
topology.addClusterTopologyListener(listener);

// We now need to send the current topology to the client
executor.execute(new Runnable(){
executor.execute(new Runnable()
{
public void run()
{
topology.sendTopology(listener);

}
});
}

public void removeClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
{
topology.removeClusterTopologyListener(listener);
}
Expand All @@ -380,8 +400,9 @@ public synchronized void activate()
String nodeID = server.getNodeID().toString();

TopologyMember member = topology.getMember(nodeID);
//swap backup as live and send it to everybody
member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b, null));
// swap backup as live and send it to everybody
member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b,
null));
topology.addMember(nodeID, member, false);

if (backupServerLocator != null)
Expand Down Expand Up @@ -434,7 +455,7 @@ public synchronized void activate()
log.warn("unable to start bridge " + bridge.getName(), e);
}
}

topology.sendMemberToListeners(nodeID, member);
}
}
Expand All @@ -460,7 +481,7 @@ public void announceBackup() throws Exception
log.warn("no cluster connections defined, unable to announce backup");
}
}

void addClusterLocator(final ServerLocatorInternal serverLocator)
{
this.clusterLocators.add(serverLocator);
Expand Down Expand Up @@ -681,7 +702,7 @@ public synchronized void deployBridge(final BridgeConfiguration config) throws E
}

serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());

// We are going to manually retry on the bridge in case of failure
serverLocator.setReconnectAttempts(0);
serverLocator.setInitialConnectAttempts(-1);
Expand All @@ -693,12 +714,12 @@ public synchronized void deployBridge(final BridgeConfiguration config) throws E
serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
if (!config.isUseDuplicateDetection())
{
log.debug("Bridge " + config.getName() +
log.debug("Bridge " + config.getName() +
" is configured to not use duplicate detecion, it will send messages synchronously");
}

clusterLocators.add(serverLocator);

Bridge bridge = new BridgeImpl(serverLocator,
config.getReconnectAttempts(),
config.getRetryInterval(),
Expand Down Expand Up @@ -731,7 +752,7 @@ public synchronized void deployBridge(final BridgeConfiguration config) throws E
public void destroyBridge(final String name) throws Exception
{
Bridge bridge;

synchronized (this)
{
bridge = bridges.remove(name);
Expand All @@ -741,7 +762,7 @@ public void destroyBridge(final String name) throws Exception
managementService.unregisterBridge(name);
}
}

bridge.flushExecutor();
}

Expand Down Expand Up @@ -790,10 +811,13 @@ private synchronized void deployClusterConnection(final ClusterConnectionConfigu
"'. The cluster connection will not be deployed.");
return;
}

if (log.isDebugEnabled())
{
log.debug(this + " Starting a Discovery Group Cluster Connection, name=" + config.getDiscoveryGroupName() + ", dg=" + dg);
log.debug(this + " Starting a Discovery Group Cluster Connection, name=" +
config.getDiscoveryGroupName() +
", dg=" +
dg);
}

clusterConnection = new ClusterConnectionImpl(this,
Expand Down Expand Up @@ -828,7 +852,7 @@ private synchronized void deployClusterConnection(final ClusterConnectionConfigu
{
TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors())
: null;

if (log.isDebugEnabled())
{
log.debug(this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
Expand Down Expand Up @@ -869,7 +893,7 @@ private synchronized void deployClusterConnection(final ClusterConnectionConfigu

if (log.isDebugEnabled())
{
log.debug("ClusterConnection.start at " + clusterConnection, new Exception ("trace"));
log.debug("ClusterConnection.start at " + clusterConnection, new Exception("trace"));
}
clusterConnection.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2059,6 +2059,7 @@ protected void stopServers(final int... nodes) throws Exception
{
ClusterTestBase.log.info("stopping server " + node);
servers[node].stop();
Thread.sleep(500);
ClusterTestBase.log.info("server " + node + " stopped");
}
catch (Exception e)
Expand Down

0 comments on commit e53bacd

Please sign in to comment.