Skip to content

Commit

Permalink
0003142: Sync Columns Between Incoming and Outgoing Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
maxwellpettit committed Jul 11, 2017
1 parent 1ff79dc commit 8f5004c
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public IncomingBatch(Batch batch) {
public void setValues(Statistics readerStatistics, Statistics writerStatistics, boolean isSuccess) {
if (readerStatistics != null) {
setByteCount(readerStatistics.get(DataReaderStatistics.READ_BYTE_COUNT));
setParsedStatistics(readerStatistics);
mergeInjectedBatchStatistics(readerStatistics);
}
if (writerStatistics != null) {
setFilterMillis(writerStatistics.get(DataWriterStatisticConstants.FILTERMILLIS));
Expand All @@ -77,7 +77,7 @@ public void setValues(Statistics readerStatistics, Statistics writerStatistics,
}
}

public void setParsedStatistics(Statistics statistics) {
public void mergeInjectedBatchStatistics(Statistics statistics) {
setLoadFlag(statistics.get(DataReaderStatistics.LOAD_FLAG) == 1);
setExtractCount(statistics.get(DataReaderStatistics.EXTRACT_COUNT));
setSentCount(statistics.get(DataReaderStatistics.SENT_COUNT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,8 +907,7 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
extractTimeInMs = System.currentTimeMillis() - ts;
Statistics stats = getExtractStats(writer);
if (stats != null) {
transformTimeInMs = stats.get(DataWriterStatisticConstants.TRANSFORMMILLIS);
currentBatch.setExtractCount(stats.get(DataWriterStatisticConstants.ROWCOUNT));
transformTimeInMs = stats.get(DataWriterStatisticConstants.TRANSFORMMILLIS);
currentBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT));
currentBatch.setDataInsertRowCount(stats.get(DataWriterStatisticConstants.INSERTCOUNT));
currentBatch.setDataUpdateRowCount(stats.get(DataWriterStatisticConstants.UPDATECOUNT));
Expand Down Expand Up @@ -1153,15 +1152,11 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
while ((line = reader.readLine()) != null) {
if (line.startsWith(CsvConstants.BATCH)) {
if (nodeService.findNode(batch.getNodeId()).isVersionGreaterThanOrEqualTo(3, 9, 0)) {
reader.mark(0);
String nextLine = reader.readLine();
reader.reset();
if (nextLine != null && !nextLine.startsWith(CsvConstants.STATS_COLUMNS)) {
writer.write(getBatchStatsColumns());
writer.newLine();
writer.write(getBatchStats(batch));
writer.newLine();
}
writer.write(getBatchStatsColumns());
writer.newLine();
writer.write(getBatchStats(batch));
writer.newLine();

}
writer.write(CsvConstants.RETRY + "," + batch.getBatchId());
writer.newLine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ public void batchSuccessful(DataContext context) {
Status oldStatus = this.currentBatch.getStatus();

if (context.getStatistics() != null) {
currentBatch.setParsedStatistics(context.getStatistics());
currentBatch.mergeInjectedBatchStatistics(context.getStatistics());
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ public Object readNext() {
return table;
}

// TODO: Increment load stats?
if (stats != null && (tokens[0].equals(CsvConstants.INSERT) || tokens[0].equals(CsvConstants.UPDATE)
|| tokens[0].equals(CsvConstants.DELETE))) {
stats.increment(DataReaderStatistics.READ_RECORD_COUNT, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public void process() throws IOException {
writer = null;
}
batchTableLines.clear();


context.setStatistics(batchStats);
if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.end(context, batch, resource);
Expand All @@ -154,7 +155,6 @@ public void process() throws IOException {
} else if (line.startsWith(CsvConstants.RETRY)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
getArgLine(nodeLine), targetNodeId, false);
context.setStatistics(batchStats);
String location = batch.getStagedLocation();
resource = stagingManager.find(category, location, batch.getBatchId());
if (resource == null || resource.getState() == State.CREATE) {
Expand All @@ -175,16 +175,9 @@ public void process() throws IOException {
channelLine = line;
} else if (line.startsWith(CsvConstants.STATS_COLUMNS)) {
batchStatsColumnsLine = line;
if (writer != null) {
writeLine(line);
}
} else if (line.startsWith(CsvConstants.STATS)) {
batchStatsLine = line;
if (writer != null) {
writeLine(line);
} else {
putStats(batchStats, batchStatsColumnsLine, batchStatsLine);
}
putStats(batchStats, batchStatsColumnsLine, batchStatsLine);
} else {
if (writer == null) {
throw new IllegalStateException("Invalid batch data was received: " + line);
Expand Down

0 comments on commit 8f5004c

Please sign in to comment.