Skip to content

Commit

Permalink
Refactor of the Routing Strategy.
Browse files Browse the repository at this point in the history
This refactor includes:
1) Moving the relationship between the MpCluster manager and the RoutingStrategy down into the RoutingStrategy rather than having the Router be an intermediary.
2) More robust handling of issues with communicating with the MpCluster manager.
3) Test log tuning
  • Loading branch information
Jim Carroll committed May 15, 2012
1 parent 6ea8c23 commit 963c432
Show file tree
Hide file tree
Showing 14 changed files with 627 additions and 534 deletions.
Expand Up @@ -120,6 +120,17 @@ public ApplicationDefinition setClusterDefinitions(List<ClusterDefinition> clust
*/
public List<ClusterDefinition> getClusterDefinitions() { return Collections.unmodifiableList(clusterDefinitions); }

/**
 * Looks up the {@link ClusterDefinition} whose {@link ClusterId} equals the supplied one.
 *
 * @param clusterId is the id of the cluster whose definition is wanted.
 * @return the first {@link ClusterDefinition} in this application whose id equals
 * clusterId, or null when none of them matches.
 */
public ClusterDefinition getClusterDefinition(ClusterId clusterId)
{
    ClusterDefinition match = null;
    for (ClusterDefinition candidate : clusterDefinitions)
    {
        if (candidate.getClusterId().equals(clusterId))
        {
            // the first hit wins; no need to look at the rest
            match = candidate;
            break;
        }
    }
    return match;
}

/**
* When configuring by hand, this method
* @param clusterDefinitions is the {@link ClusterDefinition}s that make
Expand Down
Expand Up @@ -197,6 +197,7 @@ public void validate() throws DempsyException
if (messageProcessorPrototype != null && adaptor != null)
throw new DempsyException("A dempsy cluster must contain either an 'adaptor' or a message processor prototype but not both. " +
clusterId + " appears to be configured with both.");


if (messageProcessorPrototype != null)
{
Expand Down
Expand Up @@ -74,7 +74,7 @@ public interface MpCluster<T, N>
* Every MpCluster instance participating in a cluster will have the
* same cluster Id, which identifies the total set of Mps of the same prototype.
*/
public ClusterId getClusterId() throws MpClusterException;
public ClusterId getClusterId();

/**
* Sets the cluster level data.
Expand Down
Expand Up @@ -16,15 +16,15 @@

package com.nokia.dempsy.router;

import java.util.List;
import java.util.Collection;

import com.nokia.dempsy.DempsyException;
import com.nokia.dempsy.annotations.MessageKey;
import com.nokia.dempsy.config.ApplicationDefinition;
import com.nokia.dempsy.config.ClusterDefinition;
import com.nokia.dempsy.config.ClusterId;
import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.mpcluster.MpCluster;
import com.nokia.dempsy.mpcluster.MpClusterException;

/**
* <p>A {@link RoutingStrategy} is responsible for determining how to find the appropriate
Expand Down Expand Up @@ -66,35 +66,76 @@ public interface RoutingStrategy
{
public static interface Outbound
{
public SlotInformation selectSlotForMessageKey(Object messageKey) throws DempsyException;

/**
* resetCluster is called when the cluster for the Outbound side changes. In this
* way implementations of this class do not need to be MpClusterWatchers
* @param cluster - the cluster handle containing the new state.
* @throws MpClusterException when the implementation has a problem accessing the cluster
* This method needs to be implemented to determine the specific node that the outgoing
* message is to be routed to.
*
* @param messageKey is the message key for the message to be routed
* @param message is the message to be routed.
* @return a transport Destination indicating the unique node in the downstream cluster
* that the message should go to.
* @throws DempsyException when something distasteful happens.
*/
public void resetCluster(MpCluster<ClusterInformation, SlotInformation> cluster) throws MpClusterException;
}

public static interface Inbound
{
public Destination selectDestinationForMessage(Object messageKey, Object message) throws DempsyException;

/**
* <p>resetCluster is called when the cluster for the Inbound side changes. In this
* way implementations of this class do not need to be MpClusterWatchers.</p>
* The {@link Outbound} is responsible for providing the {@link ClusterId} of the cluster
* for which it is the {@link Outbound}.
*/
public ClusterId getClusterId();

/**
* <p>Each node can have many Outbound instances and those Outbound cluster references
* can come and go. In order to tell Dempsy about what's going on in the cluster
* the Outbound should be updating the state of the OutboundCoordinator.</p>
*
* @param cluster - the cluster handle containing the new state.
* @throws MpClusterException when the implementation has a problem accessing the cluster
* <p>Implementors of the RoutingStrategy do not need to implement this interface.
* There is only one implementation and that instance will be supplied by the
* framework.</p>
*/
public void resetCluster(MpCluster<ClusterInformation, SlotInformation> cluster,
List<Class<?>> messageTypes, Destination thisDestination) throws MpClusterException;
public static interface Coordinator
{
/**
* Registers the outbound with the Coordinator and provides the types the destination
* cluster can handle. Note that the Outbound is allowed to call registerOutbound
* more than once without calling unregisterOutbound first, but the results should
* be the same.
*/
public void registerOutbound(Outbound outbound, Collection<Class<?>> classes);

/**
* Unregisters the outbound from the Coordinator.
*/
public void unregisterOutbound(Outbound outbound);

}

public void stop();

}

public static interface Inbound
{
public boolean doesMessageKeyBelongToNode(Object messageKey);

public void stop();
}

public Inbound createInbound();
public Inbound createInbound(MpCluster<ClusterInformation, SlotInformation> cluster, Collection<Class<?>> messageTypes, Destination thisDestination);

public Outbound createOutbound();
/**
* The RoutingStrategy needs to create an {@link Outbound} that corresponds to the given cluster. It should do this
* in a manner that absolutely succeeds even if the cluster information manager is in a bad state. This is why
* this method takes stable parameters and throws no exception.
*
* @param coordinator is the coordinator that the newly created {@link Outbound} can use to call back on the
* framework.
* @param cluster is the cluster handle that the {@link Outbound} is being created for.
* @return a new {@link Outbound} that manages the selection of a {@link Destination} given a message destined for
* the given cluster.
*/
public Outbound createOutbound(Outbound.Coordinator coordinator, MpCluster<ClusterInformation, SlotInformation> cluster);

}

119 changes: 49 additions & 70 deletions lib-dempsyimpl/src/main/java/com/nokia/dempsy/Dempsy.java
Expand Up @@ -34,6 +34,7 @@
import com.nokia.dempsy.container.ContainerException;
import com.nokia.dempsy.container.MpContainer;
import com.nokia.dempsy.internal.util.SafeString;
import com.nokia.dempsy.messagetransport.Destination;
import com.nokia.dempsy.messagetransport.Receiver;
import com.nokia.dempsy.messagetransport.Transport;
import com.nokia.dempsy.monitoring.StatsCollector;
Expand All @@ -42,7 +43,6 @@
import com.nokia.dempsy.mpcluster.MpClusterException;
import com.nokia.dempsy.mpcluster.MpClusterSession;
import com.nokia.dempsy.mpcluster.MpClusterSessionFactory;
import com.nokia.dempsy.mpcluster.MpClusterWatcher;
import com.nokia.dempsy.output.OutputExecuter;
import com.nokia.dempsy.router.ClusterInformation;
import com.nokia.dempsy.router.CurrentClusterCheck;
Expand Down Expand Up @@ -78,7 +78,7 @@ public class Cluster
* Currently a Node is instantiated within the Dempsy orchestrator as a one to one with the
* {@link Cluster}.
*/
public class Node implements MpClusterWatcher
public class Node
{
protected ClusterDefinition clusterDefinition;

Expand Down Expand Up @@ -119,37 +119,34 @@ private void start() throws DempsyException
container.setSerializer(serializer);

// there is only a receiver if we have an Mp (that is, we aren't an adaptor) and start accepting messages
Destination thisDestination = null;
if (messageProcessorPrototype != null && acceptedMessageClasses != null && acceptedMessageClasses.size() > 0)
{
receiver = transport.createInbound();
receiver.setListener(container);
thisDestination = receiver.getDestination();
}

StatsCollectorFactory statsFactory = (StatsCollectorFactory)clusterDefinition.getStatsCollectorFactory();
if (statsFactory != null)
{
statsCollector = statsFactory.createStatsCollector(currentClusterId,
receiver != null ? receiver.getDestination() : null);
receiver != null ? thisDestination : null);
router.setStatsCollector(statsCollector);
container.setStatCollector(statsCollector);
}

RoutingStrategy strategy = (RoutingStrategy)clusterDefinition.getRoutingStrategy();

currentClusterHandle = clusterSession.getCluster(currentClusterId);

// there is only an inbound strategy if we have an Mp (that is, we aren't an adaptor) and
// we actually accept messages
if (messageProcessorPrototype != null && acceptedMessageClasses != null && acceptedMessageClasses.size() > 0)
strategyInbound = strategy.createInbound();

currentClusterHandle = clusterSession.getCluster(currentClusterId);
strategyInbound = strategy.createInbound(currentClusterHandle,acceptedMessageClasses, thisDestination);

// this can fail because of down cluster manager server ... but it should eventually recover.
try
{
if (strategyInbound != null && receiver != null)
strategyInbound.resetCluster(currentClusterHandle, acceptedMessageClasses, receiver.getDestination());
router.initialize();
}
try { router.initialize(); }
catch (MpClusterException e)
{
logger.warn("Strategy failed to initialize. Continuing anyway. The cluster manager issue will be resolved automatically.",e);
Expand Down Expand Up @@ -206,22 +203,6 @@ public void run()
}, "Pre-Instantation Thread");
t.start();
}

// now we want to set the Node as the watcher.
if (strategyInbound != null && receiver != null)
{
currentClusterHandle.addWatcher(this);

// and reset just in case something happened
try
{
strategyInbound.resetCluster(currentClusterHandle, acceptedMessageClasses, receiver.getDestination());
}
catch (MpClusterException e)
{
logger.warn("Strategy failed to initialize. Continuing anyway. The cluster manager issue will be resolved automatically.",e);
}
}
}
catch(RuntimeException e) { throw e; }
catch(Exception e) { throw new DempsyException(e); }
Expand All @@ -231,20 +212,6 @@ public void run()

public MpContainer getMpContainer() { return container; }

@Override
public void process()
{
try
{
if (strategyInbound != null)
strategyInbound.resetCluster(currentClusterHandle, acceptedMessageClasses, receiver.getDestination());
}
// TODO: fix these catches... .need to take note of a failure for a later retry
// using a scheduled task.
catch(RuntimeException e) { throw e; }
catch(Exception e) { throw new RuntimeException(e); }
}

public void stop()
{
if (receiver != null)
Expand All @@ -268,6 +235,8 @@ public void stop()
if (statsCollector != null)
try { statsCollector.stop(); statsCollector = null;} catch (Throwable th) { logger.error("Problem shutting down node for " + SafeString.valueOf(clusterDefinition), th); }

if (strategyInbound != null)
try { strategyInbound.stop(); strategyInbound = null;} catch (Throwable th) { logger.error("Problem shutting down node for " + SafeString.valueOf(clusterDefinition), th); }
}

/**
Expand Down Expand Up @@ -520,39 +489,49 @@ public synchronized void start() throws DempsyException

if (defaultStatsCollectorFactory == null)
throw new DempsyException("Cannot start this application because there's no default stats collector factory defined.");

applications = new ArrayList<Application>(applicationDefinitions.size());
for(ApplicationDefinition appDef: this.applicationDefinitions)

try
{
appDef.initialize();
if (clusterCheck.isThisNodePartOfApplication(appDef.getApplicationName()))
applications = new ArrayList<Application>(applicationDefinitions.size());
for(ApplicationDefinition appDef: this.applicationDefinitions)
{
Application app = new Application(appDef);

// set the default routing strategy if there isn't one already set.
if (appDef.getRoutingStrategy() == null)
appDef.setRoutingStrategy(defaultRoutingStrategy);

if (appDef.getSerializer() == null)
appDef.setSerializer(defaultSerializer);

if (appDef.getStatsCollectorFactory() == null)
appDef.setStatsCollectorFactory(defaultStatsCollectorFactory);

applications.add(app);
appDef.initialize();
if (clusterCheck.isThisNodePartOfApplication(appDef.getApplicationName()))
{
Application app = new Application(appDef);

// set the default routing strategy if there isn't one already set.
if (appDef.getRoutingStrategy() == null)
appDef.setRoutingStrategy(defaultRoutingStrategy);

if (appDef.getSerializer() == null)
appDef.setSerializer(defaultSerializer);

if (appDef.getStatsCollectorFactory() == null)
appDef.setStatsCollectorFactory(defaultStatsCollectorFactory);

applications.add(app);
}
}

boolean clusterStarted = false;
for (Application app : applications)
clusterStarted = app.start();

if(!clusterStarted)
{
throw new DempsyException("Cannot start this application because cluster defination was not found.");
}
// if we got to here we can assume we're started
synchronized(isRunningEvent) { isRunning = true; }
}

boolean clusterStarted = false;
for (Application app : applications)
clusterStarted = app.start();

if(!clusterStarted)
catch (RuntimeException rte)
{
throw new DempsyException("Cannot start this application because cluster defination was not found.");
logger.error("Failed to start Dempsy. Attempting to stop.");
// if something unexpected happened then we should attempt to stop
try { stop(); } catch (Throwable th) {}
throw rte;
}
// if we got to here we can assume we're started
synchronized(isRunningEvent) { isRunning = true; }
}

public synchronized void stop()
Expand Down Expand Up @@ -634,7 +613,7 @@ public boolean waitToBeStopped(long timeInMillis) throws InterruptedException
}
}

private static List<Class<?>> getAcceptedMessages(ClusterDefinition clusterDef)
protected static List<Class<?>> getAcceptedMessages(ClusterDefinition clusterDef)
{
List<Class<?>> messageClasses = new ArrayList<Class<?>>();
Object prototype = clusterDef.getMessageProcessorPrototype();
Expand Down
Expand Up @@ -45,6 +45,8 @@ public String toString()
/**
 * Compares this destination with another object. Two destinations are equal when
 * the other object is a TcpDestination with an equal inetAddress and the same port.
 *
 * @param other is the object to compare against; may be null or of any type.
 * @return true only if other is a TcpDestination with an equal address and port;
 * false for null or for an object of any other type.
 */
@Override
public boolean equals(Object other)
{
    // instanceof is false for null, so this guard covers both the null case and
    // foreign types, which would otherwise fail the cast below with a ClassCastException.
    if (!(other instanceof TcpDestination))
        return false;
    TcpDestination otherTcpDestination = (TcpDestination)other;
    return inetAddress.equals(otherTcpDestination.inetAddress) && (port == otherTcpDestination.port);
}
Expand Down

0 comments on commit 963c432

Please sign in to comment.