Skip to content

Commit

Permalink
0003914: Some bulk loaders were not using parameters properly during a
Browse files Browse the repository at this point in the history
fall back to a default loader
  • Loading branch information
jumpmind-josh committed Apr 12, 2019
1 parent b01e129 commit e172551
Show file tree
Hide file tree
Showing 18 changed files with 96 additions and 64 deletions.
Expand Up @@ -17,9 +17,10 @@
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.load.AbstractDataLoaderFactory;
import org.jumpmind.symmetric.load.IDataLoaderFactory;

public class BulkDataLoaderFactory implements IDataLoaderFactory, ISymmetricEngineAware, IBuiltInExtensionPoint {
public class BulkDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory, ISymmetricEngineAware, IBuiltInExtensionPoint {

ISymmetricEngine engine;
Map<String, IDataLoaderFactory> dataLoaderFactories = new HashMap<String, IDataLoaderFactory>();
Expand All @@ -39,7 +40,8 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
}

if (engine.getParameterService().is(ParameterConstants.JDBC_EXECUTE_BULK_BATCH_OVERRIDE, false)) {
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix());
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), buildParameterDatabaseWritterSettings());
} else if (DatabaseNamesConstants.MYSQL.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
return new MySqlBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
Expand Down Expand Up @@ -71,7 +73,8 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
return new SnowflakeBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else {
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix());
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), buildParameterDatabaseWritterSettings());
}
}

Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.util.List;

import org.apache.bcel.generic.D2F;
import org.apache.commons.lang.StringEscapeUtils;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
Expand All @@ -31,21 +32,22 @@
import org.jumpmind.symmetric.io.MsSqlBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.load.AbstractDataLoaderFactory;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.jumpmind.symmetric.service.IParameterService;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class MsSqlBulkDataLoaderFactory implements IDataLoaderFactory {
public class MsSqlBulkDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory {

private NativeJdbcExtractor jdbcExtractor;
private IStagingManager stagingManager;
private IParameterService parameterService;


public MsSqlBulkDataLoaderFactory(ISymmetricEngine engine) {
this.jdbcExtractor = JdbcUtils.getNativeJdbcExtractory();
this.stagingManager = engine.getStagingManager();
Expand All @@ -71,13 +73,15 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri

return new MsSqlBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix(),
stagingManager, jdbcExtractor, maxRowsBeforeFlush,
fireTriggers, uncPath, fieldTerminator, rowTerminator);
fireTriggers, uncPath, fieldTerminator, rowTerminator, buildParameterDatabaseWritterSettings());
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
return (DatabaseNamesConstants.MSSQL2000.equals(platform.getName())
|| DatabaseNamesConstants.MSSQL2005.equals(platform.getName()) || DatabaseNamesConstants.MSSQL2008
.equals(platform.getName()));
}



}
Expand Up @@ -35,16 +35,16 @@
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.load.AbstractDataLoaderFactory;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.jumpmind.symmetric.service.IParameterService;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class MySqlBulkDataLoaderFactory implements IDataLoaderFactory {
public class MySqlBulkDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory {

private IStagingManager stagingManager;
private NativeJdbcExtractor jdbcExtractor;
private IParameterService parameterService;


public MySqlBulkDataLoaderFactory(ISymmetricEngine engine) {
this.stagingManager = engine.getStagingManager();
this.jdbcExtractor = JdbcUtils.getNativeJdbcExtractory();
Expand All @@ -67,7 +67,7 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri

return new MySqlBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix(),
stagingManager, jdbcExtractor, maxRowsBeforeFlush,
maxBytesBeforeFlush, isLocal, isReplace);
maxBytesBeforeFlush, isLocal, isReplace, buildParameterDatabaseWritterSettings());
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
Expand Down
Expand Up @@ -20,7 +20,6 @@
*/
package org.jumpmind.symmetric.ext;

import java.lang.reflect.Constructor;
import java.util.List;

import org.jumpmind.db.platform.DatabaseNamesConstants;
Expand All @@ -35,16 +34,15 @@
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.load.AbstractDataLoaderFactory;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.jumpmind.symmetric.service.IParameterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedshiftBulkDataLoaderFactory implements IDataLoaderFactory {
public class RedshiftBulkDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory {

protected final Logger log = LoggerFactory.getLogger(getClass());

private IParameterService parameterService;
private IStagingManager stagingManager;

public RedshiftBulkDataLoaderFactory(ISymmetricEngine engine) {
Expand All @@ -62,7 +60,7 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

return new RedshiftBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), stagingManager, filters, errorHandlers, parameterService);
symmetricDialect.getTablePrefix(), stagingManager, filters, errorHandlers, parameterService, buildParameterDatabaseWritterSettings());

}

Expand Down
Expand Up @@ -4,7 +4,6 @@

import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.JdbcUtils;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.SnowflakeBulkDatabaseWriter;
Expand All @@ -15,15 +14,14 @@
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.load.AbstractDataLoaderFactory;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.jumpmind.symmetric.service.IParameterService;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class SnowflakeBulkDataLoaderFactory implements IDataLoaderFactory {
public class SnowflakeBulkDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory {

private IStagingManager stagingManager;
private IParameterService parameterService;


public SnowflakeBulkDataLoaderFactory(ISymmetricEngine engine) {
this.stagingManager = engine.getStagingManager();
this.parameterService = engine.getParameterService();
Expand All @@ -39,7 +37,8 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

return new SnowflakeBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), stagingManager, filters, errorHandlers, parameterService);
symmetricDialect.getTablePrefix(), stagingManager, filters, errorHandlers, parameterService,
buildParameterDatabaseWritterSettings());
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
Expand Down
Expand Up @@ -14,9 +14,10 @@
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.load.AbstractDataLoaderFactory;
import org.jumpmind.symmetric.load.IDataLoaderFactory;

public class TeradataBulkDataLoaderFactory implements IDataLoaderFactory {
public class TeradataBulkDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory {
private IStagingManager stagingManager;

public TeradataBulkDataLoaderFactory(ISymmetricEngine engine) {
Expand All @@ -33,7 +34,7 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

return new TeradataBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix(),
stagingManager);
stagingManager, buildParameterDatabaseWritterSettings());
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.stage.IStagedResource;
Expand Down Expand Up @@ -82,9 +83,10 @@ public abstract class CloudBulkDatabaseWriter extends AbstractBulkDatabaseWriter

public CloudBulkDatabaseWriter(IDatabasePlatform symmetricPlatform,
IDatabasePlatform targetPlatform, String tablePrefix, IStagingManager stagingManager, List<IDatabaseWriterFilter> filters,
List<IDatabaseWriterErrorHandler> errorHandlers, IParameterService parameterService) {
List<IDatabaseWriterErrorHandler> errorHandlers, IParameterService parameterService, DatabaseWriterSettings settings) {
super(symmetricPlatform, targetPlatform, tablePrefix);
this.stagingManager = stagingManager;
this.writerSettings = settings;
this.writerSettings.setDatabaseWriterFilters(filters);
this.writerSettings.setDatabaseWriterErrorHandlers(errorHandlers);
this.writerSettings.setCreateTableFailOnError(false);
Expand Down
Expand Up @@ -6,11 +6,13 @@
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;

public class JdbcBatchBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

public JdbcBatchBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform, String tablePrefix) {
super(symmetricPlatform, targetPlatform, tablePrefix);
public JdbcBatchBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform,
String tablePrefix, DatabaseWriterSettings writerSettings) {
super(symmetricPlatform, targetPlatform, tablePrefix, writerSettings);
}

@Override
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;
Expand All @@ -63,8 +64,9 @@ public class MsSqlBulkDatabaseWriter extends AbstractBulkDatabaseWriter {
public MsSqlBulkDatabaseWriter(IDatabasePlatform symmetricPlatform,
IDatabasePlatform tar, String tablePrefix,
IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor,
int maxRowsBeforeFlush, boolean fireTriggers, String uncPath, String fieldTerminator, String rowTerminator) {
super(symmetricPlatform, tar, tablePrefix);
int maxRowsBeforeFlush, boolean fireTriggers, String uncPath, String fieldTerminator, String rowTerminator,
DatabaseWriterSettings writerSettings) {
super(symmetricPlatform, tar, tablePrefix, writerSettings);
this.jdbcExtractor = jdbcExtractor;
this.maxRowsBeforeFlush = maxRowsBeforeFlush;
this.stagingManager = stagingManager;
Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;
Expand All @@ -67,8 +68,8 @@ public class MySqlBulkDatabaseWriter extends AbstractBulkDatabaseWriter {
public MySqlBulkDatabaseWriter(IDatabasePlatform symmetricPlatform,
IDatabasePlatform targetPlatform, String tablePrefix,
IStagingManager stagingManager, NativeJdbcExtractor jdbcExtractor,
int maxRowsBeforeFlush, long maxBytesBeforeFlush, boolean isLocal, boolean isReplace) {
super(symmetricPlatform, targetPlatform, tablePrefix);
int maxRowsBeforeFlush, long maxBytesBeforeFlush, boolean isLocal, boolean isReplace, DatabaseWriterSettings settings) {
super(symmetricPlatform, targetPlatform, tablePrefix, settings);
this.jdbcExtractor = jdbcExtractor;
this.maxRowsBeforeFlush = maxRowsBeforeFlush;
this.maxBytesBeforeFlush = maxBytesBeforeFlush;
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.stage.IStagedResource;
Expand All @@ -49,9 +50,9 @@ public class RedshiftBulkDatabaseWriter extends CloudBulkDatabaseWriter {

public RedshiftBulkDatabaseWriter(IDatabasePlatform symmetricPlatform,
IDatabasePlatform targetPlatform, String tablePrefix, IStagingManager stagingManager, List<IDatabaseWriterFilter> filters,
List<IDatabaseWriterErrorHandler> errorHandlers, IParameterService parameterService) {
List<IDatabaseWriterErrorHandler> errorHandlers, IParameterService parameterService, DatabaseWriterSettings settings) {

super(symmetricPlatform, targetPlatform, tablePrefix, stagingManager, filters, errorHandlers, parameterService);
super(symmetricPlatform, targetPlatform, tablePrefix, stagingManager, filters, errorHandlers, parameterService, settings);

this.appendToCopyCommand = parameterService.getString("redshift.append.to.copy.command");

Expand Down
Expand Up @@ -8,6 +8,7 @@
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
Expand All @@ -27,8 +28,8 @@ public class SnowflakeBulkDatabaseWriter extends CloudBulkDatabaseWriter {

public SnowflakeBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform,
String tablePrefix, IStagingManager stagingManager, List<IDatabaseWriterFilter> filters,
List<IDatabaseWriterErrorHandler> errorHandlers, IParameterService parameterService) {
super(symmetricPlatform, targetPlatform, tablePrefix, stagingManager, filters, errorHandlers, parameterService);
List<IDatabaseWriterErrorHandler> errorHandlers, IParameterService parameterService, DatabaseWriterSettings writerSettings) {
super(symmetricPlatform, targetPlatform, tablePrefix, stagingManager, filters, errorHandlers, parameterService, writerSettings);

this.internalStage = parameterService.getString("snowflake.internal.stage.name", "symmetricds_stage");
this.stagingType = parameterService.getString(ParameterConstants.SNOWFLAKE_STAGING_TYPE, STAGING_TYPE_SNOWFLAKE_INTERNAL);
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagingManager;

Expand All @@ -43,9 +44,10 @@ public class TeradataBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

public TeradataBulkDatabaseWriter(IDatabasePlatform symmetricPlatform,
IDatabasePlatform tar, String tablePrefix,
IStagingManager stagingManager) {
IStagingManager stagingManager, DatabaseWriterSettings settings) {
super(symmetricPlatform, tar, tablePrefix);
this.stagingManager = stagingManager;
this.writerSettings = settings;
}

public boolean start(Table table) {
Expand Down

0 comments on commit e172551

Please sign in to comment.