Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0003253: ProcessInfo which is used to gather information about processes
can be corrupted on push and pull because of threading in 3.8
  • Loading branch information
chenson42 committed Sep 21, 2017
1 parent 45647bb commit 58a3295
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 105 deletions.
Expand Up @@ -25,8 +25,6 @@
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.jumpmind.util.AppUtils;

Expand Down Expand Up @@ -64,19 +62,17 @@ public String toString() {
private ProcessStatus status = ProcessStatus.NEW;

private long currentDataCount;

private long totalDataCount = 0;

private long dataCount = -1;

private long batchCount;
private long batchCount;

private long currentBatchId;

private long currentBatchCount;

private String currentChannelId;

private boolean threadPerChannel;

private String currentTableName;

private transient Thread thread;
Expand All @@ -89,14 +85,8 @@ public String toString() {

private Date lastStatusChangeTime = new Date();

private Map<ProcessStatus, ProcessInfo> statusHistory;

private Map<ProcessStatus, Date> statusStartHistory;

private Date endTime;

private long totalDataCount = 0;

public ProcessInfo() {
this(new ProcessInfoKey("", "", null));
}
Expand Down Expand Up @@ -131,20 +121,7 @@ public ProcessStatus getStatus() {
}

public void setStatus(ProcessStatus status) {
if (statusHistory == null) {
statusHistory = new HashMap<ProcessStatus, ProcessInfo>();
}
if (statusStartHistory == null) {
statusStartHistory = new HashMap<ProcessStatus, Date>();
}
if (!statusStartHistory.containsKey(this.status)) {
statusStartHistory.put(this.status, this.startTime);
}
statusHistory.put(this.status, this.copy());
statusHistory.put(status, this);

this.status = status;

this.lastStatusChangeTime = new Date();
if (status == ProcessStatus.OK || status == ProcessStatus.ERROR) {
this.endTime = new Date();
Expand Down Expand Up @@ -197,7 +174,6 @@ public long getCurrentBatchId() {
public void setCurrentBatchId(long currentBatchId) {
this.currentBatchId = currentBatchId;
this.currentBatchStartTime = new Date();
this.currentDataCount = 0;
}

public void setCurrentLoadId(long loadId) {
Expand Down Expand Up @@ -256,22 +232,6 @@ public Date getLastStatusChangeTime() {
return lastStatusChangeTime;
}

public void setDataCount(long dataCount) {
this.dataCount = dataCount;
}

public long getDataCount() {
return dataCount;
}

public boolean isThreadPerChannel() {
return threadPerChannel;
}

public void setThreadPerChannel(boolean threadPerChannel) {
this.threadPerChannel = threadPerChannel;
}

public Date getCurrentBatchStartTime() {
if (currentBatchStartTime == null) {
return startTime;
Expand All @@ -284,30 +244,6 @@ public void setCurrentBatchStartTime(Date currentBatchStartTime) {
this.currentBatchStartTime = currentBatchStartTime;
}

public Map<ProcessStatus, ProcessInfo> getStatusHistory() {
return this.statusHistory;
}

public void setStatusHistory(Map<ProcessStatus, ProcessInfo> statusHistory) {
this.statusHistory = statusHistory;
}

public void setStatusStartHistory(Map<ProcessStatus, Date> statusStartHistory) {
this.statusStartHistory = statusStartHistory;
}

public Map<ProcessStatus, Date> getStatusStartHistory() {
return this.statusStartHistory;
}

public ProcessInfo getStatusHistory(ProcessStatus status) {
return this.statusHistory == null ? null : this.statusHistory.get(status);
}

public Date getStatusStartHistory(ProcessStatus status) {
return this.statusStartHistory == null ? null : this.statusStartHistory.get(status);
}

@Override
public String toString() {
return String.format("%s,status=%s,startTime=%s", key.toString(), status.toString(), startTime.toString());
Expand Down
Expand Up @@ -20,13 +20,19 @@
*/
package org.jumpmind.symmetric.model;

import static org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus.ERROR;
import static org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus.EXTRACTING;
import static org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus.LOADING;
import static org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus.OK;
import static org.jumpmind.symmetric.model.ProcessType.PULL_HANDLER_EXTRACT;
import static org.jumpmind.symmetric.model.ProcessType.PUSH_JOB_EXTRACT;

import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.NestedDataWriter;
import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus;

public class ProcessInfoDataWriter extends NestedDataWriter {

Expand All @@ -44,6 +50,12 @@ public void open(DataContext context) {

public void start(Batch batch) {
if (batch != null) {
ProcessType type = processInfo.getProcessType();
if (type == PULL_HANDLER_EXTRACT || type == PUSH_JOB_EXTRACT) {
processInfo.setStatus(EXTRACTING);
} else {
processInfo.setStatus(LOADING);
}
processInfo.setCurrentBatchId(batch.getBatchId());
processInfo.setCurrentChannelId(batch.getChannelId());
processInfo.incrementBatchCount();
Expand All @@ -58,18 +70,18 @@ public boolean start(Table table) {
}
return super.start(table);
}

@Override
public void end(Batch batch, boolean inError) {
processInfo.setStatus(!inError ? ProcessStatus.OK : ProcessStatus.ERROR);
processInfo.setStatus(!inError ? OK : ERROR);
super.end(batch, inError);
}

public void write(CsvData data) {
if (data != null) {
processInfo.incrementCurrentDataCount();
}
super.write(data);
super.write(data);
}

}
Expand Up @@ -115,13 +115,13 @@
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeCommunication;
import org.jumpmind.symmetric.model.NodeCommunication.CommunicationType;
import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchWithPayload;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus;
import org.jumpmind.symmetric.model.ProcessInfoDataWriter;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessType;
Expand Down Expand Up @@ -494,12 +494,12 @@ public List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo,
return Collections.emptyList();
}

public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode,
IOutgoingTransport transport) {
return extract(processInfo, targetNode, null, transport);
return extract(extractInfo, targetNode, null, transport);
}

public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, String queue,
public List<OutgoingBatch> extract(ProcessInfo extractInfo, Node targetNode, String queue,
IOutgoingTransport transport) {

/*
Expand All @@ -514,7 +514,7 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, Str
if (queue != null) {
NodeGroupLinkAction defaultAction = configurationService.getNodeGroupLinkFor(nodeService.findIdentity().getNodeGroupId(),
targetNode.getNodeGroupId(), false).getDataEventAction();
ProcessType processType = processInfo.getKey().getProcessType();
ProcessType processType = extractInfo.getKey().getProcessType();
NodeGroupLinkAction action = null;

if (processType.equals(ProcessType.PUSH_JOB_EXTRACT)) {
Expand All @@ -539,7 +539,7 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode, Str
IDataWriter dataWriter = new ProtocolDataWriter(nodeService.findIdentityNodeId(),
writer, targetNode.requires13Compatiblity());

return extract(processInfo, targetNode, activeBatches, dataWriter, writer, ExtractMode.FOR_SYM_CLIENT);
return extract(extractInfo, targetNode, activeBatches, dataWriter, writer, ExtractMode.FOR_SYM_CLIENT);
}

}
Expand Down Expand Up @@ -599,19 +599,14 @@ protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node
extractInfo.setBatchCount(activeBatches.size());
for (int i = 0; i < activeBatches.size(); i++) {
currentBatch = activeBatches.get(i);

extractInfo.setCurrentLoadId(currentBatch.getLoadId());
extractInfo.setDataCount(currentBatch.getDataRowCount());
extractInfo.setCurrentBatchId(currentBatch.getBatchId());


channelsProcessed.add(currentBatch.getChannelId());

currentBatch = requeryIfEnoughTimeHasPassed(batchesSelectedAtMs, currentBatch);
extractInfo.setStatus(ProcessInfo.ProcessStatus.EXTRACTING);
final OutgoingBatch extractBatch = currentBatch;
Callable<FutureOutgoingBatch> callable = new Callable<FutureOutgoingBatch>() {
public FutureOutgoingBatch call() throws Exception {
return extractBatch(extractBatch, status, extractInfo, targetNode, dataWriter, mode, activeBatches);
public FutureOutgoingBatch call() throws Exception {
return extractBatch(extractBatch, status, extractInfo, targetNode, dataWriter, mode, activeBatches);
}
};

Expand All @@ -638,11 +633,11 @@ public FutureOutgoingBatch call() throws Exception {

ProcessInfo transferInfo = statisticManager.newProcessInfo(new ProcessInfoKey(nodeService.findIdentityNodeId(),
extractInfo.getQueue(), targetNode.getNodeId(), extractInfo.getProcessType() == ProcessType.PUSH_JOB_EXTRACT ? ProcessType.PUSH_JOB_TRANSFER : ProcessType.PULL_HANDLER_TRANSFER));
transferInfo.setTotalDataCount(extractInfo.getTotalDataCount());
Iterator<OutgoingBatch> activeBatchIter = activeBatches.iterator();
for (int i = 0; i < futures.size(); i++) {
Future<FutureOutgoingBatch> future = futures.get(i);
currentBatch = activeBatchIter.next();
transferInfo.setTotalDataCount(currentBatch.getExtractRowCount());
if (i == futures.size() - 1) {
extractInfo.setStatus(ProcessStatus.OK);
}
Expand Down Expand Up @@ -763,8 +758,10 @@ public FutureOutgoingBatch call() throws Exception {
}
}

protected FutureOutgoingBatch extractBatch(OutgoingBatch extractBatch, FutureExtractStatus status, ProcessInfo processInfo,
protected FutureOutgoingBatch extractBatch(OutgoingBatch extractBatch, FutureExtractStatus status, ProcessInfo extractInfo,
Node targetNode, IDataWriter dataWriter, ExtractMode mode, List<OutgoingBatch> activeBatches) throws Exception {
extractInfo.setCurrentLoadId(extractBatch.getLoadId());
extractInfo.setTotalDataCount(extractBatch.getDataRowCount());
FutureOutgoingBatch outgoingBatch = new FutureOutgoingBatch(extractBatch, false);
final long maxBytesToSync = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);
final boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED);
Expand Down Expand Up @@ -796,7 +793,7 @@ protected FutureOutgoingBatch extractBatch(OutgoingBatch extractBatch, FutureExt
try {
boolean isRetry = isRetry(extractBatch, targetNode);
outgoingBatch = new FutureOutgoingBatch(
extractOutgoingBatch(processInfo, targetNode, dataWriter, extractBatch, streamToFileEnabled, true, mode),
extractOutgoingBatch(extractInfo, targetNode, dataWriter, extractBatch, streamToFileEnabled, true, mode),
isRetry);
status.batchExtractCount++;
status.byteExtractCount += extractBatch.getByteCount();
Expand Down Expand Up @@ -867,15 +864,15 @@ final protected OutgoingBatch requeryIfEnoughTimeHasPassed(long ts, OutgoingBatc
return currentBatch;
}

protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targetNode,
protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targetNode,
IDataWriter dataWriter, OutgoingBatch currentBatch, boolean useStagingDataWriter,
boolean updateBatchStatistics, ExtractMode mode) {

if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode || ExtractMode.FOR_SYM_CLIENT == mode) {

Node sourceNode = nodeService.findIdentity();

IDataWriter writer = wrapWithTransformWriter(sourceNode, targetNode, processInfo, dataWriter, useStagingDataWriter);
IDataWriter writer = wrapWithTransformWriter(sourceNode, targetNode, extractInfo, dataWriter, useStagingDataWriter);

long ts = System.currentTimeMillis();
long extractTimeInMs = 0l;
Expand Down Expand Up @@ -911,7 +908,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
if (updateBatchStatistics) {
changeBatchStatus(Status.QY, currentBatch, mode);
}
currentBatch.resetStats();

DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE_ID, targetNode.getNodeId());
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE_EXTERNAL_ID, targetNode.getExternalId());
Expand All @@ -922,7 +919,10 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE_EXTERNAL_ID, sourceNode.getExternalId());
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE_GROUP_ID, sourceNode.getNodeGroupId());

IDataReader dataReader = buildExtractDataReader(sourceNode, targetNode, currentBatch, processInfo);
extractInfo.setTotalDataCount(currentBatch.getDataRowCount());
currentBatch.resetStats();

IDataReader dataReader = buildExtractDataReader(sourceNode, targetNode, currentBatch, extractInfo);
new DataProcessor(dataReader, writer, "extract").process(ctx);
extractTimeInMs = System.currentTimeMillis() - ts;
Statistics stats = getExtractStats(writer);
Expand Down Expand Up @@ -989,7 +989,6 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
}

}
processInfo.incrementCurrentBatchCount();
return currentBatch;
}

Expand Down
Expand Up @@ -425,9 +425,7 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
triggerHistories.addAll(engine.getTriggerRouterService()
.getActiveTriggerHistories(new Trigger(reloadRequest.getTriggerId(), null)));
}
}

processInfo.setDataCount(triggerHistories.size());
}

Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId = triggerRouterService
.fillTriggerRoutersByHistIdAndSortHist(sourceNode.getNodeGroupId(),
Expand All @@ -453,8 +451,7 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
}
} catch (Exception e) {
}

processInfo.setDataCount(totalTableCount);
processInfo.setTotalDataCount(totalTableCount);

if (isFullLoad || (reloadRequests != null && reloadRequests.size() > 0)) {
insertSqlEventsPriorToReload(targetNode, nodeIdRecord, loadId, createBy,
Expand Down
Expand Up @@ -109,18 +109,15 @@ protected void init() {
public ProcessInfo newProcessInfo(ProcessInfoKey key) {
ProcessInfo process = new ProcessInfo(key);
ProcessInfo old = processInfos.get(key);
if (old != null) {
process.setStatusHistory(old.getStatusHistory());
process.setStatusStartHistory(old.getStatusStartHistory());

if (old != null) {
if (old.getStatus() != ProcessStatus.OK && old.getStatus() != ProcessStatus.ERROR) {
log.warn(
"Starting a new process even though the previous '{}' process had not finished",
old.getProcessType().toString());
log.info("Details from the previous process: {}", old.toString());
}

if (old.getCurrentDataCount() > 0 || old.getDataCount() > 0) {
if (old.getCurrentDataCount() > 0 || old.getTotalDataCount() > 0) {
processInfosThatHaveDoneWork.put(key, old);
}
}
Expand Down
Expand Up @@ -97,13 +97,13 @@ public void handleWithCompression(HttpServletRequest req, HttpServletResponse re
map.setChannelQueue(req.getHeader(WebConstants.CHANNEL_QUEUE));

// pull out headers and pass to pull() method
pull(nodeId, req.getRemoteHost(), req.getRemoteAddr(), res.getOutputStream(), req.getHeader(WebConstants.HEADER_ACCEPT_CHARSET), res, map);
handlePull(nodeId, req.getRemoteHost(), req.getRemoteAddr(), res.getOutputStream(), req.getHeader(WebConstants.HEADER_ACCEPT_CHARSET), res, map);

log.debug("Done with Pull request from {}", nodeId);

}

public void pull(String nodeId, String remoteHost, String remoteAddress,
protected void handlePull(String nodeId, String remoteHost, String remoteAddress,
OutputStream outputStream, String encoding, HttpServletResponse res, ChannelMap map) throws IOException {
NodeSecurity nodeSecurity = nodeService.findNodeSecurity(nodeId, true);
long ts = System.currentTimeMillis();
Expand Down

0 comments on commit 58a3295

Please sign in to comment.