diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MsSqlBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MsSqlBulkDatabaseWriter.java
index 51ab14285a..749f85dd16 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MsSqlBulkDatabaseWriter.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MsSqlBulkDatabaseWriter.java
@@ -1,242 +1,242 @@
-/**
- * Licensed to JumpMind Inc under one or more contributor
- * license agreements. See the NOTICE file distributed
- * with this work for additional information regarding
- * copyright ownership. JumpMind Inc licenses this file
- * to you under the GNU General Public License, version 3.0 (GPLv3)
- * (the "License"); you may not use this file except in compliance
- * with the License.
- *
- * You should have received a copy of the GNU General Public License,
- * version 3.0 (GPLv3) along with this library; if not, see
- * <http://www.gnu.org/licenses/>.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.jumpmind.symmetric.io;
-
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Map;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.StringUtils;
-import org.jumpmind.db.model.Column;
-import org.jumpmind.db.model.Table;
-import org.jumpmind.db.platform.DatabaseInfo;
-import org.jumpmind.db.platform.IDatabasePlatform;
-import org.jumpmind.db.sql.JdbcSqlTransaction;
-import org.jumpmind.db.util.BinaryEncoding;
-import org.jumpmind.symmetric.io.data.CsvData;
-import org.jumpmind.symmetric.io.data.DataEventType;
-import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
-import org.jumpmind.symmetric.io.stage.IStagedResource;
-import org.jumpmind.symmetric.io.stage.IStagingManager;
-import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;
-
-public class MsSqlBulkDatabaseWriter extends AbstractBulkDatabaseWriter {
-
-    protected NativeJdbcExtractor jdbcExtractor;
-    protected int maxRowsBeforeFlush;
-    protected IStagingManager stagingManager;
-    protected IStagedResource stagedInputFile;
-    protected String rowTerminator = "\r\n";
-    protected String fieldTerminator = "||";
-    protected int loadedRows = 0;
-    protected boolean fireTriggers;
-    protected String uncPath;
-    protected boolean needsBinaryConversion;
-    protected boolean needsColumnsReordered;
-    protected Table table = null;
-    protected Table databaseTable = null;
-
-    public MsSqlBulkDatabaseWriter(IDatabasePlatform platform,
-            IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor,
-            int maxRowsBeforeFlush, boolean fireTriggers, String uncPath, String fieldTerminator, String rowTerminator) {
-        super(platform);
-        this.jdbcExtractor = jdbcExtractor;
-        this.maxRowsBeforeFlush = maxRowsBeforeFlush;
-        this.stagingManager = stagingManager;
-        this.fireTriggers = fireTriggers;
-        if (fieldTerminator != null && fieldTerminator.length() > 0) {
-            this.fieldTerminator = fieldTerminator;
-        }
-        if (rowTerminator != null && rowTerminator.length() > 0) {
-            this.rowTerminator = rowTerminator;
-        }
-        this.uncPath = uncPath;
-    }
-
-    public boolean start(Table table) {
-        this.table = table;
-        if (super.start(table)) {
-            needsBinaryConversion = false;
-            if (! batch.getBinaryEncoding().equals(BinaryEncoding.HEX)) {
-                for (Column column : targetTable.getColumns()) {
-                    if (column.isOfBinaryType()) {
-                        needsBinaryConversion = true;
-                        break;
-                    }
-                }
-            }
-            databaseTable = platform.getTableFromCache(sourceTable.getCatalog(), sourceTable.getSchema(),
-                    sourceTable.getName(), false);
-            String[] csvNames = targetTable.getColumnNames();
-            String[] columnNames = databaseTable.getColumnNames();
-            needsColumnsReordered = false;
-            for (int i = 0; i < csvNames.length; i++) {
-                if (! csvNames[i].equals(columnNames[i])) {
-                    needsColumnsReordered = true;
-                    break;
-                }
-            }
-            //TODO: Did this because start is getting called multiple times
-            //      for the same table in a single batch before end is being called
-            if (this.stagedInputFile == null) {
-                createStagingFile();
-            }
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    @Override
-    public void end(Table table) {
-        try {
-            flush();
-            this.stagedInputFile.close();
-            this.stagedInputFile.delete();
-        } finally {
-            super.end(table);
-        }
-    }
-
-    protected void bulkWrite(CsvData data) {
-
-        DataEventType dataEventType = data.getDataEventType();
-
-        switch (dataEventType) {
-            case INSERT:
-                statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
-                try {
-                    String[] parsedData = data.getParsedData(CsvData.ROW_DATA);
-                    if (needsBinaryConversion) {
-                        Column[] columns = targetTable.getColumns();
-                        for (int i = 0; i < columns.length; i++) {
-                            if (columns[i].isOfBinaryType()) {
-                                if (batch.getBinaryEncoding().equals(BinaryEncoding.BASE64) && parsedData[i] != null) {
-                                    parsedData[i] = new String(Hex.encodeHex(Base64.decodeBase64(parsedData[i].getBytes())));
-                                }
-                            }
-                        }
-                    }
-                    OutputStream out = this.stagedInputFile.getOutputStream();
-                    if (needsColumnsReordered) {
-                        Map<String, String> mapData = data.toColumnNameValuePairs(targetTable.getColumnNames(), CsvData.ROW_DATA);
-                        String[] columnNames = databaseTable.getColumnNames();
-                        for (int i = 0; i < columnNames.length; i++) {
-                            String columnData = mapData.get(columnNames[i]);
-                            if (columnData != null) {
-                                out.write(columnData.getBytes());
-                            }
-                            if (i + 1 < columnNames.length) {
-                                out.write(fieldTerminator.getBytes());
-                            }
-                        }
-                    } else {
-                        for (int i = 0; i < parsedData.length; i++) {
-                            if (parsedData[i] != null) {
-                                out.write(parsedData[i].getBytes());
-                            }
-                            if (i + 1 < parsedData.length) {
-                                out.write(fieldTerminator.getBytes());
-                            }
-                        }
-                    }
-                    out.write(rowTerminator.getBytes());
-                    loadedRows++;
-                } catch (Exception ex) {
-                    throw getPlatform().getSqlTemplate().translate(ex);
-                } finally {
-                    statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
-                    statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
-                    statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
-                }
-                break;
-            case UPDATE:
-            case DELETE:
-            default:
-                flush();
-                writeDefault(data);
-                break;
-        }
-
-        if (loadedRows >= maxRowsBeforeFlush) {
-            flush();
-        }
-    }
-
-    protected void flush() {
-        if (loadedRows > 0) {
-            this.stagedInputFile.close();
-            statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
-            String filename;
-            if (StringUtils.isEmpty(uncPath)) {
-                filename = stagedInputFile.getFile().getAbsolutePath();
-            } else {
-                filename = uncPath + "\\" + stagedInputFile.getFile().getName();
-            }
-            try {
-                DatabaseInfo dbInfo = platform.getDatabaseInfo();
-                String quote = dbInfo.getDelimiterToken();
-                String catalogSeparator = dbInfo.getCatalogSeparator();
-                String schemaSeparator = dbInfo.getSchemaSeparator();
-                JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction;
-                Connection c = jdbcTransaction.getConnection();
-                String rowTerminatorString = "";
-                /*
-                 * There seems to be a bug with the SQL server bulk insert when
-                 * you have one row with binary data at the end using \n as the
-                 * row terminator. It works when you leave the row terminator
-                 * out of the bulk insert statement.
-                 */
-                if (!(rowTerminator.equals("\n") || rowTerminator.equals("\r\n"))) {
-                    rowTerminatorString = ", ROWTERMINATOR='" + StringEscapeUtils.escapeJava(rowTerminator) + "'";
-                }
-                String sql = String.format("BULK INSERT " +
-                        this.getTargetTable().getQualifiedTableName(quote, catalogSeparator, schemaSeparator) +
-                        " FROM '" + filename) + "'" +
-                        " WITH (DATAFILETYPE='widechar', FIELDTERMINATOR='"+StringEscapeUtils.escapeJava(fieldTerminator)+"', KEEPIDENTITY" +
-                        (fireTriggers ? ", FIRE_TRIGGERS" : "") + rowTerminatorString +");";
-                Statement stmt = c.createStatement();
-
-                //TODO: clean this up, deal with errors, etc.?
-                stmt.execute(sql);
-                stmt.close();
-
-            } catch (SQLException ex) {
-                throw platform.getSqlTemplate().translate(ex);
-            } finally {
-                statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
-            }
-        }
-    }
-
-    protected void createStagingFile() {
-        //TODO: We should use constants for dir structure path,
-        //      but we don't want to depend on symmetric core.
-        this.stagedInputFile = stagingManager.create("bulkloaddir",
-                table.getName() + this.getBatch().getBatchId() + ".csv");
-    }
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.io;
+
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.jumpmind.db.model.Column;
+import org.jumpmind.db.model.Table;
+import org.jumpmind.db.platform.DatabaseInfo;
+import org.jumpmind.db.platform.IDatabasePlatform;
+import org.jumpmind.db.sql.JdbcSqlTransaction;
+import org.jumpmind.db.util.BinaryEncoding;
+import org.jumpmind.symmetric.io.data.CsvData;
+import org.jumpmind.symmetric.io.data.DataEventType;
+import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
+import org.jumpmind.symmetric.io.stage.IStagedResource;
+import org.jumpmind.symmetric.io.stage.IStagingManager;
+import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;
+
+public class MsSqlBulkDatabaseWriter extends AbstractBulkDatabaseWriter {
+
+    protected NativeJdbcExtractor jdbcExtractor;
+    protected int maxRowsBeforeFlush;
+    protected IStagingManager stagingManager;
+    protected IStagedResource stagedInputFile;
+    protected String rowTerminator = "\r\n";
+    protected String fieldTerminator = "||";
+    protected int loadedRows = 0;
+    protected boolean fireTriggers;
+    protected String uncPath;
+    protected boolean needsBinaryConversion;
+    protected boolean needsColumnsReordered;
+    protected Table table = null;
+    protected Table databaseTable = null;
+
+    public MsSqlBulkDatabaseWriter(IDatabasePlatform platform,
+            IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor,
+            int maxRowsBeforeFlush, boolean fireTriggers, String uncPath, String fieldTerminator, String rowTerminator) {
+        super(platform);
+        this.jdbcExtractor = jdbcExtractor;
+        this.maxRowsBeforeFlush = maxRowsBeforeFlush;
+        this.stagingManager = stagingManager;
+        this.fireTriggers = fireTriggers;
+        if (fieldTerminator != null && fieldTerminator.length() > 0) {
+            this.fieldTerminator = fieldTerminator;
+        }
+        if (rowTerminator != null && rowTerminator.length() > 0) {
+            this.rowTerminator = rowTerminator;
+        }
+        this.uncPath = uncPath;
+    }
+
+    public boolean start(Table table) {
+        this.table = table;
+        if (super.start(table)) {
+            needsBinaryConversion = false;
+            if (! batch.getBinaryEncoding().equals(BinaryEncoding.HEX)) {
+                for (Column column : targetTable.getColumns()) {
+                    if (column.isOfBinaryType()) {
+                        needsBinaryConversion = true;
+                        break;
+                    }
+                }
+            }
+            databaseTable = platform.getTableFromCache(sourceTable.getCatalog(), sourceTable.getSchema(),
+                    sourceTable.getName(), false);
+            String[] csvNames = targetTable.getColumnNames();
+            String[] columnNames = databaseTable.getColumnNames();
+            needsColumnsReordered = false;
+            for (int i = 0; i < csvNames.length; i++) {
+                if (! csvNames[i].equals(columnNames[i])) {
+                    needsColumnsReordered = true;
+                    break;
+                }
+            }
+            //TODO: Did this because start is getting called multiple times
+            //      for the same table in a single batch before end is being called
+            if (this.stagedInputFile == null) {
+                createStagingFile();
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public void end(Table table) {
+        try {
+            flush();
+            this.stagedInputFile.close();
+            this.stagedInputFile.delete();
+        } finally {
+            super.end(table);
+        }
+    }
+
+    protected void bulkWrite(CsvData data) {
+
+        DataEventType dataEventType = data.getDataEventType();
+
+        switch (dataEventType) {
+            case INSERT:
+                statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
+                try {
+                    String[] parsedData = data.getParsedData(CsvData.ROW_DATA);
+                    if (needsBinaryConversion) {
+                        Column[] columns = targetTable.getColumns();
+                        for (int i = 0; i < columns.length; i++) {
+                            if (columns[i].isOfBinaryType()) {
+                                if (batch.getBinaryEncoding().equals(BinaryEncoding.BASE64) && parsedData[i] != null) {
+                                    parsedData[i] = new String(Hex.encodeHex(Base64.decodeBase64(parsedData[i].getBytes())));
+                                }
+                            }
+                        }
+                    }
+                    OutputStream out = this.stagedInputFile.getOutputStream();
+                    if (needsColumnsReordered) {
+                        Map<String, String> mapData = data.toColumnNameValuePairs(targetTable.getColumnNames(), CsvData.ROW_DATA);
+                        String[] columnNames = databaseTable.getColumnNames();
+                        for (int i = 0; i < columnNames.length; i++) {
+                            String columnData = mapData.get(columnNames[i]);
+                            if (columnData != null) {
+                                out.write(columnData.getBytes());
+                            }
+                            if (i + 1 < columnNames.length) {
+                                out.write(fieldTerminator.getBytes());
+                            }
+                        }
+                    } else {
+                        for (int i = 0; i < parsedData.length; i++) {
+                            if (parsedData[i] != null) {
+                                out.write(parsedData[i].getBytes());
+                            }
+                            if (i + 1 < parsedData.length) {
+                                out.write(fieldTerminator.getBytes());
+                            }
+                        }
+                    }
+                    out.write(rowTerminator.getBytes());
+                    loadedRows++;
+                } catch (Exception ex) {
+                    throw getPlatform().getSqlTemplate().translate(ex);
+                } finally {
+                    statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
+                    statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
+                    statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
+                }
+                break;
+            case UPDATE:
+            case DELETE:
+            default:
+                flush();
+                writeDefault(data);
+                break;
+        }
+
+        if (loadedRows >= maxRowsBeforeFlush) {
+            flush();
+        }
+    }
+
+    protected void flush() {
+        if (loadedRows > 0) {
+            this.stagedInputFile.close();
+            statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
+            String filename;
+            if (StringUtils.isEmpty(uncPath)) {
+                filename = stagedInputFile.getFile().getAbsolutePath();
+            } else {
+                filename = uncPath + "\\" + stagedInputFile.getFile().getName();
+            }
+            try {
+                DatabaseInfo dbInfo = platform.getDatabaseInfo();
+                String quote = dbInfo.getDelimiterToken();
+                String catalogSeparator = dbInfo.getCatalogSeparator();
+                String schemaSeparator = dbInfo.getSchemaSeparator();
+                JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction;
+                Connection c = jdbcTransaction.getConnection();
+                String rowTerminatorString = "";
+                /*
+                 * There seems to be a bug with the SQL server bulk insert when
+                 * you have one row with binary data at the end using \n as the
+                 * row terminator. It works when you leave the row terminator
+                 * out of the bulk insert statement.
+                 */
+                if (!(rowTerminator.equals("\n") || rowTerminator.equals("\r\n"))) {
+                    rowTerminatorString = ", ROWTERMINATOR='" + StringEscapeUtils.escapeJava(rowTerminator) + "'";
+                }
+                String sql = String.format("BULK INSERT " +
+                        this.getTargetTable().getQualifiedTableName(quote, catalogSeparator, schemaSeparator) +
+                        " FROM '" + filename) + "'" +
+                        " WITH (DATAFILETYPE='widechar', FIELDTERMINATOR='"+StringEscapeUtils.escapeJava(fieldTerminator)+"', KEEPIDENTITY" +
+                        (fireTriggers ? ", FIRE_TRIGGERS" : "") + rowTerminatorString +");";
+                Statement stmt = c.createStatement();
+
+                //TODO: clean this up, deal with errors, etc.?
+                stmt.execute(sql);
+                stmt.close();
+
+            } catch (SQLException ex) {
+                throw platform.getSqlTemplate().translate(ex);
+            } finally {
+                statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
+            }
+        }
+    }
+
+    protected void createStagingFile() {
+        //TODO: We should use constants for dir structure path,
+        //      but we don't want to depend on symmetric core.
+        this.stagedInputFile = stagingManager.create("bulkloaddir",
+                table.getName() + this.getBatch().getBatchId() + ".csv");
+    }
 }
\ No newline at end of file
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java
index 2a031fcfb0..ecd921220b 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/MySqlBulkDatabaseWriter.java
@@ -158,7 +158,7 @@ protected void bulkWrite(CsvData data) {
                 throw getPlatform().getSqlTemplate().translate(ex);
             } finally {
                 statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
-                statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
+                statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
                 statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
                 statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
             }
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java
index 9a1cc16792..13d7bc9fb0 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/OracleBulkDatabaseWriter.java
@@ -97,7 +97,7 @@ protected void bulkWrite(CsvData data) {
         boolean requiresFlush = false;
         switch (dataEventType) {
             case INSERT:
-                statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
+                statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
                 statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
                 if (filterBefore(data)) {
                     Object[] rowData = platform.getObjectValues(batch.getBinaryEncoding(), getRowData(data, CsvData.ROW_DATA),
@@ -300,7 +300,7 @@ protected void flush() {
 
                 if (errors.length > 0) {
                     // set the statement count so the failed row number get reported correctly
-                    statistics.get(batch).set(DataWriterStatisticConstants.STATEMENTCOUNT,
+                    statistics.get(batch).set(DataWriterStatisticConstants.ROWCOUNT,
                             errors[0]);
 
                     throw new BulkSqlException(errors, lastEventType.toString(), sql);
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java
index db9d8ad66f..9acfceed3d 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java
@@ -124,7 +124,7 @@ protected void bulkWrite(CsvData data) {
             }
         }
         statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
-        statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
+        statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
         statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
     }
 
@@ -176,7 +176,7 @@ protected void endCopy() {
                 copyIn.endCopy();
             }
         } catch (Exception ex) {
-            statistics.get(batch).set(DataWriterStatisticConstants.STATEMENTCOUNT, 0);
+            statistics.get(batch).set(DataWriterStatisticConstants.ROWCOUNT, 0);
            statistics.get(batch).set(DataWriterStatisticConstants.LINENUMBER, 0);
 
             throw getPlatform().getSqlTemplate().translate(ex);
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java
index 7fc704a89c..cf7fe96acd 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java
@@ -123,7 +123,7 @@ public void bulkWrite(CsvData data) {
 
         switch (dataEventType) {
             case INSERT:
-                statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
+                statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
                 statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
                 statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
                 try {
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java
index 4e29ce01fb..7b2559f17f 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java
@@ -479,7 +479,7 @@ public long getDataDeleteRowCount() {
         return dataDeleteRowCount;
     }
 
-    public void incrementEventCount(DataEventType type) {
+    public void incrementRowCount(DataEventType type) {
         switch (type) {
             case RELOAD:
                 reloadRowCount++;
@@ -498,6 +498,26 @@ public void incrementEventCount(DataEventType type) {
                 break;
         }
     }
+
+    public void incrementExtractRowCount(DataEventType type) {
+        switch (type) {
+            case INSERT:
+                extractInsertRowCount++;
+                break;
+            case UPDATE:
+                extractUpdateRowCount++;
+                break;
+            case DELETE:
+                extractDeleteRowCount++;
+                break;
+            default:
+                break;
+        }
+    }
+
+    public void incrementExtractRowCount() {
+        this.extractRowCount++;
+    }
 
     public void setDataInsertRowCount(long dataInsertRowCount) {
         this.dataInsertRowCount = dataInsertRowCount;
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java
index 218b88616e..2c5401afa6 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/IncomingBatch.java
@@ -58,7 +58,7 @@ public void setValues(Statistics readerStatistics, Statistics writerStatistics,
         if (writerStatistics != null) {
             setFilterMillis(writerStatistics.get(DataWriterStatisticConstants.FILTERMILLIS));
             setLoadMillis(writerStatistics.get(DataWriterStatisticConstants.LOADMILLIS));
-            setLoadRowCount(writerStatistics.get(DataWriterStatisticConstants.STATEMENTCOUNT));
+            setLoadRowCount(writerStatistics.get(DataWriterStatisticConstants.ROWCOUNT));
             setFallbackInsertCount(writerStatistics.get(DataWriterStatisticConstants.FALLBACKINSERTCOUNT));
             setFallbackUpdateCount(writerStatistics.get(DataWriterStatisticConstants.FALLBACKUPDATECOUNT));
             setMissingDeleteCount(writerStatistics.get(DataWriterStatisticConstants.MISSINGDELETECOUNT));
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java
index 07ac561b6a..6c2eb649eb 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java
@@ -899,12 +899,16 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targe
                 Statistics stats = getExtractStats(writer);
                 if (stats != null) {
                     transformTimeInMs = stats.get(DataWriterStatisticConstants.TRANSFORMMILLIS);
-                    currentBatch.setExtractCount(stats.get(DataWriterStatisticConstants.STATEMENTCOUNT));
+                    currentBatch.setExtractCount(stats.get(DataWriterStatisticConstants.ROWCOUNT));
+                    currentBatch.setDataRowCount(stats.get(DataWriterStatisticConstants.ROWCOUNT));
+                    currentBatch.setDataInsertRowCount(stats.get(DataWriterStatisticConstants.INSERTCOUNT));
+                    currentBatch.setDataUpdateRowCount(stats.get(DataWriterStatisticConstants.UPDATECOUNT));
+                    currentBatch.setDataDeleteRowCount(stats.get(DataWriterStatisticConstants.DELETECOUNT));
                     extractTimeInMs = extractTimeInMs - transformTimeInMs;
                     byteCount = stats.get(DataWriterStatisticConstants.BYTECOUNT);
                     statisticManager.incrementDataBytesExtracted(currentBatch.getChannelId(), byteCount);
                     statisticManager.incrementDataExtracted(currentBatch.getChannelId(),
-                            stats.get(DataWriterStatisticConstants.STATEMENTCOUNT));
+                            stats.get(DataWriterStatisticConstants.ROWCOUNT));
                 }
             }
 
@@ -1173,7 +1177,6 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
             }
 
             char[] buffer = new char[bufferSize];
-            //TODO: Write Batch Stats
             boolean batchStatsWritten = false;
             String prevBuffer = "";
             while ((numCharsRead = reader.read(buffer)) != -1) {
@@ -1253,7 +1256,7 @@ protected int findStatsIndex(String bufferString, String prevBuffer) {
         int index = -1;
         String fullBuffer = prevBuffer + bufferString;
 
-        String pattern = "\n" + CsvConstants.BATCH + ",\\d*\r*\n";
+        String pattern = "\n" + CsvConstants.BATCH + "\\s*,\\s*\\d*\r*\n";
         Pattern r = Pattern.compile(pattern);
         Matcher m = r.matcher(fullBuffer);
         if (m.find()) {
@@ -1904,7 +1907,8 @@ public CsvData next() {
                             || symmetricDialect.getName().equals(
                                     DatabaseNamesConstants.MSSQL2008));
 
-                    outgoingBatch.incrementDataRowCount();
+                    outgoingBatch.incrementExtractRowCount();
+                    outgoingBatch.incrementExtractRowCount(data.getDataEventType());
                 } else {
                     log.error(
                             "Could not locate a trigger with the id of {} for {}. It was recorded in the hist table with a hist id of {}",
@@ -2068,8 +2072,8 @@ public CsvData next() {
                             .isNotBlank(triggerRouter.getInitialLoadSelect()), triggerRouter));
 
                     if (data != null && outgoingBatch != null && !outgoingBatch.isExtractJobFlag()) {
-                        outgoingBatch.incrementDataRowCount();
-                        outgoingBatch.incrementEventCount(data.getDataEventType());
+                        outgoingBatch.incrementExtractRowCount();
+                        outgoingBatch.incrementExtractRowCount(data.getDataEventType());
                     }
 
                     return data;
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
index 0048fda281..8d1c1079ff 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
@@ -1374,7 +1374,7 @@ protected long insertDataEventAndOutgoingBatch(ISqlTransaction transaction, long
         outgoingBatch.setLoadId(loadId);
         outgoingBatch.setCreateBy(createBy);
         outgoingBatch.setLoadFlag(isLoad);
-        outgoingBatch.incrementEventCount(eventType);
+        outgoingBatch.incrementRowCount(eventType);
         outgoingBatch.incrementDataRowCount();
         if (tableName != null) {
             outgoingBatch.incrementTableCount(tableName.toLowerCase());
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java
index 02992e74ab..7e64263888 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java
@@ -207,26 +207,30 @@ public void updateOutgoingBatch(OutgoingBatch outgoingBatch) {
     public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgoingBatch) {
         outgoingBatch.setLastUpdatedTime(new Date());
         outgoingBatch.setLastUpdatedHostName(clusterService.getServerId());
-        transaction.prepareAndExecute(getSql("updateOutgoingBatchSql"), new Object[] { outgoingBatch.getStatus().name(),
-                outgoingBatch.getLoadId(), outgoingBatch.isExtractJobFlag() ? 1 : 0, outgoingBatch.isLoadFlag() ? 1 : 0,
-                outgoingBatch.isErrorFlag() ? 1 : 0, outgoingBatch.getByteCount(), outgoingBatch.getExtractCount(),
-                outgoingBatch.getSentCount(), outgoingBatch.getLoadCount(), outgoingBatch.getDataRowCount(),
-                outgoingBatch.getReloadRowCount(), outgoingBatch.getDataInsertRowCount(), outgoingBatch.getDataUpdateRowCount(),
-                outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getOtherRowCount(), outgoingBatch.getIgnoreCount(),
-                outgoingBatch.getRouterMillis(), outgoingBatch.getNetworkMillis(), outgoingBatch.getFilterMillis(),
-                outgoingBatch.getLoadMillis(), outgoingBatch.getExtractMillis(), outgoingBatch.getExtractStartTime(),
-                outgoingBatch.getTransferStartTime(), outgoingBatch.getLoadStartTime(), outgoingBatch.getSqlState(),
-                outgoingBatch.getSqlCode(), FormatUtils.abbreviateForLogging(outgoingBatch.getSqlMessage()), outgoingBatch.getFailedDataId(),
-                outgoingBatch.getLastUpdatedHostName(), outgoingBatch.getSummary(), outgoingBatch.getLoadRowCount(),
-                outgoingBatch.getLoadInsertRowCount(), outgoingBatch.getLoadUpdateRowCount(), outgoingBatch.getLoadDeleteRowCount(),
-                outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getIgnoreRowCount(),
-                outgoingBatch.getMissingDeleteCount(), outgoingBatch.getSkipCount(), outgoingBatch.getBatchId(), outgoingBatch.getNodeId() },
-                new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
+        transaction.prepareAndExecute(getSql("updateOutgoingBatchSql"),
+                new Object[] { outgoingBatch.getStatus().name(), outgoingBatch.getLoadId(), outgoingBatch.isExtractJobFlag() ? 1 : 0,
+                        outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isErrorFlag() ? 1 : 0, outgoingBatch.getByteCount(),
+                        outgoingBatch.getExtractCount(), outgoingBatch.getSentCount(), outgoingBatch.getLoadCount(),
+                        outgoingBatch.getDataRowCount(), outgoingBatch.getReloadRowCount(), outgoingBatch.getDataInsertRowCount(),
+                        outgoingBatch.getDataUpdateRowCount(), outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getOtherRowCount(),
+                        outgoingBatch.getIgnoreCount(), outgoingBatch.getRouterMillis(), outgoingBatch.getNetworkMillis(),
+                        outgoingBatch.getFilterMillis(), outgoingBatch.getLoadMillis(), outgoingBatch.getExtractMillis(),
+                        outgoingBatch.getExtractStartTime(), outgoingBatch.getTransferStartTime(), outgoingBatch.getLoadStartTime(),
+                        outgoingBatch.getSqlState(), outgoingBatch.getSqlCode(),
+                        FormatUtils.abbreviateForLogging(outgoingBatch.getSqlMessage()), outgoingBatch.getFailedDataId(),
+                        outgoingBatch.getLastUpdatedHostName(), outgoingBatch.getSummary(), outgoingBatch.getLoadRowCount(),
+                        outgoingBatch.getLoadInsertRowCount(), outgoingBatch.getLoadUpdateRowCount(), outgoingBatch.getLoadDeleteRowCount(),
+                        outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getIgnoreRowCount(),
+                        outgoingBatch.getMissingDeleteCount(), outgoingBatch.getSkipCount(), outgoingBatch.getExtractRowCount(),
+                        outgoingBatch.getExtractInsertRowCount(), outgoingBatch.getExtractUpdateRowCount(),
+                        outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getBatchId(), outgoingBatch.getNodeId() },
+                new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
                         Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
-                        Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP,
-                        Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR,
-                        Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
-                        Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
+                        Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
+                        Types.TIMESTAMP, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.NUMERIC,
+                        Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
+                        Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
+                        Types.NUMERIC,
                         symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
     }
 
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java
index 1466ebb843..faad538d29 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java
@@ -64,7 +64,8 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
                 + " failed_data_id=?, last_update_hostname=?, last_update_time=current_timestamp, summary=?, "
                 + " load_row_count=?, load_insert_row_count=?, load_update_row_count=?, load_delete_row_count=?, "
                 + " fallback_insert_count=?, fallback_update_count=?, ignore_row_count=?, missing_delete_count=?, "
-                + " skip_count=? where batch_id=? and node_id=? ");
+                + " skip_count=?, extract_row_count=?, extract_insert_row_count=?, extract_update_row_count=?, extract_delete_row_count=? "
+                + " where batch_id=? and node_id=? ");
 
         putSql("findOutgoingBatchSql", "where batch_id=? and node_id=? ");
 
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java
index 996462772a..e779427a62 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java
@@ -1039,7 +1039,7 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
                             context.setLastLoadId(-1);
                         }
 
-                        batch.incrementEventCount(dataMetaData.getData().getDataEventType());
+                        batch.incrementRowCount(dataMetaData.getData().getDataEventType());
                         batch.incrementDataRowCount();
                         batch.incrementTableCount(dataMetaData.getTable().getNameLowerCase());
 
diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java
index 5228ff12ec..04efe65ece 100644
--- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java
+++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java
@@ -151,7 +151,7 @@ public void write(CsvData data) {
                 || (targetTable == null && data.getDataEventType() == DataEventType.SQL)) {
 
             try {
-                statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
+                statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
                 statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
 
                 if (filterBefore(data)) {
diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriterConflictResolver.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriterConflictResolver.java
index 571992dc87..ba4bb45bfb 100644
--- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriterConflictResolver.java
+++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriterConflictResolver.java
@@ -38,7 +38,7 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu
         DatabaseWriterSettings writerSettings = writer.getWriterSettings();
         Conflict conflict = writerSettings.pickConflict(writer.getTargetTable(), writer.getBatch());
         Statistics statistics = writer.getStatistics().get(writer.getBatch());
-        long statementCount = statistics.get(DataWriterStatisticConstants.STATEMENTCOUNT);
+        long statementCount = statistics.get(DataWriterStatisticConstants.ROWCOUNT);
         long lineNumber = statistics.get(DataWriterStatisticConstants.LINENUMBER);
         ResolvedData resolvedData = writerSettings.getResolvedData(statementCount);
 
diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractProtocolDataWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractProtocolDataWriter.java
index e1f9b87e9f..334166fb2f 100644
--- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractProtocolDataWriter.java
+++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractProtocolDataWriter.java
@@ -154,11 +154,12 @@ public void write(CsvData data) {
             println(CsvConstants.NO_BINARY_OLD_DATA, Boolean.toString(noBinaryOldData));
         }
 
-        statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
+        statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
         statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
 
         switch (data.getDataEventType()) {
             case INSERT:
                 println(CsvConstants.INSERT, data.getCsvData(CsvData.ROW_DATA));
+                statistics.get(batch).increment(DataWriterStatisticConstants.INSERTCOUNT);
                 break;
             case UPDATE:
@@ -170,6 +171,7 @@ public void write(CsvData data) {
                 }
                 println(CsvConstants.UPDATE, data.getCsvData(CsvData.ROW_DATA),
                         data.getCsvData(CsvData.PK_DATA));
+                statistics.get(batch).increment(DataWriterStatisticConstants.UPDATECOUNT);
                 break;
 
             case DELETE:
@@ -180,6 +182,7 @@ public void write(CsvData data) {
                     }
                 }
                 println(CsvConstants.DELETE, data.getCsvData(CsvData.PK_DATA));
+                statistics.get(batch).increment(DataWriterStatisticConstants.DELETECOUNT);
                 break;
 
             case CREATE:
diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DataWriterStatisticConstants.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DataWriterStatisticConstants.java
index 841ab668c3..2afe26e555 100644
--- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DataWriterStatisticConstants.java
+++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DataWriterStatisticConstants.java
@@ -27,7 +27,7 @@ abstract public class DataWriterStatisticConstants {
     public static final String TRANSFORMMILLIS = "TRANSFORMMILLIS";
     public static final String FILTERMILLIS = "FILTERMILLIS";
     public static final String LOADMILLIS = "LOADMILLIS";
-    public static final String STATEMENTCOUNT = "STATEMENTCOUNT";
+    public static final String ROWCOUNT = "STATEMENTCOUNT";
     public static final String INSERTCOUNT = "INSERTCOUNT";
     public static final String DELETECOUNT = "DELETECOUNT";
     public static final String UPDATECOUNT = "UPDATECOUNT";
diff --git a/symmetric-io/src/test/java/org/jumpmind/symmetric/io/AbstractWriterTest.java b/symmetric-io/src/test/java/org/jumpmind/symmetric/io/AbstractWriterTest.java
index 966868b256..2a2d00102c 100644
--- a/symmetric-io/src/test/java/org/jumpmind/symmetric/io/AbstractWriterTest.java
+++ b/symmetric-io/src/test/java/org/jumpmind/symmetric/io/AbstractWriterTest.java
@@ -163,7 +163,7 @@ protected long writeData(IDataWriter writer, TableCsvData... datas) {
         long statementCount = 0;
         Collection<Statistics> stats = writer.getStatistics().values();
         for (Statistics statistics : stats) {
-            statementCount += statistics.get(DataWriterStatisticConstants.STATEMENTCOUNT);
+            statementCount += statistics.get(DataWriterStatisticConstants.ROWCOUNT);
         }
         return statementCount;
     }
@@ -206,7 +206,7 @@ protected long writeData(IDataWriter writer, DataContext context, TableCsvData..
         long statementCount = 0;
         Collection<Statistics> stats = writer.getStatistics().values();
         for (Statistics statistics : stats) {
-            statementCount += statistics.get(DataWriterStatisticConstants.STATEMENTCOUNT);
+            statementCount += statistics.get(DataWriterStatisticConstants.ROWCOUNT);
         }
         return statementCount;
     }
diff --git a/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriterTest.java b/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriterTest.java
index 50ee95575e..ff0896e13e 100644
--- a/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriterTest.java
+++ b/symmetric-io/src/test/java/org/jumpmind/symmetric/io/data/writer/DatabaseWriterTest.java
@@ -262,7 +262,7 @@ public void testUpdateDetectOldDataManual() {
             Assert.fail("Should have received a conflict exception");
         } catch (ConflictException ex) {
             Statistics stats = lastDataWriterUsed.getStatistics().values().iterator().next();
-            long statementNumber = stats.get(DataWriterStatisticConstants.STATEMENTCOUNT);
+            long statementNumber = stats.get(DataWriterStatisticConstants.ROWCOUNT);
             ResolvedData resolvedData = new ResolvedData(statementNumber,
                     update.getCsvData(CsvData.ROW_DATA), false);
             writerSettings.setResolvedData(resolvedData);
@@ -309,7 +309,7 @@ public void testUpdateDetectOldDataWithNullManual() {
             Assert.fail("Should have received a conflict exception");
         } catch (ConflictException ex) {
             Statistics stats = lastDataWriterUsed.getStatistics().values().iterator().next();
-            long statementNumber = stats.get(DataWriterStatisticConstants.STATEMENTCOUNT);
+            long statementNumber = stats.get(DataWriterStatisticConstants.ROWCOUNT);
             ResolvedData resolvedData = new ResolvedData(statementNumber,
                     update.getCsvData(CsvData.ROW_DATA), false);
             writerSettings.setResolvedData(resolvedData);
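
Editor's note, not part of the patch: DataWriterStatisticConstants.ROWCOUNT is a rename of STATEMENTCOUNT at the Java level only; the constant keeps the legacy key string "STATEMENTCOUNT", presumably so statistics already recorded under the old key continue to resolve. A minimal, self-contained sketch of that pattern (plain Java; the class name and map-backed counter are illustrative, not SymmetricDS code):

    import java.util.HashMap;
    import java.util.Map;

    public class StatKeyRenameSketch {
        // New, clearer name for call sites; the underlying key string is unchanged.
        static final String ROWCOUNT = "STATEMENTCOUNT";

        public static void main(String[] args) {
            Map<String, Long> stats = new HashMap<>();
            stats.merge(ROWCOUNT, 1L, Long::sum);              // new code increments via ROWCOUNT
            System.out.println(stats.get("STATEMENTCOUNT"));   // anything keyed the old way still matches: 1
        }
    }

Decoupling the Java identifier from the stored key is what lets every STATEMENTCOUNT call site in this patch switch to ROWCOUNT without a data migration.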