Skip to content
Permalink
Browse files

0003825: Oracle bulk loader using SQL*Loader sqlldr

  • Loading branch information...
elong
elong committed Dec 11, 2018
1 parent 31fd1a7 commit ece964619ffe5d11379b27ab31a161415670c428
@@ -24,7 +24,10 @@

import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.security.SecurityConstants;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.OracleBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
@@ -34,6 +37,7 @@
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.load.DefaultDataLoaderFactory;
import org.jumpmind.symmetric.service.IParameterService;

public class OracleBulkDataLoaderFactory extends DefaultDataLoaderFactory {

@@ -52,7 +56,27 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
TransformWriter transformWriter, List<IDatabaseWriterFilter> filters,
List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {
return new OracleBulkDatabaseWriter(engine,

IParameterService parmService = engine.getParameterService();
String dbUrl = parmService.getString(BasicDataSourcePropertyConstants.DB_POOL_URL);
String dbUser = parmService.getString(BasicDataSourcePropertyConstants.DB_POOL_USER);
if (dbUser != null && dbUser.startsWith(SecurityConstants.PREFIX_ENC)) {
dbUser = engine.getSecurityService().decrypt(dbUser.substring(SecurityConstants.PREFIX_ENC.length()));
}

String dbPassword = parmService.getString(BasicDataSourcePropertyConstants.DB_POOL_PASSWORD);
if (dbPassword != null && dbPassword.startsWith(SecurityConstants.PREFIX_ENC)) {
dbPassword = engine.getSecurityService().decrypt(dbPassword.substring(SecurityConstants.PREFIX_ENC.length()));
}

String sqlLoaderCommand = parmService.getString(ParameterConstants.DBDIALECT_ORACLE_BULK_LOAD_SQLLDR_CMD);
int commitSize = parmService.getInt(ParameterConstants.DBDIALECT_ORACLE_BULK_LOAD_COMMIT_SIZE, 1000);
boolean useDirectPath = parmService.is(ParameterConstants.DBDIALECT_ORACLE_BULK_LOAD_DIRECT_PATH);
String ezConnectString = parmService.getString(ParameterConstants.DBDIALECT_ORACLE_BULK_LOAD_EZCONNECT);

return new OracleBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
engine.getStagingManager(), engine.getTablePrefix(), commitSize, useDirectPath, sqlLoaderCommand,
dbUser, dbPassword, dbUrl, ezConnectString,
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));
}

@@ -20,22 +20,26 @@
*/
package org.jumpmind.symmetric.io;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.sql.Types;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.ColumnTypes;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.security.SecurityConstants;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.csv.CsvWriter;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.CsvUtils;
@@ -59,33 +63,48 @@

protected boolean hasBinaryType;

protected int maxRowsBeforeFlush;
protected int commitSize;

protected String sqlLoader;
protected boolean useDirectPath;

protected String sqlLoaderCommand;

protected String dbUser;

protected String dbPassword;

protected String dbUrl;

protected String ezConnectString;

protected int rows = 0;

public OracleBulkDatabaseWriter(ISymmetricEngine engine, DatabaseWriterSettings settings) {
super(engine.getSymmetricDialect().getPlatform(), engine.getSymmetricDialect().getTargetPlatform(), engine.getTablePrefix(), settings);
stagingManager = engine.getStagingManager();
maxRowsBeforeFlush = engine.getParameterService().getInt("oracle.bulk.load.max.rows.before.flush", 100000);
sqlLoader = engine.getParameterService().getString("oracle.bulk.load.oracle.home", System.getenv("ORACLE_HOME"));
if (sqlLoader == null) {
sqlLoader = "";
public OracleBulkDatabaseWriter(IDatabasePlatform symmetricPlatform,
IDatabasePlatform targetPlatform, IStagingManager stagingManager, String tablePrefix,
int commitSize, boolean useDirectPath,
String sqlLoaderCommand, String dbUser, String dbPassword, String dbUrl, String ezConnectString,
DatabaseWriterSettings settings) {
super(symmetricPlatform, targetPlatform, tablePrefix, settings);
this.stagingManager = stagingManager;
this.commitSize = commitSize;
this.useDirectPath = useDirectPath;
this.sqlLoaderCommand = sqlLoaderCommand;
this.dbUser = dbUser;
this.dbPassword = dbPassword;
this.dbUrl = dbUrl;
this.ezConnectString = StringUtils.defaultIfBlank(ezConnectString, getEzConnectString(dbUrl));

if (StringUtils.isBlank(this.sqlLoaderCommand)) {
String oracleHome = System.getenv("ORACLE_HOME");
if (StringUtils.isNotBlank(oracleHome)) {
this.sqlLoaderCommand = oracleHome + File.separator + "bin" + File.separator + "sqlldr";
} else {
this.sqlLoaderCommand = "sqlldr";
}
}
sqlLoader += File.separator + "bin" + File.separator + "sqlldr";
dbUser = engine.getParameterService().getString(BasicDataSourcePropertyConstants.DB_POOL_USER);
if (dbUser != null && dbUser.startsWith(SecurityConstants.PREFIX_ENC)) {
dbUser = engine.getSecurityService().decrypt(dbUser.substring(SecurityConstants.PREFIX_ENC.length()));
}
dbPassword = engine.getParameterService().getString(BasicDataSourcePropertyConstants.DB_POOL_PASSWORD);
if (dbPassword != null && dbPassword.startsWith(SecurityConstants.PREFIX_ENC)) {
dbPassword = engine.getSecurityService().decrypt(dbPassword.substring(SecurityConstants.PREFIX_ENC.length()));
}
// TODO: options for readsize and bindsize?
// TODO: separate control file from data file for higher readsize?
// TODO: specify type and size for columns if CHAR(255) default is too small
}

public boolean start(Table table) {
@@ -127,7 +146,9 @@ protected void createStagingFile(Table table) {
if (type == Types.TIMESTAMP || type == Types.DATE) {
columns.append(" TIMESTAMP 'YYYY-MM-DD HH24:MI:SS.FF9'");
} else if (column.isTimestampWithTimezone()) {
columns.append(" TIMESTAMP 'YYYY-MM-DD HH24:MI:SS.FF9' TZH:TZM");
String scale = column.getScale() > 0 ? "(" + column.getScale() + ")" : "";
String local = column.getMappedTypeCode() == ColumnTypes.ORACLE_TIMESTAMPLTZ ? "LOCAL " : "";
columns.append(" TIMESTAMP" + scale + " WITH " + local + "TIME ZONE 'YYYY-MM-DD HH24:MI:SS.FF9 TZH:TZM'");
} else if (column.isOfBinaryType()) {
columns.append(" ENCLOSED BY '<sym_blob>' AND '</sym_blob>'");
}
@@ -210,22 +231,23 @@ protected void bulkWrite(CsvData data) {
writeDefault(data);
break;
}

if (rows >= maxRowsBeforeFlush) {
flush();
}
}

protected void flush() {
if (rows > 0) {
stagedInputFile.close();
statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
try {
// TODO: add options for direct=true rows=10000
String path = stagedInputFile.getFile().getParent();
String[] cmd = { sqlLoader, dbUser + "/" + dbPassword,
"control=" + stagedInputFile.getFile().getPath(), "silent=header" };
File absFile = stagedInputFile.getFile().getAbsoluteFile();
String path = absFile.getParent();
String[] cmd = { sqlLoaderCommand, dbUser + "/" + dbPassword + ezConnectString,
"control=" + stagedInputFile.getFile().getName(), "silent=header",
"direct=" + (useDirectPath ? "true" : "false") };
if (!useDirectPath) {
cmd = (String[]) ArrayUtils.add(cmd, "rows=" + commitSize);
}
if (logger.isDebugEnabled()) {
logger.debug("Working dir: {} ", path);
logger.debug("Running: {} ", ArrayUtils.toString(cmd));
}
ProcessBuilder pb = new ProcessBuilder(cmd);
@@ -234,6 +256,16 @@ protected void flush() {
Process process = null;
try {
process = pb.start();

BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line = null;
while ((line = reader.readLine()) != null) {
if (!line.equals("")) {
logger.info("SQL*Loader: {}", line);
}
}
reader.close();

int rc = process.waitFor();
if (rc != 0) {
throw new RuntimeException("Process builder returned " + rc);
@@ -243,8 +275,8 @@ protected void flush() {
}

stagedInputFile.delete();
new File(path.replace(".create", ".bad")).delete();
new File(path.replace(".create", ".log")).delete();
new File(absFile.getPath().replace(".create", ".bad")).delete();
new File(absFile.getPath().replace(".create", ".log")).delete();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
@@ -255,4 +287,39 @@ protected void flush() {
}
}

protected String getEzConnectString(String dbUrl) {
String ezConnect = null;
int index = dbUrl.indexOf("@//");
if (index != -1) {
ezConnect = dbUrl.substring(index);
} else {
index = dbUrl.toUpperCase().indexOf("HOST=");
if (index != -1) {
String database = StringUtils.defaultIfBlank(getTnsVariable(dbUrl, "SERVICE_NAME"),
getTnsVariable(dbUrl, "SID"));
ezConnect = "@//" + getTnsVariable(dbUrl, "HOST") + ":" + getTnsVariable(dbUrl, "PORT") + "/" + database;
} else {
index = dbUrl.indexOf("@");
if (index != -1) {
ezConnect = dbUrl.substring(index).replace("@", "@//");
index = ezConnect.lastIndexOf(":");
if (index != -1) {
ezConnect = ezConnect.substring(0, index) + "/" + ezConnect.substring(index + 1);
}
}
}
}
return ezConnect;
}

protected String getTnsVariable(String dbUrl, String name) {
String value = "";
int startIndex = dbUrl.toUpperCase().indexOf(name + "=");
if (startIndex != -1) {
int endIndex = dbUrl.indexOf(")", startIndex);
value = dbUrl.substring(startIndex + name.length() + 1, endIndex);
}
return value;
}

}
@@ -31,10 +31,15 @@

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.DbTestUtils;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.oracle.OracleDatabasePlatform;
import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.properties.EnvironmentSpecificProperties;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.io.stage.StagingManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -45,9 +50,10 @@
import oracle.sql.TIMESTAMPLTZ;
import oracle.sql.TIMESTAMPTZ;


public class OracleBulkDatabaseWriterTest extends AbstractWriterTest {

protected static IStagingManager stagingManager;

@BeforeClass
public static void setup() throws Exception {
if (DbTestUtils.getEnvironmentSpecificProperties(DbTestUtils.ROOT)
@@ -56,6 +62,7 @@ public static void setup() throws Exception {
platform = DbTestUtils.createDatabasePlatform(DbTestUtils.ROOT);
platform.createDatabase(
platform.readDatabaseFromXml("/testOracleBulkWriter.xml", true), true, false);
stagingManager = new StagingManager("tmp",false);
}
}

@@ -66,8 +73,11 @@ public void setupTest() {

@Override
protected long writeData(TableCsvData... datas) {
return writeData(new OracleBulkDatabaseWriter(platform, platform, "sym_", "sym",
new CommonsDbcpNativeJdbcExtractor(), 1000, null), datas);
EnvironmentSpecificProperties prop = DbTestUtils.getEnvironmentSpecificProperties(DbTestUtils.ROOT);
return writeData(new OracleBulkDatabaseWriter(platform, platform, stagingManager, "sym_",
1000, false, null, prop.get(BasicDataSourcePropertyConstants.DB_POOL_USER),
prop.get(BasicDataSourcePropertyConstants.DB_POOL_PASSWORD),
prop.get(BasicDataSourcePropertyConstants.DB_POOL_URL), null, null), datas);
}

@Override
@@ -123,7 +133,7 @@ public void testInsertTimestampTZ_timestamp() throws Exception {
Connection connection = datasource.getConnection();
Connection oracleConnection = jdbcExtractor.getNativeConnection(connection);

final String[] EXPECTED_TIMESTAMPTZ = {"2007-01-02 03:20:10.0 America/New_York","2007-01-02 03:20:10.0 US/Eastern"};
final String[] EXPECTED_TIMESTAMPTZ = {"2007-01-02 03:20:10.0 -5:00","2007-01-02 03:20:10.0 -4:00"};

checkTimestampTZ(rowData.get("TIMESTAMPTZ0_VALUE"), oracleConnection, EXPECTED_TIMESTAMPTZ);
checkTimestampTZ(rowData.get("TIMESTAMPTZ3_VALUE"), oracleConnection, EXPECTED_TIMESTAMPTZ);
@@ -170,44 +180,6 @@ public void testInsertTimestampTZ_timestampWithTimeZone() throws Exception {
}
}

@Test
public void testInsertTimestampTZ_timestampWithTimeZoneNull() throws Exception {
if (platform != null && platform instanceof OracleDatabasePlatform) {

NativeJdbcExtractor jdbcExtractor = new CommonsDbcpNativeJdbcExtractor();

platform.getSqlTemplate().update("truncate table test_bulkload_table_1");

List<CsvData> datas = new ArrayList<CsvData>();

String id = getNextId();

String[] values = { id, "string2", "string not null2", "char2",
"char not null2", "2007-01-02 03:20:10.000", "2007-02-03 04:05:06.000", "0",
"47", "67.89", "-0.0747663", "2007-01-02 03:20:10. -08:00",
"",
" ",
null };
CsvData data = new CsvData(DataEventType.INSERT, values);
datas.add(data);

long count = writeData(new TableCsvData(platform.getTableFromCache(
"test_bulkload_table_1", false), datas));

Map<String, Object> rowData = queryForRow(id);
DataSource datasource = (DataSource)platform.getDataSource();
Connection connection = datasource.getConnection();
Connection oracleConnection = jdbcExtractor.getNativeConnection(connection);

checkTimestampTZ(rowData.get("TIMESTAMPTZ0_VALUE"), oracleConnection, "2007-01-02 03:20:10.0 -8:00");
Assert.assertNull(rowData.get("TIMESTAMPTZ3_VALUE"));
Assert.assertNull(rowData.get("TIMESTAMPTZ6_VALUE"));
Assert.assertNull(rowData.get("TIMESTAMPTZ9_VALUE"));

Assert.assertEquals(count, countRows("test_bulkload_table_1"));
}
}

@Test
public void testInsertTimestampTZ_timestampWithLocalTimeZone() throws Exception {
if (platform != null && platform instanceof OracleDatabasePlatform) {
@@ -235,7 +207,7 @@ public void testInsertTimestampTZ_timestampWithLocalTimeZone() throws Exception
Connection connection = datasource.getConnection();
Connection oracleConnection = jdbcExtractor.getNativeConnection(connection);

checkTimestampLTZ(rowData.get("TIMESTAMPLTZ9_VALUE"), oracleConnection, new String[]{"2007-01-02 03:20:10.123456789 America/New_York","2007-01-02 03:20:10.123456789 US/Eastern"});
checkTimestampLTZ(rowData.get("TIMESTAMPLTZ9_VALUE"), oracleConnection, new String[]{"2007-01-02 06:20:10.123456789 America/New_York","2007-01-02 06:20:10.123456789 US/Eastern"});

Assert.assertEquals(count, countRows("test_bulkload_table_1"));
}

0 comments on commit ece9646

Please sign in to comment.
You can’t perform that action at this time.