Skip to content

Commit

Permalink
0003142: Sync Columns Between Incoming and Outgoing Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
maxwellpettit committed Jun 23, 2017
1 parent ece6a3e commit 5cbc027
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 36 deletions.
Expand Up @@ -131,7 +131,8 @@ public Object readNext() {
String catalogName = null;
String[] parsedOldData = null;
long bytesRead = 0;
IncomingBatchStatistics incomingBatchStats = null;
String[] statsColumns = null;
String[] statsValues = null;
Table table = null;
while (tokens != null || csvReader.readRecord()) {
lineNumber++;
Expand Down Expand Up @@ -278,14 +279,12 @@ public Object readNext() {
batch.setIgnored(true);
}
} else if (tokens[0].equals(CsvConstants.STATS_COLUMNS)) {
incomingBatchStats = new IncomingBatchStatistics();
incomingBatchStats.setColumns(CollectionUtils.copyOfRange(tokens, 1, tokens.length));
statsColumns = CollectionUtils.copyOfRange(tokens, 1, tokens.length);
} else if (tokens[0].equals(CsvConstants.STATS)) {
incomingBatchStats.setStats(CollectionUtils.copyOfRange(tokens, 1, tokens.length));
incomingBatchStats.putStats(stats);
statsValues = CollectionUtils.copyOfRange(tokens, 1, tokens.length);
putStats(stats, statsColumns, statsValues);
} else {
log.info("Unable to handle unknown csv values: " + Arrays.toString(tokens));

}

tokens = null;
Expand Down Expand Up @@ -368,38 +367,16 @@ public void close() {
public Map<Batch, Statistics> getStatistics() {
return statistics;
}

class IncomingBatchStatistics {
private String[] columns;
private String[] stats;

public String[] getColumns() {
return columns;
}

public void setColumns(String[] columns) {
this.columns = columns;
}

public String[] getStats() {
return stats;
}

public void setStats(String[] stats) {
this.stats = stats;
}

public void putStats(Statistics statistics) {
if (stats != null && columns != null) {
for (int i = 0; i < columns.length; i++) {
String column = columns[i];
if (i < stats.length) {
long stat = Long.parseLong(stats[i]);
statistics.set(column, stat);
}

protected void putStats(Statistics stats, String[] statsColumns, String[] statsValues) {
if (statsValues != null && statsColumns != null) {
for (int i = 0; i < statsColumns.length; i++) {
String column = statsColumns[i];
if (i < statsValues.length) {
long stat = Long.parseLong(statsValues[i]);
stats.set(column, stat);
}
}
}
}

}
Expand Up @@ -35,6 +35,7 @@
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.util.Statistics;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -80,6 +81,7 @@ public void process() throws IOException {
String line = null;
long startTime = System.currentTimeMillis(), ts = startTime, lineCount = 0;
String batchStatsColumnsLine = null; String batchStatsLine = null;
Statistics batchStats = new Statistics();

while (reader.readRecord()) {
line = reader.getRawRecord();
Expand Down Expand Up @@ -179,6 +181,7 @@ public void process() throws IOException {
batchStatsColumnsLine = line;
} else if (line.startsWith(CsvConstants.STATS)) {
batchStatsLine = line;
putStats(batchStats, batchStatsColumnsLine, batchStatsLine);
} else {
if (writer == null) {
throw new IllegalStateException("Invalid batch data was received: " + line);
Expand Down Expand Up @@ -252,6 +255,21 @@ protected void writeLine(String line) throws IOException {
writer.write("\n");
}
}

protected void putStats(Statistics stats, String columnsString, String statsString) {
String statsColumns[] = StringUtils.split(columnsString, ',');
String statsValues[] = StringUtils.split(statsString, ',');

if (statsValues != null && statsColumns != null) {
for (int i = 0; i < statsColumns.length; i++) {
String column = statsColumns[i];
if (i < statsValues.length) {
long stat = Long.parseLong(statsValues[i]);
stats.set(column, stat);
}
}
}
}

class TableLine {
String catalogLine;
Expand Down

0 comments on commit 5cbc027

Please sign in to comment.