
Commit

0003142: Sync Columns Between Incoming and Outgoing Batch
maxwellpettit committed Jun 26, 2017
1 parent 5cbc027 commit 5cfbd98
Showing 7 changed files with 62 additions and 29 deletions.
@@ -53,27 +53,7 @@ public IncomingBatch(Batch batch) {
public void setValues(Statistics readerStatistics, Statistics writerStatistics, boolean isSuccess) {
if (readerStatistics != null) {
setByteCount(readerStatistics.get(DataReaderStatistics.READ_BYTE_COUNT));
setLoadFlag(readerStatistics.get(DataReaderStatistics.LOAD_FLAG) == 1);
setExtractCount(readerStatistics.get(DataReaderStatistics.EXTRACT_COUNT));
setSentCount(readerStatistics.get(DataReaderStatistics.SENT_COUNT));
setLoadCount(readerStatistics.get(DataReaderStatistics.LOAD_COUNT));
setLoadId(readerStatistics.get(DataReaderStatistics.LOAD_ID));
setCommonFlag(readerStatistics.get(DataReaderStatistics.COMMON_FLAG) == 1);
setRouterMillis(readerStatistics.get(DataReaderStatistics.ROUTER_MILLIS));
setExtractCount(readerStatistics.get(DataReaderStatistics.EXTRACT_MILLIS));
setTransformExtractMillis(readerStatistics.get(DataReaderStatistics.TRANSFORM_EXTRACT_MILLIS));
setTransformLoadMillis(readerStatistics.get(DataReaderStatistics.TRANSFORM_LOAD_MILLIS));
setReloadRowCount(readerStatistics.get(DataReaderStatistics.RELOAD_ROW_COUNT));
setOtherRowCount(readerStatistics.get(DataReaderStatistics.OTHER_ROW_COUNT));
setDataRowCount(readerStatistics.get(DataReaderStatistics.DATA_ROW_COUNT));
setDataInsertRowCount(readerStatistics.get(DataReaderStatistics.DATA_INSERT_ROW_COUNT));
setDataUpdateRowCount(readerStatistics.get(DataReaderStatistics.DATA_UPDATE_ROW_COUNT));
setDataDeleteRowCount(readerStatistics.get(DataReaderStatistics.DATA_DELETE_ROW_COUNT));
setExtractRowCount(readerStatistics.get(DataReaderStatistics.EXTRACT_ROW_COUNT));
setExtractInsertRowCount(readerStatistics.get(DataReaderStatistics.EXTRACT_INSERT_ROW_COUNT));
setExtractUpdateRowCount(readerStatistics.get(DataReaderStatistics.EXTRACT_UPDATE_ROW_COUNT));
setExtractDeleteRowCount(readerStatistics.get(DataReaderStatistics.EXTRACT_DELETE_ROW_COUNT));
setFailedDataId(readerStatistics.get(DataReaderStatistics.FAILED_DATA_ID));
setParsedStatistics(readerStatistics);
}
if (writerStatistics != null) {
setFilterMillis(writerStatistics.get(DataWriterStatisticConstants.FILTERMILLIS));
@@ -93,6 +73,30 @@ public void setValues(Statistics readerStatistics, Statistics writerStatistics,
}
}

public void setParsedStatistics(Statistics statistics) {
setLoadFlag(statistics.get(DataReaderStatistics.LOAD_FLAG) == 1);
setExtractCount(statistics.get(DataReaderStatistics.EXTRACT_COUNT));
setSentCount(statistics.get(DataReaderStatistics.SENT_COUNT));
setLoadCount(statistics.get(DataReaderStatistics.LOAD_COUNT));
setLoadId(statistics.get(DataReaderStatistics.LOAD_ID));
setCommonFlag(statistics.get(DataReaderStatistics.COMMON_FLAG) == 1);
setRouterMillis(statistics.get(DataReaderStatistics.ROUTER_MILLIS));
setExtractCount(statistics.get(DataReaderStatistics.EXTRACT_MILLIS));
setTransformExtractMillis(statistics.get(DataReaderStatistics.TRANSFORM_EXTRACT_MILLIS));
setTransformLoadMillis(statistics.get(DataReaderStatistics.TRANSFORM_LOAD_MILLIS));
setReloadRowCount(statistics.get(DataReaderStatistics.RELOAD_ROW_COUNT));
setOtherRowCount(statistics.get(DataReaderStatistics.OTHER_ROW_COUNT));
setDataRowCount(statistics.get(DataReaderStatistics.DATA_ROW_COUNT));
setDataInsertRowCount(statistics.get(DataReaderStatistics.DATA_INSERT_ROW_COUNT));
setDataUpdateRowCount(statistics.get(DataReaderStatistics.DATA_UPDATE_ROW_COUNT));
setDataDeleteRowCount(statistics.get(DataReaderStatistics.DATA_DELETE_ROW_COUNT));
setExtractRowCount(statistics.get(DataReaderStatistics.EXTRACT_ROW_COUNT));
setExtractInsertRowCount(statistics.get(DataReaderStatistics.EXTRACT_INSERT_ROW_COUNT));
setExtractUpdateRowCount(statistics.get(DataReaderStatistics.EXTRACT_UPDATE_ROW_COUNT));
setExtractDeleteRowCount(statistics.get(DataReaderStatistics.EXTRACT_DELETE_ROW_COUNT));
setFailedDataId(statistics.get(DataReaderStatistics.FAILED_DATA_ID));
}

public void setNodeBatchId(String value) {
if (value != null) {
int splitIndex = value.indexOf("-");
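The extracted setParsedStatistics method above is what the loader-side change further down in this diff calls when a batch finishes loading. Below is a minimal sketch of that call, assuming the package locations of Batch (org.jumpmind.symmetric.io.data) and IncomingBatch (org.jumpmind.symmetric.model); only the null check and the two accessors come from the diff itself.

import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.model.IncomingBatch;

public class SyncBatchStatsSketch {
    /**
     * Copies the statistics parsed from the incoming protocol stream onto the
     * incoming batch so its columns mirror the outgoing batch that produced it.
     */
    public static void syncColumns(Batch protocolBatch, IncomingBatch incomingBatch) {
        if (protocolBatch.getStatistics() != null) {
            incomingBatch.setParsedStatistics(protocolBatch.getStatistics());
        }
    }
}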
@@ -1180,7 +1180,7 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
writer.write(getBatchStats(batch));
writer.newLine();
}

writer.write(CsvConstants.RETRY + "," + batch.getBatchId());
writer.newLine();
writer.write(CsvConstants.COMMIT + "," + batch.getBatchId());
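For context, here is a hedged sketch of what the tail of a staged batch could look like once the stats rows are written ahead of the retry marker. The keyword spellings and column names below are illustrative stand-ins, not the actual CsvConstants values.

import java.io.BufferedWriter;
import java.io.StringWriter;

public class RetryTailSketch {
    public static void main(String[] args) throws Exception {
        StringWriter out = new StringWriter();
        BufferedWriter writer = new BufferedWriter(out);
        long batchId = 42;
        // stats rows come first, so the target can rebuild the batch
        // statistics while re-reading the staged file on a retry
        writer.write("stats-columns,DATA_ROW_COUNT,DATA_INSERT_ROW_COUNT");
        writer.newLine();
        writer.write("stats,12,7");
        writer.newLine();
        writer.write("retry," + batchId);
        writer.newLine();
        writer.write("commit," + batchId);
        writer.newLine();
        writer.flush();
        System.out.print(out);
    }
}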
@@ -1306,15 +1306,16 @@ protected boolean writeBatchStats(BufferedWriter writer, char[] buffer, int buff
throws IOException {
String bufferString = new String(buffer);
int index = findStatsIndex(bufferString, prevBuffer);

if (index > 0) {
char prefix[] = Arrays.copyOf(buffer, index);
writer.write(prefix, 0, index);
}
if (index > -1) {
String stats = getBatchStatsColumns() + System.lineSeparator() + getBatchStats(batch) + System.lineSeparator();

char statsBuffer[] = stats.toCharArray();
writer.write(statsBuffer, 0, stats.length());
writer.write(statsBuffer, 0, statsBuffer.length);

char suffix[] = Arrays.copyOfRange(buffer, index, buffer.length);
writer.write(suffix, 0, bufferSize - index);
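The buffer splice above (prefix, injected stats block, suffix) can be illustrated with a self-contained toy. The row text is made up; only the copy/write pattern mirrors the code.

import java.io.StringWriter;
import java.util.Arrays;

public class StatsSpliceSketch {
    public static void main(String[] args) {
        char[] buffer = "batch,42\nretry,42\ncommit,42\n".toCharArray();
        int bufferSize = buffer.length;
        int index = "batch,42\n".length();   // where findStatsIndex would point
        String stats = "stats-columns,DATA_ROW_COUNT" + System.lineSeparator()
                + "stats,12" + System.lineSeparator();

        StringWriter writer = new StringWriter();
        if (index > 0) {
            char[] prefix = Arrays.copyOf(buffer, index);
            writer.write(prefix, 0, index);
        }
        char[] statsBuffer = stats.toCharArray();
        // write the full char array that was handed to the writer
        writer.write(statsBuffer, 0, statsBuffer.length);
        char[] suffix = Arrays.copyOfRange(buffer, index, buffer.length);
        writer.write(suffix, 0, bufferSize - index);
        System.out.print(writer);
    }
}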
Expand Down
Expand Up @@ -952,6 +952,8 @@ public IncomingBatch call() throws Exception {
IncomingBatch incomingBatch = null;
if (!isError && resource != null && resource.exists()) {
try {
//TODO: PRINT
System.out.println("loadBatchFromStage: " + batch.getNodeBatchId());
processInfo.setStatus(ProcessInfo.ProcessStatus.LOADING);

ProtocolDataReader reader = new ProtocolDataReader(BatchType.LOAD, batch.getTargetNodeId(), resource) {
@@ -983,6 +985,7 @@ protected IDataWriter chooseDataWriter(Batch batch) {
incomingBatch.incrementIgnoreCount();
}
}

resource.setState(State.DONE);
if (!resource.isFileResource()) {
resource.delete();
@@ -1085,6 +1088,11 @@ public void batchSuccessful(DataContext context) {
statisticManager.incrementDataBytesLoaded(this.currentBatch.getChannelId(),
this.currentBatch.getByteCount());
Status oldStatus = this.currentBatch.getStatus();
// TODO: PRINT
if (batch.getStatistics() != null) {
System.out.println("setParsedStatistics: " + batch.getNodeBatchId());
currentBatch.setParsedStatistics(batch.getStatistics());
}
try {
this.currentBatch.setStatus(Status.OK);
if (incomingBatchService.isRecordOkBatchesEnabled()) {
@@ -318,6 +318,8 @@ public int updateIncomingBatch(IncomingBatch batch) {

public int updateIncomingBatch(ISqlTransaction transaction, IncomingBatch batch) {
int count = 0;
//TODO: PRINT
System.out.println("updateIncomingBatch: " + batch.getNodeBatchId());
if (batch.isPersistable()) {
if (batch.getStatus() == IncomingBatch.Status.ER) {
batch.setErrorFlag(true);
@@ -28,6 +28,7 @@
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.util.Statistics;

public class Batch {

@@ -54,6 +55,8 @@ public enum BatchType { EXTRACT, LOAD };

protected Map<String, Long> timers = new HashMap<String, Long>();

protected Statistics statistics;

public Batch(BatchType batchType, long batchId, String channelId, BinaryEncoding binaryEncoding, String sourceNodeId, String targetNodeId, boolean common) {
this.batchType = batchType;
this.batchId = batchId;
@@ -229,4 +232,12 @@ public byte[] decodeBinary(String value) {
return null;
}

public void setStatistics(Statistics statistics) {
this.statistics = statistics;
}

public Statistics getStatistics() {
return statistics;
}

}
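The new statistics field turns Batch into the carrier that moves parsed stats from the protocol reader to the loader. A minimal sketch of that round trip follows, assuming the package paths shown in the imports and a no-arg Statistics constructor (both assumptions); only setStatistics/getStatistics and increment appear in this diff.

import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.util.Statistics;

public class BatchStatsCarrierSketch {
    public static Statistics roundTrip(Batch batch) {
        Statistics parsed = new Statistics();       // assumed no-arg constructor
        parsed.increment("DATA_ROW_COUNT", 12);     // key name is illustrative
        batch.setStatistics(parsed);                // reader side attaches stats
        return batch.getStatistics();               // loader side reads them back
    }
}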
@@ -141,8 +141,8 @@ public Object readNext() {
tokens = csvReader.getValues();
}
bytesRead += logDebugAndCountBytes(tokens);
Statistics stats = null;

Statistics stats = null;
if (batch != null) {
stats = statistics.get(batch);
stats.increment(DataReaderStatistics.READ_BYTE_COUNT, bytesRead);
@@ -85,7 +85,6 @@ public void process() throws IOException {

while (reader.readRecord()) {
line = reader.getRawRecord();
System.out.println("line = " + line);
if (line.startsWith(CsvConstants.CATALOG)) {
catalogLine = line;
writeLine(line);
@@ -158,6 +157,7 @@ public void process() throws IOException {
} else if (line.startsWith(CsvConstants.RETRY)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
getArgLine(nodeLine), targetNodeId, false);
batch.setStatistics(batchStats);
String location = batch.getStagedLocation();
resource = stagingManager.find(category, location,
batch.getBatchId());
@@ -179,9 +179,16 @@
channelLine = line;
} else if (line.startsWith(CsvConstants.STATS_COLUMNS)) {
batchStatsColumnsLine = line;
if (writer != null) {
writeLine(line);
}
} else if (line.startsWith(CsvConstants.STATS)) {
batchStatsLine = line;
putStats(batchStats, batchStatsColumnsLine, batchStatsLine);
if (writer != null) {
writeLine(line);
} else {
putStats(batchStats, batchStatsColumnsLine, batchStatsLine);
}
} else {
if (writer == null) {
throw new IllegalStateException("Invalid batch data was received: " + line);
@@ -261,7 +268,7 @@ protected void putStats(Statistics stats, String columnsString, String statsStri
String statsValues[] = StringUtils.split(statsString, ',');

if (statsValues != null && statsColumns != null) {
for (int i = 0; i < statsColumns.length; i++) {
for (int i = 1; i < statsColumns.length; i++) {
String column = statsColumns[i];
if (i < statsValues.length) {
long stat = Long.parseLong(statsValues[i]);
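The putStats loop now starts at i = 1, which matches the row layout parsed above: the first comma-separated token on both the columns row and the stats row is the csv keyword, so only tokens 1..n carry names and values. Below is a toy parse with made-up row text (not the real CsvConstants spellings).

import java.util.LinkedHashMap;
import java.util.Map;

public class PutStatsSketch {
    public static void main(String[] args) {
        String columnsString = "stats-columns,DATA_ROW_COUNT,DATA_INSERT_ROW_COUNT";
        String statsString = "stats,12,7";
        String[] statsColumns = columnsString.split(",");
        String[] statsValues = statsString.split(",");
        Map<String, Long> stats = new LinkedHashMap<>();
        // start at 1 to skip the leading keyword token on both rows
        for (int i = 1; i < statsColumns.length; i++) {
            if (i < statsValues.length) {
                stats.put(statsColumns[i], Long.parseLong(statsValues[i]));
            }
        }
        System.out.println(stats); // {DATA_ROW_COUNT=12, DATA_INSERT_ROW_COUNT=7}
    }
}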
