Skip to content

Commit

Permalink
0003253: ProcessInfo which is used to gather information about processes
Browse files Browse the repository at this point in the history
can be corrupted on push and pull because of threading in 3.8
  • Loading branch information
chenson42 committed Sep 21, 2017
1 parent 437ee69 commit a146f60
Show file tree
Hide file tree
Showing 21 changed files with 166 additions and 127 deletions.
Expand Up @@ -18,7 +18,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.io.data.writer;
package org.jumpmind.symmetric.io.stage;

import java.io.BufferedReader;
import java.io.BufferedWriter;
Expand All @@ -31,12 +31,15 @@
import org.jumpmind.symmetric.csv.CsvReader;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener;
import org.jumpmind.symmetric.io.data.CsvConstants;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.util.Statistics;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -54,11 +57,11 @@ public class SimpleStagingDataWriter {
protected BatchType batchType;
protected String targetNodeId;
protected DataContext context;

protected ProcessInfo processInfo;
protected BufferedWriter writer;
protected Batch batch;

public SimpleStagingDataWriter(BufferedReader reader, IStagingManager stagingManager, String category, long memoryThresholdInBytes,
public SimpleStagingDataWriter(ProcessInfo processInfo, BufferedReader reader, IStagingManager stagingManager, String category, long memoryThresholdInBytes,
BatchType batchType, String targetNodeId, DataContext context, IProtocolDataWriterListener... listeners) {
this.reader = new CsvReader(reader);
this.reader.setEscapeMode(CsvReader.ESCAPE_MODE_BACKSLASH);
Expand All @@ -70,6 +73,7 @@ public SimpleStagingDataWriter(BufferedReader reader, IStagingManager stagingMan
this.targetNodeId = targetNodeId;
this.listeners = listeners;
this.context = context;
this.processInfo = processInfo;
}

public void process() throws IOException {
Expand All @@ -96,7 +100,6 @@ public void process() throws IOException {
} else if (line.startsWith(CsvConstants.TABLE)) {
tableLine = new TableLine(catalogLine, schemaLine, line);
TableLine batchTableLine = batchTableLines.get(tableLine);

if (batchTableLine != null) {
tableLine = batchTableLine;
writeLine(line);
Expand Down Expand Up @@ -124,6 +127,7 @@ public void process() throws IOException {
} else if (line.startsWith(CsvConstants.BATCH)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
getArgLine(nodeLine), targetNodeId, false);
processInfo.incrementBatchCount();
String location = batch.getStagedLocation();
resource = stagingManager.create(category, location, batch.getBatchId());
writer = resource.getWriter(memoryThresholdInBytes);
Expand All @@ -136,7 +140,7 @@ public void process() throws IOException {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(context, batch);
}
}
}
} else if (line.startsWith(CsvConstants.COMMIT)) {
if (writer != null) {
writeLine(line);
Expand All @@ -157,6 +161,7 @@ public void process() throws IOException {
} else if (line.startsWith(CsvConstants.RETRY)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
getArgLine(nodeLine), targetNodeId, false);
processInfo.incrementBatchCount();
String location = batch.getStagedLocation();
resource = stagingManager.find(category, location, batch.getBatchId());
if (resource == null || resource.getState() == State.CREATE) {
Expand Down Expand Up @@ -204,6 +209,13 @@ public void process() throws IOException {
writeLine(syncLine.columnsLine);
}
}

if (line.startsWith(CsvConstants.INSERT) || line.startsWith(CsvConstants.DELETE) || line.startsWith(CsvConstants.UPDATE)
|| line.startsWith(CsvConstants.CREATE) || line.startsWith(CsvConstants.SQL)
|| line.startsWith(CsvConstants.BSH)) {
processInfo.incrementCurrentDataCount();
}

int size = line.length();
if (size > MAX_WRITE_LENGTH) {
log.debug("Exceeded max line length with {}", size);
Expand All @@ -227,11 +239,15 @@ public void process() throws IOException {
ts = System.currentTimeMillis();
}
}

processInfo.setStatus(ProcessStatus.OK);
} catch (IOException ex) {
if (resource != null) {
resource.delete();
}

processInfo.setStatus(ProcessStatus.ERROR);

/*
* Just log an error here. We want batches that come before us to continue to process and to be acknowledged
*/
Expand Down
Expand Up @@ -208,11 +208,12 @@ public long getCurrentLoadId() {
return currentLoadId;
}

public String getCurrentChannelThread() {
if (getKey().getChannelId() != null && getKey().getChannelId().length() > 0) {
return getKey().getChannelId();
public String getQueue() {
String queue = key.getQueue();
if (queue == null) {
queue = "";
}
return "";
return queue;
}

public String getCurrentChannelId() {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.NestedDataWriter;
import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus;

public class ProcessInfoDataWriter extends NestedDataWriter {

Expand Down Expand Up @@ -57,6 +58,12 @@ public boolean start(Table table) {
}
return super.start(table);
}

@Override
public void end(Batch batch, boolean inError) {
processInfo.setStatus(!inError ? ProcessStatus.OK : ProcessStatus.ERROR);
super.end(batch, inError);
}

public void write(CsvData data) {
if (data != null) {
Expand Down
Expand Up @@ -32,20 +32,20 @@ public class ProcessInfoKey implements Serializable {

private ProcessType processType;

private String channelId;
private String queue;

public ProcessInfoKey(String sourceNodeId, String targetNodeId, ProcessType processType) {
this.sourceNodeId = sourceNodeId;
this.targetNodeId = targetNodeId;
this.processType = processType;
this.channelId = null;
this.queue = null;
}

public ProcessInfoKey(String sourceNodeId, String channelId, String targetNodeId, ProcessType processType) {
public ProcessInfoKey(String sourceNodeId, String queue, String targetNodeId, ProcessType processType) {
this.sourceNodeId = sourceNodeId;
this.targetNodeId = targetNodeId;
this.processType = processType;
this.channelId = channelId;
this.queue = queue;
}

public String getSourceNodeId() {
Expand All @@ -60,8 +60,8 @@ public ProcessType getProcessType() {
return processType;
}

public String getChannelId() {
return channelId;
public String getQueue() {
return queue;
}

@Override
Expand All @@ -71,7 +71,7 @@ public int hashCode() {
result = prime * result + ((processType == null) ? 0 : processType.hashCode());
result = prime * result + ((sourceNodeId == null) ? 0 : sourceNodeId.hashCode());
result = prime * result + ((targetNodeId == null) ? 0 : targetNodeId.hashCode());
result = prime * result + ((channelId == null) ? 0 : channelId.hashCode());
result = prime * result + ((queue == null) ? 0 : queue.hashCode());
return result;
}

Expand All @@ -96,18 +96,18 @@ public boolean equals(Object obj) {
return false;
} else if (!targetNodeId.equals(other.targetNodeId))
return false;
if (channelId == null) {
if (other.channelId != null)
if (queue == null) {
if (other.queue != null)
return false;
} else if (!channelId.equals(other.channelId))
} else if (!queue.equals(other.queue))
return false;
return true;
}

@Override
public String toString() {
return String.format("processType=%s,sourceNodeId=%s,targetNodeId=%s,channelId=%s", processType.toString(), sourceNodeId,
targetNodeId, channelId);
return String.format("processType=%s,sourceNodeId=%s,targetNodeId=%s,queue=%s", processType.toString(), sourceNodeId,
targetNodeId, queue);
}

}
Expand Up @@ -37,7 +37,7 @@ public static enum Status {
};

private String nodeId;
private String channelId;
private String queue;
private Status status;
private long dataProcessed;
private long batchesProcessed;
Expand All @@ -49,7 +49,7 @@ public RemoteNodeStatus(String nodeId, String channelId, Map<String, Channel> ch
this.status = Status.NO_DATA;
this.nodeId = nodeId;
this.channels = channels;
this.channelId = channelId;
this.queue = channelId;
}

public boolean failed() {
Expand All @@ -64,12 +64,12 @@ public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

public String getChannelId() {
return channelId;
public String getQueue() {
return queue;
}

public void setChannelId(String channelId) {
this.channelId = channelId;
public void setQueue(String queue) {
this.queue = queue;
}

public Status getStatus() {
Expand Down
Expand Up @@ -68,10 +68,10 @@ public boolean errorOccurred() {
return errorOccurred;
}

public RemoteNodeStatus add(String nodeId, String channelId) {
public RemoteNodeStatus add(String nodeId, String queue) {
RemoteNodeStatus status = null;
if (nodeId != null) {
status = new RemoteNodeStatus(nodeId, channelId, channels);
status = new RemoteNodeStatus(nodeId, queue, channels);
add(status);
}
return status;
Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
Expand Down
Expand Up @@ -42,7 +42,7 @@
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.service.IContextService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
Expand Down
Expand Up @@ -48,7 +48,7 @@
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.model.ProcessType;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.FormatUtils;
Expand Down

0 comments on commit a146f60

Please sign in to comment.