Skip to content

Commit

Permalink
0003710: Jdbc Batch bulk data loader
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Sep 6, 2018
1 parent 208106c commit 8201c38
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 19 deletions.
Expand Up @@ -369,6 +369,7 @@ protected static SqlTemplateSettings createSqlTemplateSettings(TypedProperties p
settings.setFetchSize(properties.getInt(ParameterConstants.DB_FETCH_SIZE, 1000));
settings.setQueryTimeout(properties.getInt(ParameterConstants.DB_QUERY_TIMEOUT_SECS, 300));
settings.setBatchSize(properties.getInt(ParameterConstants.JDBC_EXECUTE_BATCH_SIZE, 100));
settings.setBatchBulkLoaderSize(properties.getInt(ParameterConstants.JDBC_EXECUTE_BULK_BATCH_SIZE, 25));
settings.setOverrideIsolationLevel(properties.getInt(ParameterConstants.JDBC_ISOLATION_LEVEL, -1));
settings.setReadStringsAsBytes(properties.is(ParameterConstants.JDBC_READ_STRINGS_AS_BYTES, false));
settings.setTreatBinaryAsLob(properties.is(ParameterConstants.TREAT_BINARY_AS_LOB_ENABLED, true));
Expand Down
Expand Up @@ -8,14 +8,15 @@
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.JdbcBatchBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.load.DefaultDataLoaderFactory;
import org.jumpmind.symmetric.load.IDataLoaderFactory;

public class BulkDataLoaderFactory implements IDataLoaderFactory, ISymmetricEngineAware, IBuiltInExtensionPoint {
Expand All @@ -37,7 +38,9 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
dataLoaderFactories.put(factory.getTypeName(), factory);
}

if (DatabaseNamesConstants.MYSQL.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
if (engine.getParameterService().is(ParameterConstants.JDBC_EXECUTE_BULK_BATCH_OVERRIDE, false)) {
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix());
} else if (DatabaseNamesConstants.MYSQL.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
return new MySqlBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (DatabaseNamesConstants.MSSQL2000.equals(engine.getSymmetricDialect().getTargetPlatform().getName())
Expand All @@ -60,25 +63,13 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
return new TeradataBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else {
return dataLoaderFactories.get(new DefaultDataLoaderFactory().getTypeName()).getDataWriter(sourceNodeId,
symmetricDialect, transformWriter, filters, errorHandlers, conflictSettings, resolvedData);
}
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix());
}
}

@Override
public boolean isPlatformSupported(IDatabasePlatform platform) {
if (DatabaseNamesConstants.MYSQL.equals(platform.getName())
|| DatabaseNamesConstants.MSSQL2000.equals(platform.getName())
|| DatabaseNamesConstants.MSSQL2005.equals(platform.getName())
|| DatabaseNamesConstants.MSSQL2008.equals(platform.getName())
|| DatabaseNamesConstants.ORACLE.equals(platform.getName())
|| DatabaseNamesConstants.POSTGRESQL.equals(platform.getName())
|| DatabaseNamesConstants.GREENPLUM.equals(platform.getName())
|| DatabaseNamesConstants.REDSHIFT.equals(platform.getName())
|| (platform.getName() != null && platform.getName().startsWith(DatabaseNamesConstants.TERADATA))) {
return true;
}
return false;
return true;
}

@Override
Expand Down
Expand Up @@ -2,6 +2,7 @@

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DynamicDefaultDatabaseWriter;
Expand All @@ -17,15 +18,37 @@ public AbstractBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabase
super(symmetricPlatform, targetPlatform, tablePrefix, settings);
}

@Override
public void start(Batch batch) {
super.start(batch);
if (isFallBackToDefault()) {
log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() + " using DEFAULT loader");
}else{
log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() + " using BULK loader");
}
}

public final void write(CsvData data) {
if (context.get(ContextConstants.CONTEXT_BULK_WRITER_TO_USE) != null && context.get(ContextConstants.CONTEXT_BULK_WRITER_TO_USE).equals("default")) {
if (isFallBackToDefault()) {
writeDefault(data);
}else{
context.put(ContextConstants.CONTEXT_BULK_WRITER_TO_USE, "bulk");
bulkWrite(data);
}
}

@Override
public void end(Batch batch, boolean inError) {
super.end(batch, inError);
if (!inError) {
context.put(ContextConstants.CONTEXT_BULK_WRITER_TO_USE, null);
}
}

public boolean isFallBackToDefault() {
return context.get(ContextConstants.CONTEXT_BULK_WRITER_TO_USE) != null && context.get(ContextConstants.CONTEXT_BULK_WRITER_TO_USE).equals("default");
}

    // Delegates to the standard row-at-a-time writer, bypassing the bulk path.
    protected final void writeDefault(CsvData data) {
        super.write(data);
    }
Expand Down
@@ -0,0 +1,55 @@
package org.jumpmind.symmetric.io;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.JdbcSqlTemplate;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;

/**
 * Generic bulk database writer that relies on the JDBC batch API rather than
 * any dialect-specific bulk facility. Rows are written through the normal
 * writer, but the underlying transaction buffers them and flushes in batches
 * sized by the dedicated bulk batch-size setting.
 */
public class JdbcBatchBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

    public JdbcBatchBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform, String tablePrefix) {
        super(symmetricPlatform, targetPlatform, tablePrefix);
    }

    @Override
    public void start(Batch batch) {
        super.start(batch);
        if (!isFallBackToDefault()) {
            // Switch the transaction into JDBC batch mode and use the bulk
            // batch size instead of the general execute batch size.
            getTransaction().setInBatchMode(true);
            ((JdbcSqlTransaction) getTransaction()).setBatchSize(((JdbcSqlTemplate) getPlatform()
                    .getSqlTemplate()).getSettings().getBatchBulkLoaderSize());
        }
    }

    @Override
    protected LoadStatus insert(CsvData data) {
        // Conflicts are reported as success here — presumably because
        // row-level conflict detection is not meaningful while statements
        // are deferred in a JDBC batch; any real error surfaces on flush.
        // TODO confirm against the conflict-resolution design.
        LoadStatus loadStatus = super.insert(data);
        if (loadStatus == LoadStatus.CONFLICT) {
            loadStatus = LoadStatus.SUCCESS;
        }
        return loadStatus;
    }

    @Override
    protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useConflictDetection) {
        // BUG FIX: this previously delegated to super.insert(data), which
        // executed an INSERT for update rows. Delegate to super.update().
        LoadStatus loadStatus = super.update(data, applyChangesOnly, useConflictDetection);
        if (loadStatus == LoadStatus.CONFLICT) {
            loadStatus = LoadStatus.SUCCESS;
        }
        return loadStatus;
    }

    @Override
    protected void bulkWrite(CsvData data) {
        // Rows go through the normal writer; batching happens at the JDBC
        // transaction level (enabled in start() above).
        writeDefault(data);
    }

    @Override
    public void end(Batch batch, boolean inError) {
        super.end(batch, inError);
        // Push any rows still buffered below the batch-size threshold.
        getTransaction().flush();
    }

}
Expand Up @@ -48,6 +48,8 @@ private ParameterConstants() {
public final static String AUTO_START_ENGINE = "auto.start.engine";

public final static String JDBC_EXECUTE_BATCH_SIZE = "db.jdbc.execute.batch.size";
public final static String JDBC_EXECUTE_BULK_BATCH_SIZE = "db.jdbc.bulk.execute.batch.size";
public final static String JDBC_EXECUTE_BULK_BATCH_OVERRIDE = "db.jdbc.bulk.execute.batch.override";
public final static String JDBC_READ_STRINGS_AS_BYTES = "db.read.strings.as.bytes";
public final static String JDBC_ISOLATION_LEVEL = "db.jdbc.isolation.level";

Expand Down
Expand Up @@ -1041,6 +1041,7 @@ protected IDataWriter chooseDataWriter(Batch batch) {
} catch (Exception e) {
if (ctx.get(ContextConstants.CONTEXT_BULK_WRITER_TO_USE) != null && ctx.get(ContextConstants.CONTEXT_BULK_WRITER_TO_USE).equals("bulk")) {
ctx.put(ContextConstants.CONTEXT_BULK_WRITER_TO_USE, "default");
listener.currentBatch.setStatus(Status.OK);
processor.setDataReader(buildDataReader(batchInStaging, resource));
processor.process(ctx);
} else {
Expand Down
19 changes: 19 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -183,6 +183,25 @@ db.jdbc.streaming.results.fetch.size=100
# Tags: database,routing
db.jdbc.execute.batch.size=100

# This overrides any database-specific bulk loader when the channel data loader algorithm
# is set to bulk and a specific bulk loader is available. For example, if a PostgreSQL
# database is set up for bulk loading and this parameter is true, the JdbcBatchBulkDatabaseWriter
# will be used instead of the PostgreSQL bulk loader. If a dialect does not have a designated
# bulk loader and the channel is set for bulk loading, the JdbcBatchBulkDatabaseWriter is used
# automatically even if this parameter is false. Essentially the parameter only controls
# "overriding" a dialect-specific bulk loader.
#
# Tags: database,load
# Type: boolean
db.jdbc.bulk.execute.batch.override=false

# This is the default number of rows that will be sent to the database as a batch when
# SymmetricDS uses the JDBC batch API for bulk data load types. Designed for loads that are
# set up with a channel data loader algorithm of batch.
#
# Tags: database,load
db.jdbc.bulk.execute.batch.size=25

# Indicates that case should be ignored when looking up references to tables using the database's metadata api.
#
# Tags: database
Expand Down
Expand Up @@ -25,6 +25,7 @@ public class SqlTemplateSettings {
protected int fetchSize = 1000;
protected int queryTimeout;
protected int batchSize = 100;
protected int batchBulkLoaderSize = 25;
protected boolean readStringsAsBytes;
protected boolean treatBinaryAsLob;
protected boolean rightTrimCharValues;
Expand Down Expand Up @@ -116,4 +117,14 @@ public void setAllowUpdatesWithResults(boolean allowUpdatesWithResults) {
this.allowUpdatesWithResults = allowUpdatesWithResults;
}

public int getBatchBulkLoaderSize() {
return batchBulkLoaderSize;
}

public void setBatchBulkLoaderSize(int batchBulkLoaderSize) {
this.batchBulkLoaderSize = batchBulkLoaderSize;
}



}
Expand Up @@ -66,6 +66,8 @@ public class JdbcSqlTransaction implements ISqlTransaction {

protected List<ISqlTransactionListener> listeners = new ArrayList<ISqlTransactionListener>();

protected int batchSize = 100;

    /**
     * Creates a transaction with auto-commit disabled.
     *
     * @param jdbcSqlTemplate template that supplies the connection and settings
     */
    public JdbcSqlTransaction(JdbcSqlTemplate jdbcSqlTemplate) {
        this(jdbcSqlTemplate, false);
    }
Expand All @@ -74,6 +76,7 @@ public JdbcSqlTransaction(JdbcSqlTemplate jdbcSqlTemplate, boolean autoCommit) {
this.autoCommit = autoCommit;
this.jdbcSqlTemplate = jdbcSqlTemplate;
this.logSqlBuilder = jdbcSqlTemplate.logSqlBuilder;
this.batchSize = jdbcSqlTemplate.getSettings().getBatchSize();
this.init();
}

Expand Down Expand Up @@ -453,7 +456,7 @@ public int addRow(Object marker, Object[] args, int[] argTypes) {
long end = System.currentTimeMillis();
logSqlBuilder.logSql(log, "addBatch()", psql, args, argTypes, (end-start));

if (markers.size() >= jdbcSqlTemplate.getSettings().getBatchSize()) {
if (markers.size() >= this.batchSize) {
rowsUpdated = flush();
}
} else {
Expand All @@ -465,6 +468,14 @@ public int addRow(Object marker, Object[] args, int[] argTypes) {
return rowsUpdated;
}

public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

public int getBatchSize() {
return this.batchSize;
}

protected int executePreparedUpdate(PreparedStatement preparedStatement, String sql, Object[] args, int[] argTypes) throws SQLException {
int rowsUpdated = 0;
long start = System.currentTimeMillis();
Expand Down

0 comments on commit 8201c38

Please sign in to comment.