Skip to content

Commit

Permalink
0002805: mysql_bulk may cause NullPointerException
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Sep 20, 2016
1 parent 55a3c22 commit f573bd4
Showing 1 changed file with 16 additions and 29 deletions.
Expand Up @@ -25,7 +25,6 @@
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.JdbcUtils;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.MySqlBulkDatabaseWriter;
Expand All @@ -37,48 +36,36 @@
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.jumpmind.symmetric.service.IParameterService;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class MySqlBulkDataLoaderFactory implements IDataLoaderFactory {

private int maxRowsBeforeFlush;
private long maxBytesBeforeFlush;
private boolean isLocal;
private boolean isReplace;
private IStagingManager stagingManager;
private NativeJdbcExtractor jdbcExtractor;
private IStagingManager stagingManager;
private ISymmetricEngine engine;

private IParameterService parameterService;

public MySqlBulkDataLoaderFactory(ISymmetricEngine engine) {
this.stagingManager = engine.getStagingManager();
this.jdbcExtractor = JdbcUtils.getNativeJdbcExtractory();
this.engine = engine;
this.parameterService = engine.getParameterService();
}

public String getTypeName() {
return "mysql_bulk";
}

public IDataWriter getDataWriter(String sourceNodeId,
ISymmetricDialect symmetricDialect,
TransformWriter transformWriter,
List<IDatabaseWriterFilter> filters,
List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings,
List<ResolvedData> resolvedData) {
return new MySqlBulkDatabaseWriter(symmetricDialect.getPlatform(),
stagingManager, jdbcExtractor, maxRowsBeforeFlush, maxBytesBeforeFlush, isLocal, isReplace);
}
public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetricDialect, TransformWriter transformWriter,
List<IDatabaseWriterFilter> filters, List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

int maxRowsBeforeFlush = parameterService.getInt("mysql.bulk.load.max.rows.before.flush", 100000);
long maxBytesBeforeFlush = parameterService.getLong("mysql.bulk.load.max.bytes.before.flush", 1000000000);
boolean isLocal = Boolean.parseBoolean(parameterService.getString("mysql.bulk.load.local", "true"));
boolean isReplace = Boolean.parseBoolean(parameterService.getString("mysql.bulk.load.replace", "false"));

public void setSymmetricEngine(ISymmetricEngine engine) {
this.maxRowsBeforeFlush = engine.getParameterService().getInt(
"mysql.bulk.load.max.rows.before.flush", 100000);
this.maxBytesBeforeFlush = engine.getParameterService().getLong(
"mysql.bulk.load.max.bytes.before.flush", 1000000000);
this.isLocal = Boolean.parseBoolean(engine.getParameterService().getString(
"mysql.bulk.load.local", "true"));
this.isReplace = Boolean.parseBoolean(engine.getParameterService().getString(
"mysql.bulk.load.replace", "false"));
this.stagingManager = engine.getStagingManager();
return new MySqlBulkDatabaseWriter(symmetricDialect.getPlatform(), stagingManager, jdbcExtractor, maxRowsBeforeFlush,
maxBytesBeforeFlush, isLocal, isReplace);
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
Expand Down

0 comments on commit f573bd4

Please sign in to comment.