diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java index e2693c4bd2..9b26f239eb 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java @@ -21,6 +21,9 @@ package org.jumpmind.symmetric.model; import java.io.Serializable; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.util.Date; import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; @@ -30,7 +33,33 @@ public class ProcessInfo implements Serializable, Comparable { private static final long serialVersionUID = 1L; public static enum Status { - NEW, QUERYING, EXTRACTING, LOADING, TRANSFERRING, ACKING, PROCESSING, DONE, ERROR + NEW, QUERYING, EXTRACTING, LOADING, TRANSFERRING, ACKING, PROCESSING, DONE, ERROR; + + public String toString() { + switch (this) { + case NEW: + return "New"; + case QUERYING: + return "Querying"; + case EXTRACTING: + return "Extracting"; + case LOADING: + return "Loading"; + case TRANSFERRING: + return "Transferring"; + case ACKING: + return "Acking"; + case PROCESSING: + return "Processing"; + case DONE: + return "Done"; + case ERROR: + return "Error"; + + default: + return name(); + } + } }; private ProcessInfoKey key; @@ -56,7 +85,7 @@ public static enum Status { private Date endTime; public ProcessInfo() { - this(new ProcessInfoKey("", "", ProcessInfoKey.ProcessType.TEST)); + this(new ProcessInfoKey("", "", null)); } public ProcessInfo(ProcessInfoKey key) { @@ -189,4 +218,57 @@ public int compareTo(ProcessInfo o) { return startTime.compareTo(o.startTime); } } + + public ThreadData getThreadData() { + if (thread != null && thread.isAlive()) { + return getThreadData(thread.getId()); + } else { + return null; + } + } + + public static ThreadData getThreadData(long threadId) { + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + ThreadInfo info = threadBean.getThreadInfo(threadId, 100); + if (info != null) { + String threadName = info.getThreadName(); + StringBuilder formattedTrace = new StringBuilder(); + StackTraceElement[] trace = info.getStackTrace(); + for (StackTraceElement stackTraceElement : trace) { + formattedTrace.append(stackTraceElement.getClassName()); + formattedTrace.append("."); + formattedTrace.append(stackTraceElement.getMethodName()); + formattedTrace.append("()"); + int lineNumber = stackTraceElement.getLineNumber(); + if (lineNumber > 0) { + formattedTrace.append(": "); + formattedTrace.append(stackTraceElement.getLineNumber()); + } + formattedTrace.append("\n"); + } + + return new ThreadData(threadName, formattedTrace.toString()); + } else { + return null; + } + } + + static public class ThreadData { + + public ThreadData(String threadName, String stackTrace) { + this.threadName = threadName; + this.stackTrace = stackTrace; + } + + private String threadName; + private String stackTrace; + + public String getStackTrace() { + return stackTrace; + } + + public String getThreadName() { + return threadName; + } + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java index f403ac2ea1..e7c1b662ff 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfoKey.java @@ -27,7 +27,30 @@ public class ProcessInfoKey implements Serializable { private static final long serialVersionUID = 1L; public enum ProcessType { - MANUAL_LOAD, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, ROUTER_JOB, GAP_DETECT, ROUTER_READER, OUTGOING_PURGE_JOB, INCOMING_PURGE_JOB, TEST + PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, ROUTER_JOB, GAP_DETECT, ROUTER_READER, MANUAL_LOAD; + + public String toString() { + switch (this) { + case MANUAL_LOAD: + return "Manual Load"; + case PUSH_JOB: + return "Push"; + case PULL_JOB: + return "Pull"; + case PUSH_HANDLER: + return "Service Push"; + case PULL_HANDLER: + return "Service Pull"; + case ROUTER_JOB: + return "Routing"; + case ROUTER_READER: + return "Routing Reader"; + case GAP_DETECT: + return "Gap Detection"; + default: + return name(); + } + } }; private String sourceNodeId; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java index b7bef3ce51..9441f0f9be 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java @@ -41,6 +41,8 @@ import org.jumpmind.symmetric.model.TableReloadRequestKey; public class ConfigurationChangedDataRouter extends AbstractDataRouter implements IDataRouter { + + public static final String ROUTER_TYPE = "configurationChanged"; final String CTX_KEY_TABLE_RELOAD_NEEDED = "Reload.Table." + ConfigurationChangedDataRouter.class.getSimpleName() + hashCode(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java index d131ee8037..d05ccf5770 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java @@ -143,7 +143,7 @@ protected void execute() { dataWithSameTransactionIdCount++; datas.remove(); copyToQueue(data); - dataCount++; + dataCount++; processInfo.incrementDataCount(); processInfo.setCurrentTableName(data.getTableName()); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java index 91a1c1fc10..36739933af 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java @@ -217,15 +217,21 @@ private void pushToNode(Node remote, RemoteNodeStatus status) { List batches = transportManager.readAcknowledgement(ackString, ackExtendedString); + long batchIdInError = Long.MAX_VALUE; for (BatchAck batchInfo : batches) { batchIds.remove(batchInfo.getBatchId()); + if (!batchInfo.isOk()) { + batchIdInError = batchInfo.getBatchId(); + } log.debug("Saving ack: {}, {}", batchInfo.getBatchId(), - (batchInfo.isOk() ? "OK" : "error")); + (batchInfo.isOk() ? "OK" : "ER")); acknowledgeService.ack(batchInfo); } for (Long batchId : batchIds) { - log.error("We expected but did not receive an ack for batch {}", batchId); + if (batchId < batchIdInError) { + log.error("We expected but did not receive an ack for batch {}", batchId); + } } status.updateOutgoingStatus(extractedBatches, batches); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index f9e7ec76ae..b5704d96fd 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -100,7 +100,7 @@ public RouterService(ISymmetricEngine engine) { this.batchAlgorithms.put("transactional", new TransactionalBatchAlgorithm()); this.routers = new HashMap(); - this.routers.put("configurationChanged", new ConfigurationChangedDataRouter(engine)); + this.routers.put(ConfigurationChangedDataRouter.ROUTER_TYPE, new ConfigurationChangedDataRouter(engine)); this.routers.put("bsh", new BshDataRouter(engine)); this.routers.put("subselect", new SubSelectDataRouter(symmetricDialect)); this.routers.put("lookuptable", new LookupTableDataRouter(symmetricDialect)); @@ -517,6 +517,7 @@ protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext c data = nextData; nextData = reader.take(); if (data != null) { + processInfo.setCurrentTableName(data.getTableName()); processInfo.incrementDataCount(); boolean atTransactionBoundary = false; if (nextData != null) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index ce1b986398..2a207e01ca 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -51,6 +51,7 @@ import org.jumpmind.symmetric.model.TriggerHistory; import org.jumpmind.symmetric.model.TriggerReBuildReason; import org.jumpmind.symmetric.model.TriggerRouter; +import org.jumpmind.symmetric.route.ConfigurationChangedDataRouter; import org.jumpmind.symmetric.service.ClusterConstants; import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.IConfigurationService; @@ -391,7 +392,7 @@ protected TriggerRouter buildTriggerRoutersForSymmetricTables(String version, Tr triggerRouter.setTrigger(trigger); Router router = triggerRouter.getRouter(); - router.setRouterType("configurationChanged"); + router.setRouterType(ConfigurationChangedDataRouter.ROUTER_TYPE); router.setNodeGroupLink(nodeGroupLink); router.setLastUpdateTime(trigger.getLastUpdateTime()); @@ -512,7 +513,8 @@ public TriggerRouter getTriggerRouterForCurrentNode(String triggerId, String rou List triggerRouters = getTriggerRoutersForCurrentNode(refreshCache).get(triggerId); if (triggerRouters != null) { for (TriggerRouter testTriggerRouter : triggerRouters) { - if (testTriggerRouter.getRouter().getRouterId().equals(routerId) + if (ConfigurationChangedDataRouter.ROUTER_TYPE.equals(testTriggerRouter.getRouter().getRouterType()) || + testTriggerRouter.getRouter().getRouterId().equals(routerId) || routerId.equals(Constants.UNKNOWN_ROUTER_ID)) { triggerRouter = testTriggerRouter; break; @@ -993,15 +995,23 @@ protected void inactivateTriggers(List triggersThatShouldBeActive, protected void dropTriggers(TriggerHistory history, StringBuilder sqlBuffer) { - symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), - history.getSourceSchemaName(), history.getNameForInsertTrigger(), - history.getSourceTableName(), history); - symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), - history.getSourceSchemaName(), history.getNameForDeleteTrigger(), - history.getSourceTableName(), history); - symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), - history.getSourceSchemaName(), history.getNameForUpdateTrigger(), - history.getSourceTableName(), history); + if (StringUtils.isNotBlank(history.getNameForInsertTrigger())) { + symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), + history.getSourceSchemaName(), history.getNameForInsertTrigger(), + history.getSourceTableName(), history); + } + + if (StringUtils.isNotBlank(history.getNameForDeleteTrigger())) { + symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), + history.getSourceSchemaName(), history.getNameForDeleteTrigger(), + history.getSourceTableName(), history); + } + + if (StringUtils.isNotBlank(history.getNameForUpdateTrigger())) { + symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), + history.getSourceSchemaName(), history.getNameForUpdateTrigger(), + history.getSourceTableName(), history); + } if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) { if (this.triggerCreationListeners != null) {