diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java index 576f979c53..6c5efd574c 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/DataProcessor.java @@ -97,7 +97,7 @@ public void process(DataContext context) { boolean processBatch = listener == null ? true : listener .beforeBatchStarted(context); - + if (processBatch) { dataWriter = chooseDataWriter(currentBatch); processBatch &= dataWriter != null; @@ -118,7 +118,7 @@ public void process(DataContext context) { // pull and process any data events that are not wrapped // in a table - forEachDataInTable(context, processBatch, currentBatch); + forEachDataInTable(context, processBatch, false, currentBatch); // pull and process all data events wrapped in tables forEachTableInBatch(context, processBatch, currentBatch); @@ -175,7 +175,7 @@ protected int forEachTableInBatch(DataContext context, boolean processBatch, Bat if (processBatch) { processTable = context.getWriter().start(currentTable); } - dataRow += forEachDataInTable(context, processTable, batch); + dataRow += forEachDataInTable(context, processBatch, processTable, batch); } catch (IgnoreBatchException ex) { processBatch = false; } @@ -189,7 +189,7 @@ protected int forEachTableInBatch(DataContext context, boolean processBatch, Bat return dataRow; } - protected int forEachDataInTable(DataContext context, boolean processTable, Batch batch) { + protected int forEachDataInTable(DataContext context, boolean processBatch, boolean processTable, Batch batch) { int dataRow = 0; IgnoreBatchException ignore = null; long startTime = System.currentTimeMillis(); @@ -206,9 +206,11 @@ protected int forEachDataInTable(DataContext context, boolean processTable, Batc batch.startTimer(STAT_WRITE_DATA); batch.incrementLineCount(); if (context.getWriter() == null) { - context.setWriter(chooseDataWriter(batch)); + context.setWriter(chooseDataWriter(batch)); + } + if (processBatch) { + context.getWriter().write(currentData); } - context.getWriter().write(currentData); batch.incrementDataWriteMillis(batch.endTimer(STAT_WRITE_DATA)); } catch (IgnoreBatchException ex) { ignore = ex;