Skip to content

Commit

Permalink
0002399: Oracle bulk loader should respect
Browse files Browse the repository at this point in the history
dataloader.max.rows.before.commit and execute database writer filters
  • Loading branch information
chenson42 committed Sep 25, 2015
1 parent aff9acb commit aabcafe
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 11 deletions.
Expand Up @@ -28,18 +28,17 @@
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.io.OracleBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.jumpmind.symmetric.load.DefaultDataLoaderFactory;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

public class OracleBulkDataLoaderFactory implements IDataLoaderFactory, ISymmetricEngineAware,
public class OracleBulkDataLoaderFactory extends DefaultDataLoaderFactory implements ISymmetricEngineAware,
IBuiltInExtensionPoint {

private ISymmetricEngine engine;
Expand All @@ -60,11 +59,12 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
int maxRowsBeforeFlush = engine.getParameterService().getInt(
"oracle.bulk.load.max.rows.before.flush", 1000);
return new OracleBulkDatabaseWriter(symmetricDialect.getPlatform(), engine.getTablePrefix(),
jdbcExtractor, maxRowsBeforeFlush);
jdbcExtractor, maxRowsBeforeFlush, buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));
}

public void setSymmetricEngine(ISymmetricEngine engine) {
this.engine = engine;
this.parameterService = engine.getParameterService();
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.springframework.jdbc.support.nativejdbc.NativeJdbcExtractor;

Expand All @@ -58,8 +59,8 @@ public class OracleBulkDatabaseWriter extends DefaultDatabaseWriter {
protected List<List<Object>> rowArrays = new ArrayList<List<Object>>();

public OracleBulkDatabaseWriter(IDatabasePlatform platform, String procedurePrefix,
NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush) {
super(platform);
NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush, DatabaseWriterSettings settings) {
super(platform, settings);
this.procedurePrefix = procedurePrefix;
this.jdbcExtractor = jdbcExtractor;
this.maxRowsBeforeFlush = maxRowsBeforeFlush;
Expand Down
Expand Up @@ -23,20 +23,18 @@
import java.util.ArrayList;
import java.util.List;

import junit.framework.Assert;

import org.jumpmind.db.DbTestUtils;
import org.jumpmind.db.platform.oracle.OracleDatabasePlatform;
import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.symmetric.io.OracleBulkDatabaseWriter;
import org.jumpmind.symmetric.io.AbstractWriterTest.TableCsvData;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor;


public class OracleBulkDatabaseWriterTest extends AbstractWriterTest {

@BeforeClass
Expand All @@ -58,7 +56,7 @@ public void setupTest() {
@Override
protected long writeData(TableCsvData... datas) {
return writeData(new OracleBulkDatabaseWriter(platform, "sym",
new CommonsDbcpNativeJdbcExtractor(), 1000), datas);
new CommonsDbcpNativeJdbcExtractor(), 1000, null), datas);
}

@Override
Expand Down

0 comments on commit aabcafe

Please sign in to comment.