diff --git a/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidMonitorService.java b/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidMonitorService.java index 2226cddeaa..ca59d2ae6f 100644 --- a/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidMonitorService.java +++ b/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidMonitorService.java @@ -20,6 +20,7 @@ */ package org.jumpmind.symmetric.android; +import java.util.ArrayList; import java.util.List; import org.jumpmind.symmetric.ISymmetricEngine; @@ -137,4 +138,10 @@ public List getActiveMonitorsUnresolvedForNode(String nodeGroupId, Stri public List getActiveMonitorsUnresolvedForNodeFromDb(String nodeGroupId, String externalId) { return null; } + + @Override + public List getMonitorEventsByMonitorId(String monitorId) { + return new ArrayList(); + } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java index 2cc59e4d38..731e777ce8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.util.Date; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -39,8 +40,11 @@ import org.jumpmind.symmetric.ext.IHeartbeatListener; import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.service.ClusterConstants; import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.IParameterService; +import org.jumpmind.symmetric.service.IStatisticService; +import org.jumpmind.symmetric.statistic.IStatisticManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,9 +63,59 @@ public void heartbeat(Node me) { boolean updateWithBatchStatus = parameterService.is(ParameterConstants.HEARTBEAT_UPDATE_NODE_WITH_BATCH_STATUS, false); int outgoingErrorCount = -1; int outgoingUnsentCount = -1; + int outgoingUnsentRowCount = -1; + Date lastSuccessfulSyncTime = null; + String mostRecentActiveTableSynced = null; + int totalRowsLoaded = -1; + Date oldestLoadedDate = null; + long purgeOutgoingLastMs = -1; + Date purgeOutgoingLastRun = null; + long purgeOutgoingAverage = -1; + + long routingLastMs = -1; + long routingAveragetMs = -1; + Date routingLastRun = null; + long symDataSize = -1; + if (updateWithBatchStatus) { - outgoingUnsentCount = engine.getOutgoingBatchService().countOutgoingBatchesUnsent(); outgoingErrorCount = engine.getOutgoingBatchService().countOutgoingBatchesInError(); + int[] batchesRowsUnsent = engine.getOutgoingBatchService().countOutgoingNonSystemBatchesRowsUnsent(); + + outgoingUnsentCount = batchesRowsUnsent[0]; + outgoingUnsentRowCount = batchesRowsUnsent[1]; + + Date outDate = engine.getOutgoingBatchService().getOutgoingBatchesLatestUpdateSql(); + Date inDate = engine.getIncomingBatchService().getIncomingBatchesLatestUpdateSql(); + if (outDate == null && inDate == null) { + lastSuccessfulSyncTime = null; + } else if (outDate == null) { + lastSuccessfulSyncTime = inDate; + } else if (inDate == null) { + lastSuccessfulSyncTime = outDate; + } else { + lastSuccessfulSyncTime = outDate.after(inDate) ? outDate : inDate; + } + + IStatisticManager statisticsManager = engine.getStatisticManager(); + mostRecentActiveTableSynced = statisticsManager.getMostRecentActiveTableSynced(); + Map totalLoadedRowsMap = statisticsManager.getTotalLoadedRows(); + if (totalLoadedRowsMap != null && totalLoadedRowsMap.size() == 1) { + totalRowsLoaded = totalLoadedRowsMap.keySet().iterator().next(); + oldestLoadedDate = totalLoadedRowsMap.values().iterator().next(); + } + + IJob purgeOutgoingJob = engine.getJobManager().getJob(ClusterConstants.PURGE_OUTGOING); + purgeOutgoingLastMs = purgeOutgoingJob.getLastExecutionTimeInMs(); + purgeOutgoingLastRun = purgeOutgoingJob.getLastFinishTime(); + purgeOutgoingAverage = purgeOutgoingJob.getAverageExecutionTimeInMs(); + + IJob routeJob = engine.getJobManager().getJob(ClusterConstants.ROUTE); + routingAveragetMs = routeJob.getAverageExecutionTimeInMs(); + routingLastRun = routeJob.getLastFinishTime(); + routingLastMs = routeJob.getLastExecutionTimeInMs(); + + symDataSize = engine.getDataService().countData(); + } if (!parameterService.getExternalId().equals(me.getExternalId()) || !parameterService.getNodeGroupId().equals(me.getNodeGroupId()) @@ -73,7 +127,16 @@ public void heartbeat(Node me) { || !symmetricDialect.getName().equals(me.getDatabaseType()) || !symmetricDialect.getVersion().equals(me.getDatabaseVersion()) || me.getBatchInErrorCount() != outgoingErrorCount - || me.getBatchToSendCount() != outgoingUnsentCount) { + || me.getBatchToSendCount() != outgoingUnsentCount + || me.getLastSuccessfulSyncDate() != lastSuccessfulSyncTime + || me.getMostRecentActiveTableSynced() != mostRecentActiveTableSynced + || me.getPurgeOutgoingLastMs() != purgeOutgoingLastMs + || me.getPurgeOutgoingLastRun() != purgeOutgoingLastRun + || me.getPurgeOutgoingAverageMs() != purgeOutgoingAverage + || me.getRoutingAverageMs() != routingAveragetMs + || me.getRoutingLastRun() != routingLastRun + || me.getRoutingLastMs() != routingLastMs + || me.getSymDataSize() != symDataSize) { log.info("Some attribute(s) of node changed. Recording changes"); me.setDeploymentType(engine.getDeploymentType()); me.setDeploymentSubType(engine.getDeploymentSubType()); @@ -83,6 +146,19 @@ public void heartbeat(Node me) { me.setDatabaseName(engine.getDatabasePlatform().getName()); me.setBatchInErrorCount(outgoingErrorCount); me.setBatchToSendCount(outgoingUnsentCount); + me.setLastSuccessfulSyncDate(lastSuccessfulSyncTime); + me.setDataRowsToSendCount(outgoingUnsentRowCount); + me.setMostRecentActiveTableSynced(mostRecentActiveTableSynced); + me.setDataRowsLoadedCount(totalRowsLoaded); + me.setOldestLoadTime(oldestLoadedDate); + me.setPurgeOutgoingLastMs(purgeOutgoingLastMs); + me.setPurgeOutgoingLastRun(purgeOutgoingLastRun); + me.setPurgeOutgoingAverageMs(purgeOutgoingAverage); + me.setRoutingAverageMs(routingAveragetMs); + me.setRoutingLastRun(routingLastRun); + me.setRoutingLastMs(routingLastMs); + me.setSymDataSize(symDataSize); + me.setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION)); if (engine.getParameterService().isRegistrationServer()) { me.setConfigVersion(Version.version()); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java index 42998147ab..60ae16ba15 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/ConfigurationChangedDatabaseWriterFilter.java @@ -104,6 +104,15 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) { } } } + } else if (matchesTable(table, TableConstants.SYM_MONITOR) && (data.getDataEventType() == DataEventType.INSERT)) { + Map newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA); + String monitorID = newData.get("MONITOR_ID"); + if (monitorID != null && (monitorID.equals("SystemBatchErrorMonitor") + || monitorID.equals("SystemLogMonitor") + || monitorID.equals("SystemOfflineNodeMonitor"))) { + return false; + } + } return true; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/MonitorEvent.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/MonitorEvent.java index 157da962c8..86a4809c5e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/MonitorEvent.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/MonitorEvent.java @@ -22,6 +22,14 @@ import java.util.Date; +import org.jumpmind.symmetric.monitor.MonitorTypeBatchError; +import org.jumpmind.symmetric.monitor.MonitorTypeBatchUnsent; +import org.jumpmind.symmetric.monitor.MonitorTypeCpu; +import org.jumpmind.symmetric.monitor.MonitorTypeDisk; +import org.jumpmind.symmetric.monitor.MonitorTypeMemory; +import org.jumpmind.symmetric.monitor.MonitorTypeOfflineNodes; +import org.jumpmind.symmetric.monitor.MonitorTypeUnrouted; + public class MonitorEvent { protected String monitorId; protected String nodeId; @@ -177,4 +185,26 @@ public String getDetails() { public void setDetails(String details) { this.details = details; } + + public String getPrettyPrint() { + StringBuffer sb = new StringBuffer(String.valueOf(value)); + if (MonitorTypeBatchError.NAME.equals(getType())) { + sb.append(getValue() > 1 ? " batches" : " batch"); + } else if (MonitorTypeBatchUnsent.NAME.equals(getType())) { + sb.append(getValue() > 1 ? " batches" : " batch"); + } else if (MonitorTypeUnrouted.NAME.equals(getType())) { + sb.append(getValue() > 1 ? " rows" : " row"); + } else if (MonitorTypeOfflineNodes.NAME.equals(getType())) { + sb.append(getValue() > 1 ? " nodes" : " node"); + } else if (MonitorTypeCpu.NAME.equals(getType())) { + sb.append("%"); + } else if (MonitorTypeMemory.NAME.equals(getType())) { + sb.append("%"); + } else if (MonitorTypeDisk.NAME.equals(getType())) { + sb.append("%"); + } + + return sb.toString(); + + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java index 2f9d047ef8..69e477f844 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java @@ -24,6 +24,7 @@ import java.io.Serializable; import java.math.BigDecimal; +import java.util.Date; import java.util.Properties; import org.apache.commons.lang3.StringUtils; @@ -68,7 +69,19 @@ public class Node implements Serializable, Comparable { private String deploymentType; private String deploymentSubType; private int[] symmetricVersionParts; - + private Date lastSuccessfulSyncDate; + private String mostRecentActiveTableSynced; + private int dataRowsToSendCount; + private int dataRowsLoadedCount; + private Date oldestLoadTime; + private long purgeOutgoingLastMs; + private Date purgeOutgoingLastRun; + private long purgeOutgoingAverageMs; + private long routingAverageMs; + private Date routingLastRun; + private long routingLastMs; + private long symDataSize; + public Node() { } @@ -265,6 +278,95 @@ public String getDeploymentSubType() { public void setDeploymentSubType(String deploymentSubType) { this.deploymentSubType = deploymentSubType; } + + public Date getLastSuccessfulSyncDate() { + return lastSuccessfulSyncDate; + } + + public void setLastSuccessfulSyncDate(Date lastSuccessfulSyncDate) { + this.lastSuccessfulSyncDate = lastSuccessfulSyncDate; + } + + + public int getDataRowsToSendCount() { + return dataRowsToSendCount; + } + + public void setDataRowsToSendCount(int dataRowsToSendCount) { + this.dataRowsToSendCount = dataRowsToSendCount; + } + + public int getDataRowsLoadedCount() { + return dataRowsLoadedCount; + } + + public void setDataRowsLoadedCount(int dataRowsLoadedCount) { + this.dataRowsLoadedCount = dataRowsLoadedCount; + } + + public Date getOldestLoadTime() { + return oldestLoadTime; + } + + public void setOldestLoadTime(Date oldestLoadTime) { + this.oldestLoadTime = oldestLoadTime; + } + + public long getPurgeOutgoingLastMs() { + return purgeOutgoingLastMs; + } + + public void setPurgeOutgoingLastMs(long purgeOutgoingLastMs) { + this.purgeOutgoingLastMs = purgeOutgoingLastMs; + } + + public Date getPurgeOutgoingLastRun() { + return purgeOutgoingLastRun; + } + + public void setPurgeOutgoingLastRun(Date purgeOutgoingLastRun) { + this.purgeOutgoingLastRun = purgeOutgoingLastRun; + } + + public long getRoutingAverageMs() { + return routingAverageMs; + } + + public void setRoutingAverageMs(long routingAverageMs) { + this.routingAverageMs = routingAverageMs; + } + + public Date getRoutingLastRun() { + return routingLastRun; + } + + public void setRoutingLastRun(Date routingLastRun) { + this.routingLastRun = routingLastRun; + } + + public long getSymDataSize() { + return symDataSize; + } + + public void setSymDataSize(long symDataSize) { + this.symDataSize = symDataSize; + } + + public long getPurgeOutgoingAverageMs() { + return purgeOutgoingAverageMs; + } + + public void setPurgeOutgoingAverageMs(long purgeOutgoingAverageMs) { + this.purgeOutgoingAverageMs = purgeOutgoingAverageMs; + } + + public long getRoutingLastMs() { + return routingLastMs; + } + + public void setRoutingLastMs(long routingLastMs) { + this.routingLastMs = routingLastMs; + } public boolean requires13Compatiblity() { if (symmetricVersion != null) { @@ -325,4 +427,15 @@ public int compareTo(Node other) { return 0; } } + + public String getMostRecentActiveTableSynced() { + return mostRecentActiveTableSynced; + } + + public void setMostRecentActiveTableSynced(String mostRecentActiveTableSynced) { + this.mostRecentActiveTableSynced = mostRecentActiveTableSynced; + } + + + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeBatchError.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeBatchError.java index 23fec547ec..e9880f84b9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeBatchError.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeBatchError.java @@ -42,10 +42,11 @@ public class MonitorTypeBatchError implements IMonitorType, ISymmetricEngineAwar protected final Logger log = LoggerFactory.getLogger(getClass()); protected IOutgoingBatchService outgoingBatchService; protected IIncomingBatchService incomingBatchService; - + public static final String NAME = "batchError"; + @Override public String getName() { - return "batchError"; + return NAME; } @Override @@ -97,4 +98,5 @@ protected String serializeDetails(BatchErrorWrapper details) { } return result; } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeBatchUnsent.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeBatchUnsent.java index a066b66d30..384b4603fd 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeBatchUnsent.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeBatchUnsent.java @@ -29,10 +29,11 @@ public class MonitorTypeBatchUnsent implements IMonitorType, ISymmetricEngineAware, IBuiltInExtensionPoint { protected IOutgoingBatchService outgoingBatchService; - + public static final String NAME = "batchUnsent"; + @Override public String getName() { - return "batchUnsent"; + return NAME; } @Override @@ -51,4 +52,5 @@ public boolean requiresClusterLock() { public void setSymmetricEngine(ISymmetricEngine engine) { outgoingBatchService = engine.getOutgoingBatchService(); } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeCpu.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeCpu.java index 2c93ba39c7..7e2c942672 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeCpu.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeCpu.java @@ -37,7 +37,8 @@ public class MonitorTypeCpu extends AbstractMonitorType implements IBuiltInExten protected OperatingSystemMXBean osBean; protected RuntimeMXBean runtimeBean; protected List ignoreElements; - + public static final String NAME = "cpu"; + public MonitorTypeCpu() { osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); runtimeBean = ManagementFactory.getRuntimeMXBean(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeDisk.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeDisk.java index 89a998224d..592e2eb7b8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeDisk.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeDisk.java @@ -30,10 +30,11 @@ public class MonitorTypeDisk implements IMonitorType, ISymmetricEngineAware, IBuiltInExtensionPoint { protected File tempDirectory; - + public static final String NAME = "disk"; + @Override public String getName() { - return "disk"; + return NAME; } @Override diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeMemory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeMemory.java index 5c3f0a4bc0..3f2a03f01e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeMemory.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeMemory.java @@ -36,7 +36,8 @@ public class MonitorTypeMemory extends AbstractMonitorType implements IBuiltInExtensionPoint { protected final Logger log = LoggerFactory.getLogger(getClass()); protected MemoryPoolMXBean tenuredPool; - + public static final String NAME = "memory"; + @Override public String getName() { return "memory"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeOfflineNodes.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeOfflineNodes.java index 6dc150971d..9844b0ebe5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeOfflineNodes.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeOfflineNodes.java @@ -39,7 +39,8 @@ public class MonitorTypeOfflineNodes implements IMonitorType, ISymmetricEngineAw protected final Logger log = LoggerFactory.getLogger(getClass()); protected INodeService nodeService; protected IParameterService parameterService; - + public static final String NAME = "offlineNodes"; + @Override public String getName() { return "offlineNodes"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeUnrouted.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeUnrouted.java index 599841772d..c2c2ff5eb8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeUnrouted.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/monitor/MonitorTypeUnrouted.java @@ -29,7 +29,8 @@ public class MonitorTypeUnrouted implements IMonitorType, ISymmetricEngineAware, IBuiltInExtensionPoint { protected IRouterService routerService; - + public static final String NAME = "dataUnrouted"; + @Override public String getName() { return "dataUnrouted"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index 371a1d4198..d5d515b353 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -195,6 +195,8 @@ public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, Trig */ public int countDataInRange(long firstDataId, long secondDataId); + public int countData(); + public long countDataGaps(); public List findDataGapsUnchecked(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java index 499c068cf7..3723b3b552 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IIncomingBatchService.java @@ -40,6 +40,8 @@ public interface IIncomingBatchService { public int countIncomingBatchesInError(String channelId); + public Date getIncomingBatchesLatestUpdateSql(); + public IncomingBatch findIncomingBatch(long batchId, String nodeId); public void refreshIncomingBatch(IncomingBatch batch); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IMonitorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IMonitorService.java index 026e7a2e32..2a1588fc56 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IMonitorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IMonitorService.java @@ -47,6 +47,8 @@ public interface IMonitorService { public List getMonitorEventsFiltered(int limit, String type, int severityLevel, String nodeId, Boolean isResolved); + public List getMonitorEventsByMonitorId(String monitorId); + public void saveMonitorEvent(MonitorEvent notificationEvent); public void deleteMonitorEvent(MonitorEvent event); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index 5ae0a19da0..922a21d65c 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -91,8 +91,12 @@ public OutgoingBatches getOutgoingBatchByLoadRangeAndTable(long loadId, long sta public int countOutgoingBatchesInError(); + public Date getOutgoingBatchesLatestUpdateSql(); + public int countOutgoingBatchesUnsent(); + public int[] countOutgoingNonSystemBatchesRowsUnsent(); + public int countOutgoingBatchesInError(String channelId); public int countOutgoingBatchesUnsent(String channelId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 165fd98742..28dfd80600 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -1867,6 +1867,11 @@ public int countDataInRange(long firstDataId, long secondDataId) { return sqlTemplate.queryForInt(getSql("countDataInRangeSql"), firstDataId, secondDataId); } + public int countData() { + return sqlTemplate.queryForInt(getSql("countDataSql")); + } + + @Override public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHistory, boolean isLoad, long loadId, String createBy, diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index 255c58f0e3..22e7091a5e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -223,6 +223,8 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + "select max(data_id) from $(data_event) "); putSql("countDataInRangeSql", "" + "select count(*) from $(data) where data_id > ? and data_id < ? "); + putSql("countDataSql", "" + + "select count(*) from $(data)"); putSql("insertIntoDataSql", "insert into $(data) (data_id, table_name, event_type, row_data, pk_data, " + "old_data, trigger_hist_id, channel_id, external_data, node_list, is_prerouted, transaction_id, source_node_id, create_time) " + diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java index 6ed8b6e86b..56e15129e3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java @@ -87,6 +87,11 @@ public int countIncomingBatchesInError() { public int countIncomingBatchesInError(String channelId) { return sqlTemplateDirty.queryForInt(getSql("countIncomingBatchesErrorsOnChannelSql"), channelId); } + + public Date getIncomingBatchesLatestUpdateSql() { + return sqlTemplateDirty.queryForObject(getSql("getIncomingBatchesLatestUpdateSql"), Date.class); + } + public List findIncomingBatchErrors(int maxRows) { return sqlTemplateDirty.query(getSql("selectIncomingBatchPrefixSql", "findIncomingBatchErrorsSql"), maxRows, diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java index 2bda900a35..bee807e534 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchServiceSqlMap.java @@ -65,6 +65,9 @@ public IncomingBatchServiceSqlMap(IDatabasePlatform platform, Map getMonitorEventsFiltered(int limit, String type, int s return sqlTemplate.query(sql, limit, new MonitorEventRowMapper(), args.toArray()); } + @Override + public List getMonitorEventsByMonitorId(String monitorId) { + String sql = getSql("selectMonitorEventSql", "whereMonitorEventIdSql"); + ArrayList args = new ArrayList(); + + sql += " order by event_time desc"; + return sqlTemplate.query(sql, new MonitorEventRowMapper(), monitorId); + } + protected List getMonitorEventsForNotification(int severityLevel) { return sqlTemplate.query(getSql("selectMonitorEventSql", "whereMonitorEventForNotificationBySeveritySql"), new MonitorEventRowMapper(), severityLevel); @@ -481,7 +490,7 @@ public Monitor mapRow(Row row) { } } - static class MonitorEventRowMapper implements ISqlRowMapper { + public static class MonitorEventRowMapper implements ISqlRowMapper { public MonitorEvent mapRow(Row row) { MonitorEvent m = new MonitorEvent(); m.setMonitorId(row.getString("monitor_id")); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MonitorServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MonitorServiceSqlMap.java index cf0ca1dd14..c59cdaa1b5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MonitorServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MonitorServiceSqlMap.java @@ -67,6 +67,8 @@ public MonitorServiceSqlMap(IDatabasePlatform platform, Map repl putSql("whereMonitorEventFilteredSql", "where severity_level >= ?"); putSql("whereMonitorEventForNotificationBySeveritySql", "where is_notified = 0 and severity_level >= ?"); + putSql("whereMonitorEventIdSql", + "where monitor_id = ? and is_resolved = 0"); putSql("insertMonitorEventSql", "insert into $(monitor_event) " + "(monitor_id, node_id, event_time, host_name, " + type + ", event_value, event_count, threshold, severity_level, " + diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java index 1ebae12521..0244c01c84 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java @@ -281,13 +281,21 @@ public void save(Node node) { node.getSymmetricVersion(), node.getSyncUrl(), node.isSyncEnabled() ? 1 : 0, node.getBatchToSendCount(), node.getBatchInErrorCount(), + node.getLastSuccessfulSyncDate(), node.getMostRecentActiveTableSynced(), + node.getPurgeOutgoingAverageMs(), node.getPurgeOutgoingLastMs(), node.getPurgeOutgoingLastRun(), node.getRoutingAverageMs(), + node.getRoutingLastMs(), node.getRoutingLastRun(), node.getSymDataSize(), node.getCreatedAtNodeId(), node.getDeploymentType(), node.getDeploymentSubType(), node.getConfigVersion(), + node.getDataRowsToSendCount(), node.getDataRowsLoadedCount(), node.getOldestLoadTime(), node.getNodeId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }); + Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.TIMESTAMP, + Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, + Types.BIGINT, Types.BIGINT, + Types.TIMESTAMP, Types.BIGINT, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.INTEGER, + Types.TIMESTAMP, Types.VARCHAR }); flushNodeGroupCache(); } flushNodeCache(); @@ -300,13 +308,22 @@ public boolean updateNode(Node node) { node.getDatabaseVersion(), node.getDatabaseName(), node.getSchemaVersion(), node.getSymmetricVersion(), node.getSyncUrl(), node.isSyncEnabled() ? 1 : 0, - node.getBatchToSendCount(), node.getBatchInErrorCount(), + node.getBatchToSendCount(), node.getBatchInErrorCount(), node.getLastSuccessfulSyncDate(), + node.getMostRecentActiveTableSynced(), node.getPurgeOutgoingAverageMs(), + node.getPurgeOutgoingLastMs(), node.getPurgeOutgoingLastRun(), node.getRoutingAverageMs(), node.getRoutingLastMs(), + node.getRoutingLastRun(), node.getSymDataSize(), node.getCreatedAtNodeId(), node.getDeploymentType(), node.getDeploymentSubType(), - node.getConfigVersion(), node.getNodeId() }, + node.getConfigVersion(), + node.getDataRowsToSendCount(), node.getDataRowsLoadedCount(), node.getOldestLoadTime(), + node.getNodeId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) == 1; + Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.TIMESTAMP, + Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, + Types.BIGINT, Types.BIGINT, + Types.TIMESTAMP, Types.BIGINT, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.INTEGER, + Types.TIMESTAMP, Types.VARCHAR }) == 1; return updated; } @@ -1006,7 +1023,7 @@ protected void fireOffline(List offlineClientNodeList) { } } - static class NodeRowMapper implements ISqlRowMapper { + public static class NodeRowMapper implements ISqlRowMapper { public Node mapRow(Row rs) { Node node = new Node(); node.setNodeId(rs.getString("node_id")); @@ -1025,6 +1042,14 @@ public Node mapRow(Row rs) { node.setDeploymentType(rs.getString("deployment_type")); node.setDeploymentSubType(rs.getString("deployment_sub_type")); node.setConfigVersion(rs.getString("config_version")); + node.setPurgeOutgoingAverageMs(rs.getLong("purge_outgoing_average_ms")); + node.setPurgeOutgoingLastMs(rs.getLong("purge_outgoing_last_run_ms")); + node.setPurgeOutgoingLastRun(rs.getDateTime("purge_outgoing_last_finish")); + node.setRoutingAverageMs(rs.getLong("routing_average_run_ms")); + node.setRoutingLastMs(rs.getLong("routing_last_run_ms")); + node.setRoutingLastRun(rs.getDateTime("routing_last_finish")); + node.setSymDataSize(rs.getLong("sym_data_size")); + return node; } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java index d68e5b046e..a9b3bf0e94 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java @@ -44,13 +44,17 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map replace putSql("insertNodeSql", "insert into $(node) (node_group_id, external_id, database_type, database_version, database_name, " + "schema_version, symmetric_version, sync_url," + - "sync_enabled, batch_to_send_count, batch_in_error_count, created_at_node_id, " + - "deployment_type, deployment_sub_type, config_version, node_id) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); + "sync_enabled, batch_to_send_count, batch_in_error_count, batch_last_successful, most_recent_active_table, " + + "purge_outgoing_average_ms, purge_outgoing_last_run_ms, purge_outgoing_last_finish, routing_average_run_ms, routing_last_run_ms, routing_last_finish, sym_data_size, " + + "created_at_node_id, deployment_type, deployment_sub_type, config_version, data_rows_to_send_count, " + + " data_rows_loaded_count,oldest_load_time, node_id) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"); putSql("updateNodeSql", "update $(node) set node_group_id=?, external_id=?, database_type=?, " + " database_version=?, database_name=?, schema_version=?, symmetric_version=?, sync_url=?, " - + " sync_enabled=?, batch_to_send_count=?, batch_in_error_count=?, " - + " created_at_node_id=?, deployment_type=?, deployment_sub_type=?, config_version = ? where node_id = ?"); + + " sync_enabled=?, batch_to_send_count=?, batch_in_error_count=?, batch_last_successful=?, most_recent_active_table=?, " + + " purge_outgoing_average_ms = ? , purge_outgoing_last_run_ms = ?, purge_outgoing_last_finish = ?, routing_average_run_ms = ?, routing_last_run_ms = ?, routing_last_finish = ?, sym_data_size = ?, " + + " created_at_node_id=?, deployment_type=?, deployment_sub_type=?, config_version = ?, " + + " data_rows_to_send_count = ?, data_rows_loaded_count = ?, oldest_load_time = ? where node_id = ?"); putSql("findNodeSql", "where node_id = ? "); putSql("findNodeByExternalIdSql", "" + "where node_group_id = ? and external_id = ? order by node_id "); @@ -114,7 +118,9 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map replace "select c.node_id, c.node_group_id, c.external_id, c.sync_enabled, c.sync_url, " + " c.schema_version, c.database_type, c.database_version, c.database_name, " + " c.symmetric_version, c.created_at_node_id, c.batch_to_send_count, c.batch_in_error_count, " - + " c.deployment_type, c.deployment_sub_type, c.config_version from " + + " c.deployment_type, c.deployment_sub_type, c.config_version, " + + " c.purge_outgoing_last_run_ms, c.purge_outgoing_average_ms, c.purge_outgoing_last_finish, c.routing_average_run_ms, c.routing_last_run_ms, c.routing_last_finish, c.sym_data_size " + + " from " + " $(node) c "); putSql("updateNodeSecuritySql", "update $(node_security) set node_password = ?, registration_enabled = ?, " + "registration_time = ?, registration_not_before = ?, registration_not_after = ?, " diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index c415f80a9d..61f9075426 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -36,6 +36,7 @@ import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; import org.jumpmind.db.sql.mapper.LongMapper; +import org.jumpmind.db.sql.mapper.RowMapper; import org.jumpmind.db.sql.mapper.StringMapper; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; @@ -376,11 +377,24 @@ public int countOutgoingBatchesInError(String channelId) { return sqlTemplateDirty.queryForInt(getSql("countOutgoingBatchesErrorsOnChannelSql"), channelId); } - @Override + @Override + public Date getOutgoingBatchesLatestUpdateSql() { + return sqlTemplateDirty.queryForObject(getSql("getOutgoingBatchesLatestUpdateSql"), Date.class); + } + public int countOutgoingBatchesUnsent() { return sqlTemplateDirty.queryForInt(getSql("countOutgoingBatchesUnsentSql")); } + public int[] countOutgoingNonSystemBatchesRowsUnsent() { + int[] batchesRows = new int[2]; + for (Row row : sqlTemplateDirty.query(getSql("countOutgoingNonSystemBatchesUnsentSql"))) { + batchesRows[0] = row.getInt("batch_count"); + batchesRows[1] = row.getInt("row_count"); + } + return batchesRows; + } + @Override public int countOutgoingBatchesUnsent(String channelId) { return sqlTemplateDirty.queryForInt(getSql("countOutgoingBatchesUnsentOnChannelSql"), channelId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 026526a444..9cdd095dd0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -112,6 +112,10 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, "select count(*) from $(outgoing_batch) where error_flag=1"); putSql("countOutgoingBatchesUnsentSql", "select count(*) from $(outgoing_batch) where status != 'OK'"); + putSql("countOutgoingNonSystemBatchesUnsentSql", + "select count(batch_id) as batch_count, sum(data_row_count) as row_count from $(outgoing_batch) where status != 'OK' and channel_id not in ('heartbeat', 'monitor', 'config')"); + putSql("getOutgoingBatchesLatestUpdateSql", + "select max(last_update_time) from $(outgoing_batch) where status = 'OK' and channel_id not in ('heartbeat', 'monitor', 'config')"); putSql("countOutgoingBatchesWithStatusSql", "select count(*) from $(outgoing_batch) where status = ? "); putSql("countOutgoingBatchesUnsentOnChannelSql", diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java index fed6732513..46c88a8310 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java @@ -126,4 +126,8 @@ public void addRouterStats(long startDataId, long endDataId, long dataReadCount, public HostStats getWorkingHostStats(); public TreeMap> getNodeStatsForPeriod(Date start, Date end, String nodeId, int periodSizeInMinutes); + + public String getMostRecentActiveTableSynced(); + + public Map getTotalLoadedRows(); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java index 3c05b56531..6d3bef47ca 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java @@ -643,4 +643,16 @@ protected HostStats getHostStats() { @Override public void incrementTableRows(Map> tableCounts, boolean loaded) { } + + @Override + public String getMostRecentActiveTableSynced() { + return ""; + } + + @Override + public Map getTotalLoadedRows() { + // TODO Auto-generated method stub + return null; + } + } \ No newline at end of file diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml index b49f24e2dc..72276c7f72 100644 --- a/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric-core/src/main/resources/symmetric-schema.xml @@ -434,6 +434,18 @@ + + + + + + + + + + + + diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java index 6f159a03d7..4328e6fe63 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java @@ -193,4 +193,16 @@ public void incrementDataLoadedOutgoing(String channelId, long count) { public void incrementTableRows(Map> tableCounts, boolean loaded) { // TODO Auto-generated method stub } + + @Override + public String getMostRecentActiveTableSynced() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Map getTotalLoadedRows() { + // TODO Auto-generated method stub + return null; + } } \ No newline at end of file