Skip to content

Commit

Permalink
0002512 - Prevent duplicate incoming batch inserts in Redshift when
Browse files Browse the repository at this point in the history
retry occurs
  • Loading branch information
Hicks, Josh committed Mar 1, 2016
1 parent eda0ff1 commit 870a0fc
Showing 1 changed file with 31 additions and 20 deletions.
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.platform.redshift.RedshiftDatabasePlatform;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
Expand All @@ -44,6 +45,8 @@
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.util.FormatUtils;

import com.oracle.xmlns.internal.webservices.jaxws_databinding.ExistingAnnotationsType;

/**
* @see IIncomingBatchService
*/
Expand Down Expand Up @@ -241,26 +244,34 @@ public boolean acquireIncomingBatch(IncomingBatch batch) {

public void insertIncomingBatch(ISqlTransaction transaction, IncomingBatch batch) {
if (batch.isPersistable()) {
batch.setLastUpdatedHostName(clusterService.getServerId());
batch.setLastUpdatedTime(new Date());
transaction.prepareAndExecute(
getSql("insertIncomingBatchSql"),
new Object[] { batch.getBatchId(), batch.getNodeId(), batch.getChannelId(),
batch.getStatus().name(), batch.getNetworkMillis(),
batch.getFilterMillis(), batch.getDatabaseMillis(),
batch.getFailedRowNumber(), batch.getFailedLineNumber(),
batch.getByteCount(), batch.getStatementCount(),
batch.getFallbackInsertCount(), batch.getFallbackUpdateCount(),
batch.getIgnoreCount(), batch.getMissingDeleteCount(),
batch.getSkipCount(), batch.getSqlState(), batch.getSqlCode(),
FormatUtils.abbreviateForLogging(batch.getSqlMessage()),
batch.getLastUpdatedHostName(), batch.getLastUpdatedTime() },
new int[] { Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.CHAR,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.VARCHAR,
Types.TIMESTAMP });
boolean alreadyExists = false;
if (symmetricDialect.getPlatform() instanceof RedshiftDatabasePlatform) {
if (findIncomingBatch(batch.getBatchId(), batch.getNodeId()) != null) {
alreadyExists = true;
}
}
if (!alreadyExists) {
batch.setLastUpdatedHostName(clusterService.getServerId());
batch.setLastUpdatedTime(new Date());
transaction.prepareAndExecute(
getSql("insertIncomingBatchSql"),
new Object[] { batch.getBatchId(), batch.getNodeId(), batch.getChannelId(),
batch.getStatus().name(), batch.getNetworkMillis(),
batch.getFilterMillis(), batch.getDatabaseMillis(),
batch.getFailedRowNumber(), batch.getFailedLineNumber(),
batch.getByteCount(), batch.getStatementCount(),
batch.getFallbackInsertCount(), batch.getFallbackUpdateCount(),
batch.getIgnoreCount(), batch.getMissingDeleteCount(),
batch.getSkipCount(), batch.getSqlState(), batch.getSqlCode(),
FormatUtils.abbreviateForLogging(batch.getSqlMessage()),
batch.getLastUpdatedHostName(), batch.getLastUpdatedTime() },
new int[] { Types.NUMERIC, Types.VARCHAR, Types.VARCHAR, Types.CHAR,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.VARCHAR, Types.NUMERIC, Types.VARCHAR, Types.VARCHAR,
Types.TIMESTAMP });
}
}
}

Expand Down

0 comments on commit 870a0fc

Please sign in to comment.