From 5cbc027b618d47c95334a7720fb11d09e0f427ab Mon Sep 17 00:00:00 2001 From: maxwellpettit Date: Fri, 23 Jun 2017 13:59:34 -0400 Subject: [PATCH] 0003142: Sync Columns Between Incoming and Outgoing Batch --- .../io/data/reader/ProtocolDataReader.java | 49 +++++-------------- .../data/writer/SimpleStagingDataWriter.java | 18 +++++++ 2 files changed, 31 insertions(+), 36 deletions(-) diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ProtocolDataReader.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ProtocolDataReader.java index 9ef93d6229..c1558cb9a5 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ProtocolDataReader.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ProtocolDataReader.java @@ -131,7 +131,8 @@ public Object readNext() { String catalogName = null; String[] parsedOldData = null; long bytesRead = 0; - IncomingBatchStatistics incomingBatchStats = null; + String[] statsColumns = null; + String[] statsValues = null; Table table = null; while (tokens != null || csvReader.readRecord()) { lineNumber++; @@ -278,14 +279,12 @@ public Object readNext() { batch.setIgnored(true); } } else if (tokens[0].equals(CsvConstants.STATS_COLUMNS)) { - incomingBatchStats = new IncomingBatchStatistics(); - incomingBatchStats.setColumns(CollectionUtils.copyOfRange(tokens, 1, tokens.length)); + statsColumns = CollectionUtils.copyOfRange(tokens, 1, tokens.length); } else if (tokens[0].equals(CsvConstants.STATS)) { - incomingBatchStats.setStats(CollectionUtils.copyOfRange(tokens, 1, tokens.length)); - incomingBatchStats.putStats(stats); + statsValues = CollectionUtils.copyOfRange(tokens, 1, tokens.length); + putStats(stats, statsColumns, statsValues); } else { log.info("Unable to handle unknown csv values: " + Arrays.toString(tokens)); - } tokens = null; @@ -368,38 +367,16 @@ public void close() { public Map getStatistics() { return statistics; } - - class IncomingBatchStatistics { - private String[] columns; - private String[] stats; - - public String[] getColumns() { - return columns; - } - - public void setColumns(String[] columns) { - this.columns = columns; - } - - public String[] getStats() { - return stats; - } - - public void setStats(String[] stats) { - this.stats = stats; - } - - public void putStats(Statistics statistics) { - if (stats != null && columns != null) { - for (int i = 0; i < columns.length; i++) { - String column = columns[i]; - if (i < stats.length) { - long stat = Long.parseLong(stats[i]); - statistics.set(column, stat); - } + + protected void putStats(Statistics stats, String[] statsColumns, String[] statsValues) { + if (statsValues != null && statsColumns != null) { + for (int i = 0; i < statsColumns.length; i++) { + String column = statsColumns[i]; + if (i < statsValues.length) { + long stat = Long.parseLong(statsValues[i]); + stats.set(column, stat); } } } } - } diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java index ce3dd0ce37..a71848f98e 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/SimpleStagingDataWriter.java @@ -35,6 +35,7 @@ import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.stage.IStagedResource; import org.jumpmind.symmetric.io.stage.IStagedResource.State; +import org.jumpmind.util.Statistics; import org.jumpmind.symmetric.io.stage.IStagingManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +81,7 @@ public void process() throws IOException { String line = null; long startTime = System.currentTimeMillis(), ts = startTime, lineCount = 0; String batchStatsColumnsLine = null; String batchStatsLine = null; + Statistics batchStats = new Statistics(); while (reader.readRecord()) { line = reader.getRawRecord(); @@ -179,6 +181,7 @@ public void process() throws IOException { batchStatsColumnsLine = line; } else if (line.startsWith(CsvConstants.STATS)) { batchStatsLine = line; + putStats(batchStats, batchStatsColumnsLine, batchStatsLine); } else { if (writer == null) { throw new IllegalStateException("Invalid batch data was received: " + line); @@ -252,6 +255,21 @@ protected void writeLine(String line) throws IOException { writer.write("\n"); } } + + protected void putStats(Statistics stats, String columnsString, String statsString) { + String statsColumns[] = StringUtils.split(columnsString, ','); + String statsValues[] = StringUtils.split(statsString, ','); + + if (statsValues != null && statsColumns != null) { + for (int i = 0; i < statsColumns.length; i++) { + String column = statsColumns[i]; + if (i < statsValues.length) { + long stat = Long.parseLong(statsValues[i]); + stats.set(column, stat); + } + } + } + } class TableLine { String catalogLine;