Skip to content

Commit

Permalink
Updates for process info to keep track of status changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Aug 22, 2016
1 parent 53f616a commit 082feb9
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 47 deletions.
Expand Up @@ -25,6 +25,8 @@
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;

Expand All @@ -33,7 +35,7 @@ public class ProcessInfo implements Serializable, Comparable<ProcessInfo>, Clone
private static final long serialVersionUID = 1L;

public static enum Status {
NEW, QUERYING, EXTRACTING, LOADING, TRANSFERRING, ACKING, PROCESSING, OK, ERROR;
NEW, QUERYING, EXTRACTING, LOADING, TRANSFERRING, ACKING, PROCESSING, OK, ERROR, CREATING;

public String toString() {
switch (this) {
Expand All @@ -55,6 +57,8 @@ public String toString() {
return "Ok";
case ERROR:
return "Error";
case CREATING:
return "Creating";

default:
return name();
Expand Down Expand Up @@ -92,6 +96,8 @@ public String toString() {

private Date lastStatusChangeTime = new Date();

private Map<Status, ProcessInfo> statusHistory;

private Date endTime;

public ProcessInfo() {
Expand Down Expand Up @@ -128,7 +134,14 @@ public Status getStatus() {
}

public void setStatus(Status status) {
this.status = status;
if (statusHistory == null) {
statusHistory = new HashMap<Status, ProcessInfo>();
}
statusHistory.put(this.status, this.copy());
statusHistory.put(status, this);

this.status = status;

this.lastStatusChangeTime = new Date();
if (status == Status.OK || status == Status.ERROR) {
this.endTime = new Date();
Expand Down Expand Up @@ -260,6 +273,14 @@ public void setCurrentBatchStartTime(Date currentBatchStartTime) {
this.currentBatchStartTime = currentBatchStartTime;
}

public Map<Status, ProcessInfo> getStatusHistory() {
return this.statusHistory;
}

public ProcessInfo getStatusHistory(Status status) {
return this.statusHistory == null ? null : this.statusHistory.get(status);
}

@Override
public String toString() {
return String.format("%s,status=%s,startTime=%s", key.toString(), status.toString(),
Expand Down
Expand Up @@ -31,7 +31,8 @@
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchSummary;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.OutgoingLoadSummary;
import org.jumpmind.symmetric.model.OutgoingLoadSummary;
import org.jumpmind.symmetric.service.impl.OutgoingBatchService.LoadStatusSummary;

/**
* This service provides an API to access to the outgoing batch table.
Expand Down Expand Up @@ -104,9 +105,11 @@ public List<OutgoingBatch> listOutgoingBatches(List<String> nodeIds, List<String

public Set<Long> getActiveLoads(String sourceNodeId);

public List<String> getQueuedLoads(String sourceNodeId);

public LoadSummary getLoadSummary(long loadId);

public Map<String, Map<String, Integer>> getLoadStatusSummarySql(long loadId);
public Map<String, Map<String, LoadStatusSummary>> getLoadStatusSummarySql(long loadId);

public void copyOutgoingBatches(String channelId, long startBatchId, String fromNodeId, String toNodeId);

Expand Down
Expand Up @@ -400,6 +400,8 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
.getActiveTriggerHistories(new Trigger(reloadRequest.getTriggerId(), null)));
}
}
processInfo.setDataCount(triggerHistories.size());

Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = triggerRouterService
.fillTriggerRoutersByHistIdAndSortHist(sourceNode.getNodeGroupId(),
targetNode.getNodeGroupId(), triggerHistories);
Expand All @@ -411,11 +413,19 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
createBy, transactional, transaction);
}
Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests);

String symNodeSecurityReloadChannel = null;
try {
symNodeSecurityReloadChannel = triggerRoutersByHistoryId.get(triggerHistories.get(0)
.getTriggerHistoryId()).get(0).getTrigger().getReloadChannelId();
}
catch (Exception e) { }

if (isFullLoad || (reloadRequests != null && reloadRequests.size() > 0)) {
insertSqlEventsPriorToReload(targetNode, nodeIdRecord, loadId, createBy,
transactional, transaction, reverse,
triggerHistories, triggerRoutersByHistoryId,
mapReloadRequests, isFullLoad);
mapReloadRequests, isFullLoad, symNodeSecurityReloadChannel);
}

insertCreateBatchesForReload(targetNode, loadId, createBy,
Expand All @@ -433,6 +443,7 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
insertLoadBatchesForReload(targetNode, loadId, createBy, triggerHistories,
triggerRoutersByHistoryId, transactional, transaction, mapReloadRequests, processInfo);


if (isFullLoad) {
String afterSql = parameterService
.getString(reverse ? ParameterConstants.INITIAL_LOAD_REVERSE_AFTER_SQL
Expand All @@ -457,8 +468,8 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
}

if (!Constants.DEPLOYMENT_TYPE_REST.equals(targetNode.getDeploymentType())) {
insertNodeSecurityUpdate(transaction, nodeIdRecord,
targetNode.getNodeId(), true, loadId, createBy);
insertNodeSecurityUpdate(transaction, nodeIdRecord,
targetNode.getNodeId(), true, loadId, createBy, symNodeSecurityReloadChannel);
}

engine.getStatisticManager().incrementNodesLoaded(1);
Expand Down Expand Up @@ -577,15 +588,15 @@ private void insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord,
String createBy, boolean transactional, ISqlTransaction transaction, boolean reverse,
List<TriggerHistory> triggerHistories,
Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId,
Map<String, TableReloadRequest> reloadRequests, boolean isFullLoad) {
Map<String, TableReloadRequest> reloadRequests, boolean isFullLoad, String channelId) {

if (!Constants.DEPLOYMENT_TYPE_REST.equals(targetNode.getDeploymentType())) {
/*
* Insert node security so the client doing the initial load knows
* that an initial load is currently happening
*/
insertNodeSecurityUpdate(transaction, nodeIdRecord, targetNode.getNodeId(), true,
loadId, createBy);
loadId, createBy, channelId);

if (isFullLoad) {
/*
Expand Down Expand Up @@ -823,12 +834,13 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre
String catalogSeparator = dbInfo.getCatalogSeparator();
String schemaSeparator = dbInfo.getSchemaSeparator();

processInfo.setBatchCount(triggerHistories.size());

for (TriggerHistory triggerHistory : triggerHistories) {
List<TriggerRouter> triggerRouters = triggerRoutersByHistoryId.get(triggerHistory
.getTriggerHistoryId());

processInfo.incrementCurrentDataCount();

for (TriggerRouter triggerRouter : triggerRouters) {
if (triggerRouter.getInitialLoadOrder() >= 0
&& engine.getGroupletService().isTargetEnabled(triggerRouter, targetNode)) {
Expand Down Expand Up @@ -879,10 +891,6 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre
numberOfBatches = 1;
}

processInfo.setCurrentTableName(table.getFullyQualifiedTableName());
processInfo.setBatchCount(numberOfBatches);
processInfo.setDataCount(rowCount);

long startBatchId = -1;
long endBatchId = -1;
for (int i = 0; i < numberOfBatches; i++) {
Expand All @@ -891,7 +899,6 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre
triggerHistory, selectSql, true, loadId, createBy, Status.RQ);
if (startBatchId == -1) {
startBatchId = endBatchId;
processInfo.setCurrentBatchId(startBatchId);
}
}

Expand All @@ -906,7 +913,6 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre
transaction.commit();
}
}
processInfo.incrementCurrentBatchCount();

}
}
Expand Down Expand Up @@ -1224,7 +1230,7 @@ public void insertDataAndDataEventAndOutgoingBatch(Data data, String channelId,
long dataId = insertData(transaction, data);
for (Node node : nodes) {
insertDataEventAndOutgoingBatch(transaction, dataId, channelId, node.getNodeId(),
data.getDataEventType(), routerId, isLoad, loadId, createBy, Status.NE);
data.getDataEventType(), routerId, isLoad, loadId, createBy, Status.NE, data.getTableName());
}
transaction.commit();
} catch (Error ex) {
Expand Down Expand Up @@ -1275,22 +1281,33 @@ public long insertDataAndDataEventAndOutgoingBatch(Data data, String nodeId, Str
*/
public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data,
String nodeId, String routerId, boolean isLoad, long loadId, String createBy,
Status status) {
Status status, String overrideChannelId) {
long dataId = insertData(transaction, data);
String channelId = null;
if (isLoad) {
TriggerHistory history = data.getTriggerHistory();
if (history != null && channelId == null) {
Trigger trigger = engine.getTriggerRouterService().getTriggerById(
history.getTriggerId());
channelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService()
.getChannels(false));
}
if (overrideChannelId != null) {
channelId = overrideChannelId;
}
else {
TriggerHistory history = data.getTriggerHistory();
if (history != null && channelId == null) {
Trigger trigger = engine.getTriggerRouterService().getTriggerById(
history.getTriggerId());
channelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService()
.getChannels(false));
}
}
} else {
channelId = data.getChannelId();
}
return insertDataEventAndOutgoingBatch(transaction, dataId, channelId, nodeId,
data.getDataEventType(), routerId, isLoad, loadId, createBy, status);
data.getDataEventType(), routerId, isLoad, loadId, createBy, status, data.getTableName());
}

public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data,
String nodeId, String routerId, boolean isLoad, long loadId, String createBy,
Status status) {
return insertDataAndDataEventAndOutgoingBatch(transaction, data, nodeId, routerId, isLoad, loadId, createBy, status, null);
}

/**
Expand All @@ -1300,12 +1317,13 @@ public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction,
*/
protected long insertDataEventAndOutgoingBatch(ISqlTransaction transaction, long dataId,
String channelId, String nodeId, DataEventType eventType, String routerId,
boolean isLoad, long loadId, String createBy, Status status) {
boolean isLoad, long loadId, String createBy, Status status, String tableName) {
OutgoingBatch outgoingBatch = new OutgoingBatch(nodeId, channelId, status);
outgoingBatch.setLoadId(loadId);
outgoingBatch.setCreateBy(createBy);
outgoingBatch.setLoadFlag(isLoad);
outgoingBatch.incrementEventCount(eventType);
outgoingBatch.incrementTableCount(tableName);
if (status == Status.RQ) {
outgoingBatch.setExtractJobFlag(true);
}
Expand All @@ -1331,11 +1349,16 @@ public String reloadNode(String nodeId, boolean reverseLoad, String createBy) {

private void insertNodeSecurityUpdate(ISqlTransaction transaction, String nodeIdRecord,
String targetNodeId, boolean isLoad, long loadId, String createBy) {
insertNodeSecurityUpdate(transaction, nodeIdRecord, targetNodeId, isLoad, loadId, createBy, null);
}

private void insertNodeSecurityUpdate(ISqlTransaction transaction, String nodeIdRecord,
String targetNodeId, boolean isLoad, long loadId, String createBy, String channelId) {
Data data = createData(transaction, null, null, tablePrefix + "_node_security",
" t.node_id = '" + nodeIdRecord + "'");
if (data != null) {
insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNodeId,
Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE);
insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNodeId,
Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE, channelId);
}
}

Expand Down

0 comments on commit 082feb9

Please sign in to comment.