From 70123011e2126838e55115d347d7c0e2a267baba Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Thu, 9 Aug 2018 12:14:28 -0400 Subject: [PATCH] 0003667: Bulk loads should not log errors while falling back to default loader --- .../io/AbstractBulkDatabaseWriter.java | 16 +- .../io/PostgresBulkDatabaseWriter.java | 4 +- .../service/impl/DataLoaderService.java | 54 ++-- .../impl/ManageIncomingBatchListener.java | 234 +++++++++--------- .../symmetric/io/data/IDataWriter.java | 4 + 5 files changed, 164 insertions(+), 148 deletions(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/AbstractBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/AbstractBulkDatabaseWriter.java index 407b6b6ace..a72d9f421a 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/AbstractBulkDatabaseWriter.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/AbstractBulkDatabaseWriter.java @@ -1,16 +1,13 @@ package org.jumpmind.symmetric.io; import org.jumpmind.db.platform.IDatabasePlatform; -import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.CsvData; +import org.jumpmind.symmetric.io.data.IDataWriter; import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings; import org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter; -import org.jumpmind.symmetric.model.IncomingBatch; public abstract class AbstractBulkDatabaseWriter extends DynamicDefaultDatabaseWriter{ - protected boolean useDefaultDataWriter; - public AbstractBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform, String tablePrefix){ super(symmetricPlatform, targetPlatform, tablePrefix); } @@ -21,9 +18,10 @@ public AbstractBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabase } public final void write(CsvData data) { - if (useDefaultDataWriter) { + if (context.get(IDataWriter.CONTEXT_BULK_WRITER_TO_USE) != null && context.get(IDataWriter.CONTEXT_BULK_WRITER_TO_USE).equals("default")) { writeDefault(data); }else{ + context.put(IDataWriter.CONTEXT_BULK_WRITER_TO_USE, "bulk"); bulkWrite(data); } } @@ -33,11 +31,5 @@ protected final void writeDefault(CsvData data) { } protected abstract void bulkWrite(CsvData data); - - @Override - public void start(Batch batch) { - super.start(batch); - IncomingBatch currentBatch = (IncomingBatch) context.get("currentBatch"); - useDefaultDataWriter = currentBatch == null ? false : currentBatch.isErrorFlag(); - } + } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java index 2a203f6101..394da549e3 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java @@ -38,6 +38,7 @@ import org.jumpmind.symmetric.io.data.CsvUtils; import org.jumpmind.symmetric.io.data.DataContext; import org.jumpmind.symmetric.io.data.DataEventType; +import org.jumpmind.symmetric.io.data.IDataWriter; import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants; import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings; import org.postgresql.copy.CopyIn; @@ -84,7 +85,6 @@ protected void bulkWrite(CsvData data) { } } } - useDefaultDataWriter=false; switch (dataEventType) { case INSERT: startCopy(); @@ -116,7 +116,7 @@ protected void bulkWrite(CsvData data) { case DELETE: default: endCopy(); - useDefaultDataWriter=true; + context.put(IDataWriter.CONTEXT_BULK_FALLBACK_TO_DEFAULT, "default"); super.write(data); break; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 3fbf92b903..3331c16a9b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -985,12 +985,34 @@ public void start(DataContext ctx, Batch batch) { batchStartsToArriveTimeInMs = System.currentTimeMillis(); } + protected ProtocolDataReader buildDataReader(final Batch batchInStaging, final IStagedResource resource) { + return new ProtocolDataReader(BatchType.LOAD, batchInStaging.getTargetNodeId(), resource) { + @Override + public Table nextTable() { + Table table = super.nextTable(); + if (table != null && listener.currentBatch != null) { + listener.currentBatch.incrementTableCount(table.getNameLowerCase()); + } + return table; + } + + public Batch nextBatch() { + Batch nextBatch = super.nextBatch(); + if (nextBatch != null) { + nextBatch.setStatistics(batchInStaging.getStatistics()); + } + return nextBatch; + } + }; + } + public void end(final DataContext ctx, final Batch batchInStaging, final IStagedResource resource) { final long networkMillis = System.currentTimeMillis() - batchStartsToArriveTimeInMs; Callable loadBatchFromStage = new Callable() { public IncomingBatch call() throws Exception { IncomingBatch incomingBatch = null; + DataProcessor processor = null; if (!isError && resource != null && resource.exists()) { try { loadInfo = statisticManager.newProcessInfo(new ProcessInfoKey(transferInfo.getSourceNodeId(), @@ -1001,25 +1023,9 @@ public IncomingBatch call() throws Exception { loadInfo.setStatus(ProcessInfo.ProcessStatus.LOADING); - ProtocolDataReader reader = new ProtocolDataReader(BatchType.LOAD, batchInStaging.getTargetNodeId(), resource) { - @Override - public Table nextTable() { - Table table = super.nextTable(); - if (table != null && listener.currentBatch != null) { - listener.currentBatch.incrementTableCount(table.getNameLowerCase()); - } - return table; - } - - public Batch nextBatch() { - Batch nextBatch = super.nextBatch(); - if (nextBatch != null) { - nextBatch.setStatistics(batchInStaging.getStatistics()); - } - return nextBatch; - } - }; - DataProcessor processor = new DataProcessor(reader, null, listener, "data load from stage") { + ProtocolDataReader reader = buildDataReader(batchInStaging, resource); + + processor = new DataProcessor(reader, null, listener, "data load from stage") { @Override protected IDataWriter chooseDataWriter(Batch batch) { boolean isRetry = ((ManageIncomingBatchListener) listener).getCurrentBatch().isRetry(); @@ -1032,8 +1038,14 @@ protected IDataWriter chooseDataWriter(Batch batch) { loadInfo.setStatus(ProcessStatus.OK); } } catch (Exception e) { - isError = true; - throw e; + if (ctx.get(IDataWriter.CONTEXT_BULK_FALLBACK_TO_DEFAULT) != null && ctx.get(IDataWriter.CONTEXT_BULK_FALLBACK_TO_DEFAULT).equals("bulk")) { + ctx.put(IDataWriter.CONTEXT_BULK_FALLBACK_TO_DEFAULT, "default"); + processor.setDataReader(buildDataReader(batchInStaging, resource)); + processor.process(ctx); + } else { + isError = true; + throw e; + } } finally { incomingBatch = listener.currentBatch; if (incomingBatch != null) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ManageIncomingBatchListener.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ManageIncomingBatchListener.java index 7181cdf0b9..fa15f512e3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ManageIncomingBatchListener.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ManageIncomingBatchListener.java @@ -39,7 +39,9 @@ import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.DataContext; +import org.jumpmind.symmetric.io.data.DataProcessor; import org.jumpmind.symmetric.io.data.IDataProcessorListener; +import org.jumpmind.symmetric.io.data.IDataWriter; import org.jumpmind.symmetric.io.data.writer.Conflict; import org.jumpmind.symmetric.io.data.writer.ConflictException; import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter; @@ -190,129 +192,135 @@ public void batchInError(DataContext context, Throwable ex) { throw ex; } - /* - * Reread batch to make sure it wasn't set to IG or OK - */ - engine.getIncomingBatchService().refreshIncomingBatch(currentBatch); - - Batch batch = context.getBatch(); - isNewErrorForCurrentBatch = batch.getLineCount() != currentBatch.getFailedLineNumber(); - - if (context.getWriter() != null - && context.getReader().getStatistics().get(batch) != null - && context.getWriter().getStatistics().get(batch) != null) { - this.currentBatch.setValues(context.getReader().getStatistics().get(batch), - context.getWriter().getStatistics().get(batch), false); - statisticManager.incrementDataLoaded(this.currentBatch.getChannelId(), - this.currentBatch.getLoadRowCount()); - statisticManager.incrementDataBytesLoaded(this.currentBatch.getChannelId(), - this.currentBatch.getByteCount()); - statisticManager.incrementDataLoadedErrors(this.currentBatch.getChannelId(), 1); + if (context.get(IDataWriter.CONTEXT_BULK_WRITER_TO_USE) != null && context.get(IDataWriter.CONTEXT_BULK_WRITER_TO_USE).equals("bulk")) { + log.info("Bulk loading failed for this batch " + context.getBatch().getBatchId() + ", falling back to default loading."); + log.debug("Bulk loading error.", ex); } else { - log.error("An error caused a batch to fail without attempting to load data for batch " + - (batch != null ? batch.getNodeBatchId() : "?"), ex); - } - - enableSyncTriggers(context); - - if (ex instanceof IOException || ex instanceof TransportException - || ex instanceof IoException) { - log.warn("Failed to load batch " + this.currentBatch.getNodeBatchId(), ex); - this.currentBatch.setSqlMessage(ex.getMessage()); - } else { - log.error(String.format("Failed to load batch %s", this.currentBatch.getNodeBatchId()), ex); - - SQLException se = ExceptionUtils.unwrapSqlException(ex); - if (ex instanceof ConflictException) { - String message = ex.getMessage(); - if (se != null && isNotBlank(se.getMessage())) { - message = message + " " + se.getMessage(); - } - this.currentBatch.setSqlMessage(message); - this.currentBatch.setSqlState(ErrorConstants.CONFLICT_STATE); - this.currentBatch.setSqlCode(ErrorConstants.CONFLICT_CODE); - } else if (se != null) { - String sqlState = se.getSQLState(); - if (sqlState != null && sqlState.length() > 10) { - sqlState = sqlState.replace("JDBC-", ""); - if (sqlState.length() > 10) { - sqlState = sqlState.substring(0, 10); - } - } - this.currentBatch.setSqlState(sqlState); - this.currentBatch.setSqlCode(se.getErrorCode()); - this.currentBatch.setSqlMessage(se.getMessage()); - if (sqlTemplate.isForeignKeyViolation(se)) { - this.currentBatch.setSqlState(ErrorConstants.FK_VIOLATION_STATE); - this.currentBatch.setSqlCode(ErrorConstants.FK_VIOLATION_CODE); - } + + /* + * Reread batch to make sure it wasn't set to IG or OK + */ + engine.getIncomingBatchService().refreshIncomingBatch(currentBatch); + + Batch batch = context.getBatch(); + isNewErrorForCurrentBatch = batch.getLineCount() != currentBatch.getFailedLineNumber(); + + if (context.getWriter() != null + && context.getReader().getStatistics().get(batch) != null + && context.getWriter().getStatistics().get(batch) != null) { + this.currentBatch.setValues(context.getReader().getStatistics().get(batch), + context.getWriter().getStatistics().get(batch), false); + statisticManager.incrementDataLoaded(this.currentBatch.getChannelId(), + this.currentBatch.getLoadRowCount()); + statisticManager.incrementDataBytesLoaded(this.currentBatch.getChannelId(), + this.currentBatch.getByteCount()); + statisticManager.incrementDataLoadedErrors(this.currentBatch.getChannelId(), 1); } else { - this.currentBatch.setSqlMessage(ExceptionUtils.getRootMessage(ex)); + log.error("An error caused a batch to fail without attempting to load data for batch " + + (batch != null ? batch.getNodeBatchId() : "?"), ex); } - - } - - ISqlTransaction transaction = context.findSymmetricTransaction(engine.getTablePrefix()); - - // If we were in the process of skipping or ignoring a batch - // then its status would have been OK. We should not - // set the status to ER. - if (this.currentBatch.getStatus() != Status.OK && - this.currentBatch.getStatus() != Status.IG) { - - this.currentBatch.setStatus(IncomingBatch.Status.ER); - if (context.getTable() != null && context.getData() != null) { - try { - IncomingError error = new IncomingError(); - error.setBatchId(this.currentBatch.getBatchId()); - error.setNodeId(this.currentBatch.getNodeId()); - error.setTargetCatalogName(context.getTable().getCatalog()); - error.setTargetSchemaName(context.getTable().getSchema()); - error.setTargetTableName(context.getTable().getName()); - error.setColumnNames(Table.getCommaDeliminatedColumns(context - .getTable().getColumns())); - error.setPrimaryKeyColumnNames(Table.getCommaDeliminatedColumns(context - .getTable().getPrimaryKeyColumns())); - error.setCsvData(context.getData()); - error.setCurData((String) context.get(DefaultDatabaseWriter.CUR_DATA)); - error.setBinaryEncoding(context.getBatch().getBinaryEncoding()); - error.setEventType(context.getData().getDataEventType()); - error.setFailedLineNumber(this.currentBatch.getFailedLineNumber()); - error.setFailedRowNumber(this.currentBatch.getFailedRowNumber()); - if (ex instanceof ConflictException) { - ConflictException conflictEx = (ConflictException) ex; - Conflict conflict = conflictEx.getConflict(); - if (conflict != null) { - error.setConflictId(conflict.getConflictId()); - } + + enableSyncTriggers(context); + + if (ex instanceof IOException || ex instanceof TransportException + || ex instanceof IoException) { + log.warn("Failed to load batch " + this.currentBatch.getNodeBatchId(), ex); + this.currentBatch.setSqlMessage(ex.getMessage()); + } else { + log.error(String.format("Failed to load batch %s", this.currentBatch.getNodeBatchId()), ex); + + SQLException se = ExceptionUtils.unwrapSqlException(ex); + if (ex instanceof ConflictException) { + String message = ex.getMessage(); + if (se != null && isNotBlank(se.getMessage())) { + message = message + " " + se.getMessage(); } - if (transaction != null) { - dataLoaderService.insertIncomingError(transaction, error); - } else { - dataLoaderService.insertIncomingError(error); + this.currentBatch.setSqlMessage(message); + this.currentBatch.setSqlState(ErrorConstants.CONFLICT_STATE); + this.currentBatch.setSqlCode(ErrorConstants.CONFLICT_CODE); + } else if (se != null) { + String sqlState = se.getSQLState(); + if (sqlState != null && sqlState.length() > 10) { + sqlState = sqlState.replace("JDBC-", ""); + if (sqlState.length() > 10) { + sqlState = sqlState.substring(0, 10); + } } - } catch (UniqueKeyException e) { - // ignore. we already inserted an error for this row - if (transaction != null) { - transaction.rollback(); + this.currentBatch.setSqlState(sqlState); + this.currentBatch.setSqlCode(se.getErrorCode()); + this.currentBatch.setSqlMessage(se.getMessage()); + if (sqlTemplate.isForeignKeyViolation(se)) { + this.currentBatch.setSqlState(ErrorConstants.FK_VIOLATION_STATE); + this.currentBatch.setSqlCode(ErrorConstants.FK_VIOLATION_CODE); } + } else { + this.currentBatch.setSqlMessage(ExceptionUtils.getRootMessage(ex)); } + } - } - - if (transaction != null) { - if (incomingBatchService.isRecordOkBatchesEnabled() - || this.currentBatch.isRetry()) { - incomingBatchService.updateIncomingBatch(transaction, this.currentBatch); - } else { - incomingBatchService.insertIncomingBatch(transaction, this.currentBatch); + + ISqlTransaction transaction = context.findSymmetricTransaction(engine.getTablePrefix()); + + // If we were in the process of skipping or ignoring a batch + // then its status would have been OK. We should not + // set the status to ER. + if (this.currentBatch.getStatus() != Status.OK && + this.currentBatch.getStatus() != Status.IG) { + + this.currentBatch.setStatus(IncomingBatch.Status.ER); + if (context.getTable() != null && context.getData() != null) { + try { + IncomingError error = new IncomingError(); + error.setBatchId(this.currentBatch.getBatchId()); + error.setNodeId(this.currentBatch.getNodeId()); + error.setTargetCatalogName(context.getTable().getCatalog()); + error.setTargetSchemaName(context.getTable().getSchema()); + error.setTargetTableName(context.getTable().getName()); + error.setColumnNames(Table.getCommaDeliminatedColumns(context + .getTable().getColumns())); + error.setPrimaryKeyColumnNames(Table.getCommaDeliminatedColumns(context + .getTable().getPrimaryKeyColumns())); + error.setCsvData(context.getData()); + error.setCurData((String) context.get(DefaultDatabaseWriter.CUR_DATA)); + error.setBinaryEncoding(context.getBatch().getBinaryEncoding()); + error.setEventType(context.getData().getDataEventType()); + error.setFailedLineNumber(this.currentBatch.getFailedLineNumber()); + error.setFailedRowNumber(this.currentBatch.getFailedRowNumber()); + if (ex instanceof ConflictException) { + ConflictException conflictEx = (ConflictException) ex; + Conflict conflict = conflictEx.getConflict(); + if (conflict != null) { + error.setConflictId(conflict.getConflictId()); + } + } + if (transaction != null) { + dataLoaderService.insertIncomingError(transaction, error); + } else { + dataLoaderService.insertIncomingError(error); + } + } catch (UniqueKeyException e) { + // ignore. we already inserted an error for this row + if (transaction != null) { + transaction.rollback(); + } + } + } } - } else { - if (incomingBatchService.isRecordOkBatchesEnabled() - || this.currentBatch.isRetry()) { - incomingBatchService.updateIncomingBatch(this.currentBatch); + + if (transaction != null) { + if (incomingBatchService.isRecordOkBatchesEnabled() + || this.currentBatch.isRetry()) { + incomingBatchService.updateIncomingBatch(transaction, this.currentBatch); + } else { + incomingBatchService.insertIncomingBatch(transaction, this.currentBatch); + } } else { - incomingBatchService.insertIncomingBatch(this.currentBatch); + if (incomingBatchService.isRecordOkBatchesEnabled() + || this.currentBatch.isRetry()) { + incomingBatchService.updateIncomingBatch(this.currentBatch); + } else { + incomingBatchService.insertIncomingBatch(this.currentBatch); + } } } } catch (Throwable e) { diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/IDataWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/IDataWriter.java index 83ddaff0ea..0eb131953d 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/IDataWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/IDataWriter.java @@ -25,6 +25,8 @@ public interface IDataWriter extends IDataResource { + public static final String CONTEXT_BULK_WRITER_TO_USE = "bulkWriterToUse"; + public void start(Batch batch); public boolean start(Table table); @@ -34,5 +36,7 @@ public interface IDataWriter extends IDataResource { public void end(Table table); public void end(Batch batch, boolean inError); + + }