Skip to content

Commit

Permalink
0005886: Monitor enhancements for centralized support
Browse files Browse the repository at this point in the history
  • Loading branch information
joshahicks committed Jun 19, 2023
1 parent c62b0eb commit 980217a
Show file tree
Hide file tree
Showing 30 changed files with 396 additions and 27 deletions.
Expand Up @@ -20,6 +20,7 @@
*/
package org.jumpmind.symmetric.android;

import java.util.ArrayList;
import java.util.List;

import org.jumpmind.symmetric.ISymmetricEngine;
Expand Down Expand Up @@ -137,4 +138,10 @@ public List<Monitor> getActiveMonitorsUnresolvedForNode(String nodeGroupId, Stri
public List<Monitor> getActiveMonitorsUnresolvedForNodeFromDb(String nodeGroupId, String externalId) {
return null;
}

@Override
public List<MonitorEvent> getMonitorEventsByMonitorId(String monitorId) {
return new ArrayList<MonitorEvent>();
}

}
Expand Up @@ -23,6 +23,7 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
Expand All @@ -39,8 +40,11 @@
import org.jumpmind.symmetric.ext.IHeartbeatListener;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IStatisticService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -59,9 +63,59 @@ public void heartbeat(Node me) {
boolean updateWithBatchStatus = parameterService.is(ParameterConstants.HEARTBEAT_UPDATE_NODE_WITH_BATCH_STATUS, false);
int outgoingErrorCount = -1;
int outgoingUnsentCount = -1;
int outgoingUnsentRowCount = -1;
Date lastSuccessfulSyncTime = null;
String mostRecentActiveTableSynced = null;
int totalRowsLoaded = -1;
Date oldestLoadedDate = null;
long purgeOutgoingLastMs = -1;
Date purgeOutgoingLastRun = null;
long purgeOutgoingAverage = -1;

long routingLastMs = -1;
long routingAveragetMs = -1;
Date routingLastRun = null;
long symDataSize = -1;

if (updateWithBatchStatus) {
outgoingUnsentCount = engine.getOutgoingBatchService().countOutgoingBatchesUnsent();
outgoingErrorCount = engine.getOutgoingBatchService().countOutgoingBatchesInError();
int[] batchesRowsUnsent = engine.getOutgoingBatchService().countOutgoingNonSystemBatchesRowsUnsent();

outgoingUnsentCount = batchesRowsUnsent[0];
outgoingUnsentRowCount = batchesRowsUnsent[1];

Date outDate = engine.getOutgoingBatchService().getOutgoingBatchesLatestUpdateSql();
Date inDate = engine.getIncomingBatchService().getIncomingBatchesLatestUpdateSql();
if (outDate == null && inDate == null) {
lastSuccessfulSyncTime = null;
} else if (outDate == null) {
lastSuccessfulSyncTime = inDate;
} else if (inDate == null) {
lastSuccessfulSyncTime = outDate;
} else {
lastSuccessfulSyncTime = outDate.after(inDate) ? outDate : inDate;
}

IStatisticManager statisticsManager = engine.getStatisticManager();
mostRecentActiveTableSynced = statisticsManager.getMostRecentActiveTableSynced();
Map<Integer, Date> totalLoadedRowsMap = statisticsManager.getTotalLoadedRows();
if (totalLoadedRowsMap != null && totalLoadedRowsMap.size() == 1) {
totalRowsLoaded = totalLoadedRowsMap.keySet().iterator().next();
oldestLoadedDate = totalLoadedRowsMap.values().iterator().next();
}

IJob purgeOutgoingJob = engine.getJobManager().getJob(ClusterConstants.PURGE_OUTGOING);
purgeOutgoingLastMs = purgeOutgoingJob.getLastExecutionTimeInMs();
purgeOutgoingLastRun = purgeOutgoingJob.getLastFinishTime();
purgeOutgoingAverage = purgeOutgoingJob.getAverageExecutionTimeInMs();

IJob routeJob = engine.getJobManager().getJob(ClusterConstants.ROUTE);
routingAveragetMs = routeJob.getAverageExecutionTimeInMs();
routingLastRun = routeJob.getLastFinishTime();
routingLastMs = routeJob.getLastExecutionTimeInMs();

symDataSize = engine.getDataService().countData();

}
if (!parameterService.getExternalId().equals(me.getExternalId())
|| !parameterService.getNodeGroupId().equals(me.getNodeGroupId())
Expand All @@ -73,7 +127,16 @@ public void heartbeat(Node me) {
|| !symmetricDialect.getName().equals(me.getDatabaseType())
|| !symmetricDialect.getVersion().equals(me.getDatabaseVersion())
|| me.getBatchInErrorCount() != outgoingErrorCount
|| me.getBatchToSendCount() != outgoingUnsentCount) {
|| me.getBatchToSendCount() != outgoingUnsentCount
|| me.getLastSuccessfulSyncDate() != lastSuccessfulSyncTime
|| me.getMostRecentActiveTableSynced() != mostRecentActiveTableSynced
|| me.getPurgeOutgoingLastMs() != purgeOutgoingLastMs
|| me.getPurgeOutgoingLastRun() != purgeOutgoingLastRun
|| me.getPurgeOutgoingAverageMs() != purgeOutgoingAverage
|| me.getRoutingAverageMs() != routingAveragetMs
|| me.getRoutingLastRun() != routingLastRun
|| me.getRoutingLastMs() != routingLastMs
|| me.getSymDataSize() != symDataSize) {
log.info("Some attribute(s) of node changed. Recording changes");
me.setDeploymentType(engine.getDeploymentType());
me.setDeploymentSubType(engine.getDeploymentSubType());
Expand All @@ -83,6 +146,19 @@ public void heartbeat(Node me) {
me.setDatabaseName(engine.getDatabasePlatform().getName());
me.setBatchInErrorCount(outgoingErrorCount);
me.setBatchToSendCount(outgoingUnsentCount);
me.setLastSuccessfulSyncDate(lastSuccessfulSyncTime);
me.setDataRowsToSendCount(outgoingUnsentRowCount);
me.setMostRecentActiveTableSynced(mostRecentActiveTableSynced);
me.setDataRowsLoadedCount(totalRowsLoaded);
me.setOldestLoadTime(oldestLoadedDate);
me.setPurgeOutgoingLastMs(purgeOutgoingLastMs);
me.setPurgeOutgoingLastRun(purgeOutgoingLastRun);
me.setPurgeOutgoingAverageMs(purgeOutgoingAverage);
me.setRoutingAverageMs(routingAveragetMs);
me.setRoutingLastRun(routingLastRun);
me.setRoutingLastMs(routingLastMs);
me.setSymDataSize(symDataSize);

me.setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION));
if (engine.getParameterService().isRegistrationServer()) {
me.setConfigVersion(Version.version());
Expand Down
Expand Up @@ -104,6 +104,15 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
}
}
}
} else if (matchesTable(table, TableConstants.SYM_MONITOR) && (data.getDataEventType() == DataEventType.INSERT)) {
Map<String, String> newData = data.toColumnNameValuePairs(table.getColumnNames(), CsvData.ROW_DATA);
String monitorID = newData.get("MONITOR_ID");
if (monitorID != null && (monitorID.equals("SystemBatchErrorMonitor")
|| monitorID.equals("SystemLogMonitor")
|| monitorID.equals("SystemOfflineNodeMonitor"))) {
return false;
}

}
return true;
}
Expand Down
Expand Up @@ -22,6 +22,14 @@

import java.util.Date;

import org.jumpmind.symmetric.monitor.MonitorTypeBatchError;
import org.jumpmind.symmetric.monitor.MonitorTypeBatchUnsent;
import org.jumpmind.symmetric.monitor.MonitorTypeCpu;
import org.jumpmind.symmetric.monitor.MonitorTypeDisk;
import org.jumpmind.symmetric.monitor.MonitorTypeMemory;
import org.jumpmind.symmetric.monitor.MonitorTypeOfflineNodes;
import org.jumpmind.symmetric.monitor.MonitorTypeUnrouted;

public class MonitorEvent {
protected String monitorId;
protected String nodeId;
Expand Down Expand Up @@ -177,4 +185,26 @@ public String getDetails() {
public void setDetails(String details) {
this.details = details;
}

public String getPrettyPrint() {
StringBuffer sb = new StringBuffer(String.valueOf(value));
if (MonitorTypeBatchError.NAME.equals(getType())) {
sb.append(getValue() > 1 ? " batches" : " batch");
} else if (MonitorTypeBatchUnsent.NAME.equals(getType())) {
sb.append(getValue() > 1 ? " batches" : " batch");
} else if (MonitorTypeUnrouted.NAME.equals(getType())) {
sb.append(getValue() > 1 ? " rows" : " row");
} else if (MonitorTypeOfflineNodes.NAME.equals(getType())) {
sb.append(getValue() > 1 ? " nodes" : " node");
} else if (MonitorTypeCpu.NAME.equals(getType())) {
sb.append("%");
} else if (MonitorTypeMemory.NAME.equals(getType())) {
sb.append("%");
} else if (MonitorTypeDisk.NAME.equals(getType())) {
sb.append("%");
}

return sb.toString();

}
}
115 changes: 114 additions & 1 deletion symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java
Expand Up @@ -24,6 +24,7 @@

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;
import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -68,7 +69,19 @@ public class Node implements Serializable, Comparable<Node> {
private String deploymentType;
private String deploymentSubType;
private int[] symmetricVersionParts;

private Date lastSuccessfulSyncDate;
private String mostRecentActiveTableSynced;
private int dataRowsToSendCount;
private int dataRowsLoadedCount;
private Date oldestLoadTime;
private long purgeOutgoingLastMs;
private Date purgeOutgoingLastRun;
private long purgeOutgoingAverageMs;
private long routingAverageMs;
private Date routingLastRun;
private long routingLastMs;
private long symDataSize;

public Node() {
}

Expand Down Expand Up @@ -265,6 +278,95 @@ public String getDeploymentSubType() {
public void setDeploymentSubType(String deploymentSubType) {
this.deploymentSubType = deploymentSubType;
}

public Date getLastSuccessfulSyncDate() {
return lastSuccessfulSyncDate;
}

public void setLastSuccessfulSyncDate(Date lastSuccessfulSyncDate) {
this.lastSuccessfulSyncDate = lastSuccessfulSyncDate;
}


public int getDataRowsToSendCount() {
return dataRowsToSendCount;
}

public void setDataRowsToSendCount(int dataRowsToSendCount) {
this.dataRowsToSendCount = dataRowsToSendCount;
}

public int getDataRowsLoadedCount() {
return dataRowsLoadedCount;
}

public void setDataRowsLoadedCount(int dataRowsLoadedCount) {
this.dataRowsLoadedCount = dataRowsLoadedCount;
}

public Date getOldestLoadTime() {
return oldestLoadTime;
}

public void setOldestLoadTime(Date oldestLoadTime) {
this.oldestLoadTime = oldestLoadTime;
}

public long getPurgeOutgoingLastMs() {
return purgeOutgoingLastMs;
}

public void setPurgeOutgoingLastMs(long purgeOutgoingLastMs) {
this.purgeOutgoingLastMs = purgeOutgoingLastMs;
}

public Date getPurgeOutgoingLastRun() {
return purgeOutgoingLastRun;
}

public void setPurgeOutgoingLastRun(Date purgeOutgoingLastRun) {
this.purgeOutgoingLastRun = purgeOutgoingLastRun;
}

public long getRoutingAverageMs() {
return routingAverageMs;
}

public void setRoutingAverageMs(long routingAverageMs) {
this.routingAverageMs = routingAverageMs;
}

public Date getRoutingLastRun() {
return routingLastRun;
}

public void setRoutingLastRun(Date routingLastRun) {
this.routingLastRun = routingLastRun;
}

public long getSymDataSize() {
return symDataSize;
}

public void setSymDataSize(long symDataSize) {
this.symDataSize = symDataSize;
}

public long getPurgeOutgoingAverageMs() {
return purgeOutgoingAverageMs;
}

public void setPurgeOutgoingAverageMs(long purgeOutgoingAverageMs) {
this.purgeOutgoingAverageMs = purgeOutgoingAverageMs;
}

public long getRoutingLastMs() {
return routingLastMs;
}

public void setRoutingLastMs(long routingLastMs) {
this.routingLastMs = routingLastMs;
}

public boolean requires13Compatiblity() {
if (symmetricVersion != null) {
Expand Down Expand Up @@ -325,4 +427,15 @@ public int compareTo(Node other) {
return 0;
}
}

public String getMostRecentActiveTableSynced() {
return mostRecentActiveTableSynced;
}

public void setMostRecentActiveTableSynced(String mostRecentActiveTableSynced) {
this.mostRecentActiveTableSynced = mostRecentActiveTableSynced;
}



}
Expand Up @@ -42,10 +42,11 @@ public class MonitorTypeBatchError implements IMonitorType, ISymmetricEngineAwar
protected final Logger log = LoggerFactory.getLogger(getClass());
protected IOutgoingBatchService outgoingBatchService;
protected IIncomingBatchService incomingBatchService;

public static final String NAME = "batchError";

@Override
public String getName() {
return "batchError";
return NAME;
}

@Override
Expand Down Expand Up @@ -97,4 +98,5 @@ protected String serializeDetails(BatchErrorWrapper details) {
}
return result;
}

}
Expand Up @@ -29,10 +29,11 @@

public class MonitorTypeBatchUnsent implements IMonitorType, ISymmetricEngineAware, IBuiltInExtensionPoint {
protected IOutgoingBatchService outgoingBatchService;

public static final String NAME = "batchUnsent";

@Override
public String getName() {
return "batchUnsent";
return NAME;
}

@Override
Expand All @@ -51,4 +52,5 @@ public boolean requiresClusterLock() {
public void setSymmetricEngine(ISymmetricEngine engine) {
outgoingBatchService = engine.getOutgoingBatchService();
}

}
Expand Up @@ -37,7 +37,8 @@ public class MonitorTypeCpu extends AbstractMonitorType implements IBuiltInExten
protected OperatingSystemMXBean osBean;
protected RuntimeMXBean runtimeBean;
protected List<StackTraceElement> ignoreElements;

public static final String NAME = "cpu";

public MonitorTypeCpu() {
osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
runtimeBean = ManagementFactory.getRuntimeMXBean();
Expand Down

0 comments on commit 980217a

Please sign in to comment.