Skip to content

Commit

Permalink
0001139: Expose information about the processes that are currently ru…
Browse files Browse the repository at this point in the history
…nning via an api to be used to inspect what is going on in an engine
  • Loading branch information
chenson42 committed Mar 30, 2013
1 parent c244b8a commit 13ba92e
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 18 deletions.
Expand Up @@ -21,6 +21,9 @@
package org.jumpmind.symmetric.model;

import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Date;

import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
Expand All @@ -30,7 +33,33 @@ public class ProcessInfo implements Serializable, Comparable<ProcessInfo> {
private static final long serialVersionUID = 1L;

public static enum Status {
NEW, QUERYING, EXTRACTING, LOADING, TRANSFERRING, ACKING, PROCESSING, DONE, ERROR
NEW, QUERYING, EXTRACTING, LOADING, TRANSFERRING, ACKING, PROCESSING, DONE, ERROR;

public String toString() {
switch (this) {
case NEW:
return "New";
case QUERYING:
return "Querying";
case EXTRACTING:
return "Extracting";
case LOADING:
return "Loading";
case TRANSFERRING:
return "Transferring";
case ACKING:
return "Acking";
case PROCESSING:
return "Processing";
case DONE:
return "Done";
case ERROR:
return "Error";

default:
return name();
}
}
};

private ProcessInfoKey key;
Expand All @@ -56,7 +85,7 @@ public static enum Status {
private Date endTime;

public ProcessInfo() {
this(new ProcessInfoKey("", "", ProcessInfoKey.ProcessType.TEST));
this(new ProcessInfoKey("", "", null));
}

public ProcessInfo(ProcessInfoKey key) {
Expand Down Expand Up @@ -189,4 +218,57 @@ public int compareTo(ProcessInfo o) {
return startTime.compareTo(o.startTime);
}
}

public ThreadData getThreadData() {
if (thread != null && thread.isAlive()) {
return getThreadData(thread.getId());
} else {
return null;
}
}

public static ThreadData getThreadData(long threadId) {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo info = threadBean.getThreadInfo(threadId, 100);
if (info != null) {
String threadName = info.getThreadName();
StringBuilder formattedTrace = new StringBuilder();
StackTraceElement[] trace = info.getStackTrace();
for (StackTraceElement stackTraceElement : trace) {
formattedTrace.append(stackTraceElement.getClassName());
formattedTrace.append(".");
formattedTrace.append(stackTraceElement.getMethodName());
formattedTrace.append("()");
int lineNumber = stackTraceElement.getLineNumber();
if (lineNumber > 0) {
formattedTrace.append(": ");
formattedTrace.append(stackTraceElement.getLineNumber());
}
formattedTrace.append("\n");
}

return new ThreadData(threadName, formattedTrace.toString());
} else {
return null;
}
}

static public class ThreadData {

public ThreadData(String threadName, String stackTrace) {
this.threadName = threadName;
this.stackTrace = stackTrace;
}

private String threadName;
private String stackTrace;

public String getStackTrace() {
return stackTrace;
}

public String getThreadName() {
return threadName;
}
}
}
Expand Up @@ -27,7 +27,30 @@ public class ProcessInfoKey implements Serializable {
private static final long serialVersionUID = 1L;

public enum ProcessType {
MANUAL_LOAD, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, ROUTER_JOB, GAP_DETECT, ROUTER_READER, OUTGOING_PURGE_JOB, INCOMING_PURGE_JOB, TEST
PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, ROUTER_JOB, GAP_DETECT, ROUTER_READER, MANUAL_LOAD;

public String toString() {
switch (this) {
case MANUAL_LOAD:
return "Manual Load";
case PUSH_JOB:
return "Push";
case PULL_JOB:
return "Pull";
case PUSH_HANDLER:
return "Service Push";
case PULL_HANDLER:
return "Service Pull";
case ROUTER_JOB:
return "Routing";
case ROUTER_READER:
return "Routing Reader";
case GAP_DETECT:
return "Gap Detection";
default:
return name();
}
}
};

private String sourceNodeId;
Expand Down
Expand Up @@ -41,6 +41,8 @@
import org.jumpmind.symmetric.model.TableReloadRequestKey;

public class ConfigurationChangedDataRouter extends AbstractDataRouter implements IDataRouter {

public static final String ROUTER_TYPE = "configurationChanged";

final String CTX_KEY_TABLE_RELOAD_NEEDED = "Reload.Table."
+ ConfigurationChangedDataRouter.class.getSimpleName() + hashCode();
Expand Down
Expand Up @@ -143,7 +143,7 @@ protected void execute() {
dataWithSameTransactionIdCount++;
datas.remove();
copyToQueue(data);
dataCount++;
dataCount++;
processInfo.incrementDataCount();
processInfo.setCurrentTableName(data.getTableName());
}
Expand Down
Expand Up @@ -217,15 +217,21 @@ private void pushToNode(Node remote, RemoteNodeStatus status) {
List<BatchAck> batches = transportManager.readAcknowledgement(ackString,
ackExtendedString);

long batchIdInError = Long.MAX_VALUE;
for (BatchAck batchInfo : batches) {
batchIds.remove(batchInfo.getBatchId());
if (!batchInfo.isOk()) {
batchIdInError = batchInfo.getBatchId();
}
log.debug("Saving ack: {}, {}", batchInfo.getBatchId(),
(batchInfo.isOk() ? "OK" : "error"));
(batchInfo.isOk() ? "OK" : "ER"));
acknowledgeService.ack(batchInfo);
}

for (Long batchId : batchIds) {
log.error("We expected but did not receive an ack for batch {}", batchId);
if (batchId < batchIdInError) {
log.error("We expected but did not receive an ack for batch {}", batchId);
}
}

status.updateOutgoingStatus(extractedBatches, batches);
Expand Down
Expand Up @@ -100,7 +100,7 @@ public RouterService(ISymmetricEngine engine) {
this.batchAlgorithms.put("transactional", new TransactionalBatchAlgorithm());

this.routers = new HashMap<String, IDataRouter>();
this.routers.put("configurationChanged", new ConfigurationChangedDataRouter(engine));
this.routers.put(ConfigurationChangedDataRouter.ROUTER_TYPE, new ConfigurationChangedDataRouter(engine));
this.routers.put("bsh", new BshDataRouter(engine));
this.routers.put("subselect", new SubSelectDataRouter(symmetricDialect));
this.routers.put("lookuptable", new LookupTableDataRouter(symmetricDialect));
Expand Down Expand Up @@ -517,6 +517,7 @@ protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext c
data = nextData;
nextData = reader.take();
if (data != null) {
processInfo.setCurrentTableName(data.getTableName());
processInfo.incrementDataCount();
boolean atTransactionBoundary = false;
if (nextData != null) {
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerReBuildReason;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.route.ConfigurationChangedDataRouter;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.jumpmind.symmetric.service.IClusterService;
import org.jumpmind.symmetric.service.IConfigurationService;
Expand Down Expand Up @@ -391,7 +392,7 @@ protected TriggerRouter buildTriggerRoutersForSymmetricTables(String version, Tr
triggerRouter.setTrigger(trigger);

Router router = triggerRouter.getRouter();
router.setRouterType("configurationChanged");
router.setRouterType(ConfigurationChangedDataRouter.ROUTER_TYPE);
router.setNodeGroupLink(nodeGroupLink);
router.setLastUpdateTime(trigger.getLastUpdateTime());

Expand Down Expand Up @@ -512,7 +513,8 @@ public TriggerRouter getTriggerRouterForCurrentNode(String triggerId, String rou
List<TriggerRouter> triggerRouters = getTriggerRoutersForCurrentNode(refreshCache).get(triggerId);
if (triggerRouters != null) {
for (TriggerRouter testTriggerRouter : triggerRouters) {
if (testTriggerRouter.getRouter().getRouterId().equals(routerId)
if (ConfigurationChangedDataRouter.ROUTER_TYPE.equals(testTriggerRouter.getRouter().getRouterType()) ||
testTriggerRouter.getRouter().getRouterId().equals(routerId)
|| routerId.equals(Constants.UNKNOWN_ROUTER_ID)) {
triggerRouter = testTriggerRouter;
break;
Expand Down Expand Up @@ -993,15 +995,23 @@ protected void inactivateTriggers(List<Trigger> triggersThatShouldBeActive,

protected void dropTriggers(TriggerHistory history, StringBuilder sqlBuffer) {

symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(),
history.getSourceSchemaName(), history.getNameForInsertTrigger(),
history.getSourceTableName(), history);
symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(),
history.getSourceSchemaName(), history.getNameForDeleteTrigger(),
history.getSourceTableName(), history);
symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(),
history.getSourceSchemaName(), history.getNameForUpdateTrigger(),
history.getSourceTableName(), history);
if (StringUtils.isNotBlank(history.getNameForInsertTrigger())) {
symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(),
history.getSourceSchemaName(), history.getNameForInsertTrigger(),
history.getSourceTableName(), history);
}

if (StringUtils.isNotBlank(history.getNameForDeleteTrigger())) {
symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(),
history.getSourceSchemaName(), history.getNameForDeleteTrigger(),
history.getSourceTableName(), history);
}

if (StringUtils.isNotBlank(history.getNameForUpdateTrigger())) {
symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(),
history.getSourceSchemaName(), history.getNameForUpdateTrigger(),
history.getSourceTableName(), history);
}

if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
if (this.triggerCreationListeners != null) {
Expand Down

0 comments on commit 13ba92e

Please sign in to comment.