Skip to content

Commit

Permalink
0001033: Before allocating push/pull jobs check to see if thread coun…
Browse files Browse the repository at this point in the history
…t has changed and if so, restart thread pool
  • Loading branch information
chenson42 committed Feb 4, 2013
1 parent baed5d1 commit e7649cc
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 30 deletions.
Expand Up @@ -62,9 +62,6 @@ public class ConfigurationChangedDataRouter extends AbstractDataRouter implement
final String CTX_KEY_RESTART_JOBMANAGER_NEEDED = "RestartJobManager."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

final String CTX_KEY_RESTART_NODE_COMMUNICATOR_NEEDED = "RestartNodeCommunicatorThreadPool."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();

public final static String KEY = "symconfig";

protected ISymmetricEngine engine;
Expand All @@ -80,7 +77,7 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
Set<Node> possibleTargetNodes, boolean initialLoad) {

// the list of nodeIds that we will return
Set<String> nodeIds = null;
Set<String> nodeIds = null;

// the inbound data
Map<String, String> columnValues = getDataMap(dataMetaData, engine != null ? engine.getSymmetricDialect() : null);
Expand Down Expand Up @@ -227,20 +224,13 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
routingContext.put(CTX_KEY_FLUSH_PARAMETERS_NEEDED,
Boolean.TRUE);

if (dataMetaData.getData().getRowData() != null
&& dataMetaData.getData().getRowData().contains("job.")) {
if (StringUtils.isBlank(dataMetaData.getData().getSourceNodeId()) &&
(dataMetaData.getData().getRowData() != null
&& dataMetaData.getData().getRowData().contains("job."))) {
routingContext.put(CTX_KEY_RESTART_JOBMANAGER_NEEDED,
Boolean.TRUE);
}

if (dataMetaData.getData().getRowData() != null
&& (dataMetaData.getData().getRowData()
.contains(ParameterConstants.PULL_THREAD_COUNT_PER_SERVER) || dataMetaData
.getData().getRowData()
.contains(ParameterConstants.PUSH_THREAD_COUNT_PER_SERVER))) {
routingContext.put(
CTX_KEY_RESTART_NODE_COMMUNICATOR_NEEDED, Boolean.TRUE);
}
}

if (tableMatches(dataMetaData, TableConstants.SYM_TRANSFORM_COLUMN)
Expand Down Expand Up @@ -366,11 +356,6 @@ public void contextCommitted(SimpleRouterContext routingContext) {
log.info("About to refresh the cache of conflict settings because new configuration came through the data router");
engine.getDataLoaderService().reloadConflictNodeGroupLinks();
}

if (routingContext.get(CTX_KEY_RESTART_NODE_COMMUNICATOR_NEEDED) != null) {
log.info("About to reset the thread pools used to communicate with nodes because the thread pool definition changed");
engine.getNodeCommunicationService().stop();
}

insertReloadEvents(routingContext);

Expand Down
Expand Up @@ -147,21 +147,28 @@ public void save(NodeCommunication nodeCommunication) {

protected ThreadPoolExecutor getExecutor(final CommunicationType communicationType) {
ThreadPoolExecutor service = executors.get(communicationType);

String threadCountParameter = "";
switch (communicationType) {
case PULL:
threadCountParameter = ParameterConstants.PULL_THREAD_COUNT_PER_SERVER;
break;
case PUSH:
threadCountParameter = ParameterConstants.PUSH_THREAD_COUNT_PER_SERVER;
break;
}
int threadCount = parameterService.getInt(threadCountParameter, 1);

if (service != null && service.getCorePoolSize() != threadCount) {
log.info("{} has changed from {} to {}. Restarting thread pool", new Object[] { threadCountParameter, service.getCorePoolSize(), threadCount });
stop();
service = null;
}

if (service == null) {
synchronized (this) {
service = executors.get(communicationType);
if (service == null) {
String threadCountParameter = "";
switch (communicationType) {
case PULL:
threadCountParameter = ParameterConstants.PULL_THREAD_COUNT_PER_SERVER;
break;
case PUSH:
threadCountParameter = ParameterConstants.PUSH_THREAD_COUNT_PER_SERVER;
break;
}
int threadCount = parameterService.getInt(threadCountParameter, 1);

if (threadCount <= 0) {
log.warn("{}={} is not a legal value. Defaulting to 1",
threadCountParameter, threadCount);
Expand Down

0 comments on commit e7649cc

Please sign in to comment.