diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/AbstractBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/AbstractBulkDatabaseWriter.java index 44c74e1124..de5e50b05c 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/AbstractBulkDatabaseWriter.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/AbstractBulkDatabaseWriter.java @@ -49,7 +49,7 @@ public void start(Batch batch) { super.start(batch); batch.setBulkLoaderFlag(true); if (isFallBackToDefault()) { - batch.setBulkLoaderFlag(false); + batch.setBulkLoaderFlag(false); getTransaction().setInBatchMode(false); getTransaction().clearBatch(); log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java index 2535ca7e6f..e4142bf86b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/AbstractBatch.java @@ -703,12 +703,11 @@ public void setTableExtractedCount(Map> tableExtracted this.tableExtractedCount = tableExtractedCount; } - public boolean isBulkLoadFlag() { - return bulkLoadFlag; - } - - public void setBulkLoadFlag(boolean bulkLoadFlag) { - this.bulkLoadFlag = bulkLoadFlag; - } - + public boolean isBulkLoadFlag() { + return bulkLoadFlag; + } + + public void setBulkLoadFlag(boolean bulkLoadFlag) { + this.bulkLoadFlag = bulkLoadFlag; + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchAck.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchAck.java index a410759595..d9b0542670 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchAck.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/BatchAck.java @@ -259,12 +259,11 @@ public long getTransformLoadMillis() { return transformLoadMillis; } - public boolean isBulkLoaderFlag() { - return bulkLoaderFlag; - } - - public void setBulkLoaderFlag(boolean bulkLoaderFlag) { - this.bulkLoaderFlag = bulkLoaderFlag; - } - + public boolean isBulkLoaderFlag() { + return bulkLoaderFlag; + } + + public void setBulkLoaderFlag(boolean bulkLoaderFlag) { + this.bulkLoaderFlag = bulkLoaderFlag; + } } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadStatus.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadStatus.java index e16d452d7d..2a61d3dcb1 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadStatus.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadStatus.java @@ -50,7 +50,7 @@ public class TableReloadStatus { protected Date lastUpdateTime = new Date(); protected int numBatchesBulkLoaded; - @Override + @Override public int hashCode() { final int prime = 31; int result = 1; @@ -307,12 +307,12 @@ public int getTotalBatchCount() { public int getTotalBatchLoaded() { return this.setupBatchLoaded + this.dataBatchLoaded + this.finalizeBatchLoaded; } - + public int getNumBatchesBulkLoaded() { - return numBatchesBulkLoaded; - } + return numBatchesBulkLoaded; + } - public void setNumBatchesBulkLoaded(int numBatchesBulkLoaded) { - this.numBatchesBulkLoaded = numBatchesBulkLoaded; - } + public void setNumBatchesBulkLoaded(int numBatchesBulkLoaded) { + this.numBatchesBulkLoaded = numBatchesBulkLoaded; + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 8a9eeec580..e52787820d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -450,7 +450,7 @@ batchId, batchCount, batchId, batchCount, batchId, batchCount, new Date(), new int[] { idType, Types.NUMERIC, idType, Types.NUMERIC, idType, Types.NUMERIC, idType, Types.NUMERIC, idType, Types.NUMERIC, idType, Types.NUMERIC, Types.TIMESTAMP, idType, Types.NUMERIC, idType, Types.NUMERIC, idType, Types.NUMERIC, idType, - Types.VARCHAR, Types.TIMESTAMP, idType, Types.NUMERIC, idType}); + Types.VARCHAR, Types.TIMESTAMP, idType, Types.NUMERIC, idType }); List status = transaction.query(getSql("selectTableReloadStatusByLoadId"), new TableReloadStatusMapper(), new Object[] { loadId }, new int[] { idType }); if (status != null && status.size() > 0 && count > 0) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index 9be1dcfcc2..904e61ea8e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -141,7 +141,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + " setup_batch_count <= (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and " + " finalize_batch_loaded <= (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) " + " then ? else end_time end, " - + " data_batch_loaded = case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end, " + + " data_batch_loaded = case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end, " + " setup_batch_loaded = case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end, " + " finalize_batch_loaded = case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end, " + " rows_loaded = (select case when sum(loaded_rows) is null then 0 else sum(loaded_rows) end from $(extract_request) where load_id = ? and source_node_id = ?), " diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ManageIncomingBatchListener.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ManageIncomingBatchListener.java index aefd547f58..30dbe7d9ee 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ManageIncomingBatchListener.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ManageIncomingBatchListener.java @@ -184,9 +184,9 @@ public void batchInError(DataContext context, Throwable ex) { throw ex; } if (context.get(ContextConstants.CONTEXT_BULK_WRITER_TO_USE) != null && context.get(ContextConstants.CONTEXT_BULK_WRITER_TO_USE).equals("bulk")) { - currentBatch.setBulkLoadFlag(false); + currentBatch.setBulkLoadFlag(false); log.info("Bulk loading failed for this batch " + context.getBatch().getBatchId() + ", falling back to default loading. (" + ex + ")"); - log.debug("Bulk loading error.", ex); + log.debug("Bulk loading error.", ex); } else { /* * Reread batch to make sure it wasn't set to IG or OK diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index aadb6f625d..b70f0a08db 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -215,7 +215,8 @@ public void updateOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo outgoingBatch.getFallbackInsertCount(), outgoingBatch.getFallbackUpdateCount(), outgoingBatch.getIgnoreRowCount(), outgoingBatch.getMissingDeleteCount(), outgoingBatch.getSkipCount(), outgoingBatch.getExtractRowCount(), outgoingBatch.getExtractInsertRowCount(), outgoingBatch.getExtractUpdateRowCount(), - outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getTransformExtractMillis(), outgoingBatch.getTransformLoadMillis(), outgoingBatch.isBulkLoadFlag(), + outgoingBatch.getExtractDeleteRowCount(), outgoingBatch.getTransformExtractMillis(), outgoingBatch.getTransformLoadMillis(), + outgoingBatch.isBulkLoadFlag(), outgoingBatch.getBatchId(), outgoingBatch.getNodeId() }, new int[] { Types.CHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, @@ -257,7 +258,8 @@ public void updateOutgoingBatches(ISqlTransaction transaction, List= flushSize) { transaction.flush(); diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/Batch.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/Batch.java index 7187c69f1a..1c09e80cfd 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/Batch.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/Batch.java @@ -56,16 +56,16 @@ public enum BatchType { protected Statistics statistics; protected boolean invalidRetry = false; protected boolean bulkLoaderFlag; - + public boolean isBulkLoaderFlag() { - return bulkLoaderFlag; - } + return bulkLoaderFlag; + } - public void setBulkLoaderFlag(boolean bulkLoaderFlag) { - this.bulkLoaderFlag = bulkLoaderFlag; - } + public void setBulkLoaderFlag(boolean bulkLoaderFlag) { + this.bulkLoaderFlag = bulkLoaderFlag; + } - protected Map timers = new HashMap(); + protected Map timers = new HashMap(); public Batch(BatchType batchType, long batchId, String channelId, BinaryEncoding binaryEncoding, String sourceNodeId, String targetNodeId, boolean common) { this.batchType = batchType;