Skip to content

Commit

Permalink
0003825: Oracle bulk loader using SQL*Loader sqlldr
Browse files Browse the repository at this point in the history
  • Loading branch information
elong committed Dec 12, 2018
1 parent 5f95292 commit f81ab6d
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 78 deletions.
Expand Up @@ -28,6 +28,7 @@
import org.jumpmind.security.SecurityConstants;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.ServerConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.OracleBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
Expand Down Expand Up @@ -70,13 +71,14 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
}

String sqlLoaderCommand = parmService.getString(ParameterConstants.DBDIALECT_ORACLE_BULK_LOAD_SQLLDR_CMD);
int commitSize = parmService.getInt(ParameterConstants.DBDIALECT_ORACLE_BULK_LOAD_COMMIT_SIZE, 1000);
boolean useDirectPath = parmService.is(ParameterConstants.DBDIALECT_ORACLE_BULK_LOAD_DIRECT_PATH);
String sqlLoaderOptions = parmService.getString(ParameterConstants.DBDIALECT_ORACLE_BULK_LOAD_SQLLDR_OPTIONS);
String ezConnectString = parmService.getString(ParameterConstants.DBDIALECT_ORACLE_BULK_LOAD_EZCONNECT);
boolean isStagingClearText = !parmService.is(ServerConstants.STREAM_TO_FILE_ENCRYPT_ENABLED, false) &&
!parmService.is(ServerConstants.STREAM_TO_FILE_COMPRESSION_ENABLED, false);

return new OracleBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
engine.getStagingManager(), engine.getTablePrefix(), commitSize, useDirectPath, sqlLoaderCommand,
dbUser, dbPassword, dbUrl, ezConnectString,
engine.getStagingManager(), engine.getTablePrefix(), sqlLoaderCommand, sqlLoaderOptions,
dbUser, dbPassword, dbUrl, ezConnectString, isStagingClearText,
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));
}

Expand Down
Expand Up @@ -28,18 +28,17 @@
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.sql.Types;
import java.util.ArrayList;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.ColumnTypes;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.csv.CsvWriter;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.CsvUtils;
Expand All @@ -57,18 +56,20 @@ public class OracleBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

protected IStagingManager stagingManager;

protected IStagedResource stagedInputFile;
protected IStagedResource dataResource;

protected IStagedResource controlResource;

protected Table table = null;

protected boolean hasBinaryType;

protected int commitSize;

protected boolean useDirectPath;
protected boolean useIncomingStageFile;

protected String sqlLoaderCommand;

protected ArrayList<String> sqlLoaderOptions;

protected String dbUser;

protected String dbPassword;
Expand All @@ -77,35 +78,39 @@ public class OracleBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

protected String ezConnectString;

protected boolean isStagingClearText;

protected int rows = 0;

public OracleBulkDatabaseWriter(IDatabasePlatform symmetricPlatform,
IDatabasePlatform targetPlatform, IStagingManager stagingManager, String tablePrefix,
int commitSize, boolean useDirectPath,
String sqlLoaderCommand, String dbUser, String dbPassword, String dbUrl, String ezConnectString,
DatabaseWriterSettings settings) {
super(symmetricPlatform, targetPlatform, tablePrefix, settings);
this.stagingManager = stagingManager;
this.commitSize = commitSize;
this.useDirectPath = useDirectPath;
this.sqlLoaderCommand = sqlLoaderCommand;
this.dbUser = dbUser;
public OracleBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform,
IStagingManager stagingManager, String tablePrefix, String sqlLoaderCommand, String sqlLoaderOptions,
String dbUser, String dbPassword, String dbUrl, String ezConnectString, boolean isStagingClearText,
DatabaseWriterSettings settings) {
super(symmetricPlatform, targetPlatform, tablePrefix, settings);
this.stagingManager = stagingManager;
this.sqlLoaderCommand = sqlLoaderCommand;
this.dbUser = dbUser;
this.dbPassword = dbPassword;
this.dbUrl = dbUrl;
this.ezConnectString = StringUtils.defaultIfBlank(ezConnectString, getEzConnectString(dbUrl));
this.isStagingClearText = isStagingClearText;

if (StringUtils.isBlank(this.sqlLoaderCommand)) {
String oracleHome = System.getenv("ORACLE_HOME");
if (StringUtils.isNotBlank(oracleHome)) {
this.sqlLoaderCommand = oracleHome + File.separator + "bin" + File.separator + "sqlldr";
} else {
this.sqlLoaderCommand = "sqlldr";
}
}
// TODO: options for readsize and bindsize?
// TODO: separate control file from data file for higher readsize?
// TODO: specify type and size for columns if CHAR(255) default is too small
}
this.sqlLoaderOptions = new ArrayList<String>();
if (StringUtils.isNotBlank(sqlLoaderOptions)) {
for (String option : sqlLoaderOptions.split(" ")) {
this.sqlLoaderOptions.add(option);
}
}

if (StringUtils.isBlank(this.sqlLoaderCommand)) {
String oracleHome = System.getenv("ORACLE_HOME");
if (StringUtils.isNotBlank(oracleHome)) {
this.sqlLoaderCommand = oracleHome + File.separator + "bin" + File.separator + "sqlldr";
} else {
this.sqlLoaderCommand = "sqlldr";
}
}
}

public boolean start(Table table) {
this.table = table;
Expand All @@ -119,31 +124,59 @@ public boolean start(Table table) {
}
}
}
if (stagedInputFile == null) {
createStagingFile(table);
if (dataResource == null) {
createStagingFile();
}
return true;
} else {
return false;
}
}

protected void createStagingFile(Table table) {
stagedInputFile = stagingManager.create("bulkloaddir", getBatch().getBatchId());
protected void createStagingFile() {
long batchId = getBatch().getBatchId();
controlResource = stagingManager.create("bulkloaddir", StringUtils.leftPad(batchId + "-ctl", 14, "0"));
String infile = null;
useIncomingStageFile = false;

if (isStagingClearText && !hasBinaryType) {
dataResource = stagingManager.find(Constants.STAGING_CATEGORY_INCOMING, batch.getStagedLocation(), batchId);
if (dataResource != null) {
useIncomingStageFile = true;
infile = dataResource.getFile().getAbsolutePath();
}
}
if (!useIncomingStageFile) {
dataResource = stagingManager.create("bulkloaddir", batchId);
infile = dataResource.getFile().getName();
}

try {
OutputStream out = stagedInputFile.getOutputStream();
out.write(("LOAD DATA\nINFILE *\nINSERT INTO TABLE " + table.getName() + "\n").getBytes());
OutputStream out = controlResource.getOutputStream();
out.write(("LOAD DATA\nINFILE '" + infile + "'\nAPPEND INTO TABLE " + targetTable.getName() + "\n").getBytes());

if (useIncomingStageFile) {
out.write("WHEN (01:06 = 'insert')\n".getBytes());
}
out.write("FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"'\nTRAILING NULLCOLS\n".getBytes());

StringBuilder columns = new StringBuilder("(");
int index = 0;
for (Column column : table.getColumns()) {
if (useIncomingStageFile) {
columns.append("EVENT FILLER");
index++;
}
for (Column column : targetTable.getColumns()) {
if (index++ > 0) {
columns.append(", ");
}
columns.append(column.getName());
int type = column.getJdbcTypeCode();
if (type == Types.TIMESTAMP || type == Types.DATE) {
int type = column.getMappedTypeCode();
if (type == Types.CLOB || type == Types.NCLOB) {
columns.append(" CLOB");
} else if (column.isOfTextType() && column.getSizeAsInt() > 0) {
columns.append(" CHAR(" + column.getSize() + ")");
} else if (type == Types.TIMESTAMP || type == Types.DATE) {
columns.append(" TIMESTAMP 'YYYY-MM-DD HH24:MI:SS.FF9'");
} else if (column.isTimestampWithTimezone()) {
String scale = column.getScale() > 0 ? "(" + column.getScale() + ")" : "";
Expand All @@ -154,9 +187,8 @@ protected void createStagingFile(Table table) {
}
}
columns.append(")\n");

out.write(columns.toString().getBytes());
out.write("BEGINDATA\n".getBytes());
controlResource.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -172,6 +204,11 @@ public void end(Table table) {
}

protected void bulkWrite(CsvData data) {
if (useIncomingStageFile) {
rows++;
// TODO: throw an exception that causes reading of incoming stage file to be skipped
return;
}
DataEventType dataEventType = data.getDataEventType();

switch (dataEventType) {
Expand Down Expand Up @@ -213,7 +250,7 @@ protected void bulkWrite(CsvData data) {
String formattedData = CsvUtils.escapeCsvData(parsedData, '\n', '"');
byteData = formattedData.getBytes();
}
stagedInputFile.getOutputStream().write(byteData);
dataResource.getOutputStream().write(byteData);
rows++;
} catch (Exception ex) {
throw getPlatform().getSqlTemplate().translate(ex);
Expand All @@ -235,23 +272,21 @@ protected void bulkWrite(CsvData data) {

protected void flush() {
if (rows > 0) {
stagedInputFile.close();
dataResource.close();
statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
try {
File absFile = stagedInputFile.getFile().getAbsoluteFile();
String path = absFile.getParent();
String[] cmd = { sqlLoaderCommand, dbUser + "/" + dbPassword + ezConnectString,
"control=" + stagedInputFile.getFile().getName(), "silent=header",
"direct=" + (useDirectPath ? "true" : "false") };
if (!useDirectPath) {
cmd = (String[]) ArrayUtils.add(cmd, "rows=" + commitSize);
}
File parentDir = controlResource.getFile().getParentFile();
ArrayList<String> cmd = new ArrayList<String>();
cmd.add(sqlLoaderCommand);
cmd.add(dbUser + "/" + dbPassword + ezConnectString);
cmd.add("control=" + controlResource.getFile().getName());
cmd.addAll(sqlLoaderOptions);
if (logger.isDebugEnabled()) {
logger.debug("Working dir: {} ", path);
logger.debug("Running: {} ", ArrayUtils.toString(cmd));
logger.debug("Working dir: {} ", parentDir.getAbsolutePath());
logger.debug("Running: {} ", cmd.toString());
}
ProcessBuilder pb = new ProcessBuilder(cmd);
pb.directory(new File(path));
pb.directory(parentDir);
pb.redirectErrorStream(true);
Process process = null;
try {
Expand All @@ -267,21 +302,29 @@ protected void flush() {
reader.close();

int rc = process.waitFor();
if (rc != 0) {
if (rc == 2) {
if (!useIncomingStageFile) {
throw new RuntimeException("All or some rows were rejected.");
}
} else if (rc != 0) {
throw new RuntimeException("Process builder returned " + rc);
}
} catch (IOException e) {
throw new RuntimeException(e);
}

stagedInputFile.delete();
if (!useIncomingStageFile) {
dataResource.delete();
}
File absFile = controlResource.getFile().getAbsoluteFile();
new File(absFile.getPath().replace(".create", ".bad")).delete();
new File(absFile.getPath().replace(".create", ".log")).delete();
controlResource.delete();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
stagedInputFile = null;
dataResource = null;
rows = 0;
}
}
Expand Down
Expand Up @@ -31,13 +31,11 @@

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.DbTestUtils;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.oracle.OracleDatabasePlatform;
import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.properties.EnvironmentSpecificProperties;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.io.stage.StagingManager;
import org.junit.Assert;
Expand Down Expand Up @@ -75,9 +73,10 @@ public void setupTest() {
protected long writeData(TableCsvData... datas) {
EnvironmentSpecificProperties prop = DbTestUtils.getEnvironmentSpecificProperties(DbTestUtils.ROOT);
return writeData(new OracleBulkDatabaseWriter(platform, platform, stagingManager, "sym_",
1000, false, null, prop.get(BasicDataSourcePropertyConstants.DB_POOL_USER),
null, "silent=(header,discards) direct=false readsize=4096000 bindsize=4096000 rows=1000 discardmax=1 errors=0",
prop.get(BasicDataSourcePropertyConstants.DB_POOL_USER),
prop.get(BasicDataSourcePropertyConstants.DB_POOL_PASSWORD),
prop.get(BasicDataSourcePropertyConstants.DB_POOL_URL), null, null), datas);
prop.get(BasicDataSourcePropertyConstants.DB_POOL_URL), null, false, null), datas);
}

@Override
Expand Down
Expand Up @@ -233,8 +233,7 @@ private ParameterConstants() {
public final static String DBDIALECT_ORACLE_SEQUENCE_NOORDER = "oracle.sequence.noorder";
public final static String DBDIALECT_ORACLE_SEQUENCE_NOORDER_NEXTVALUE_DB_URLS = "oracle.sequence.noorder.nextvalue.db.urls";
public final static String DBDIALECT_ORACLE_BULK_LOAD_SQLLDR_CMD = "oracle.bulk.load.sqlldr.cmd";
public final static String DBDIALECT_ORACLE_BULK_LOAD_COMMIT_SIZE = "oracle.bulk.load.commit.size";
public final static String DBDIALECT_ORACLE_BULK_LOAD_DIRECT_PATH = "oracle.bulk.load.direct.path";
public final static String DBDIALECT_ORACLE_BULK_LOAD_SQLLDR_OPTIONS = "oracle.bulk.load.sqlldr.options";
public final static String DBDIALECT_ORACLE_BULK_LOAD_EZCONNECT = "oracle.bulk.load.ezconnect";

public final static String DBDIALECT_TIBERO_USE_TRANSACTION_VIEW = "tibero.use.transaction.view";
Expand Down
16 changes: 5 additions & 11 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -1597,25 +1597,19 @@ oracle.sequence.noorder.nextvalue.db.urls=
# DatabaseOverridable: false
oracle.bulk.load.sqlldr.cmd=

# For bulk loading with SQL*Loader, specify how to connect to the database with an ezconnect name.
# If blank, the connection is determined using the db.url parameter.
#
# Tags: other
# DatabaseOverridable: false
oracle.bulk.load.ezconnect=

# Enable direct path loading with SQL*Loader, which is much faster than conventional path.
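# Space-separated command-line options passed to Oracle SQL*Loader (sqlldr).
# Each option is appended to the sqlldr command as a separate argument, after the control file.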
#
# Tags: other
# Type: boolean
# DatabaseOverridable: false
oracle.bulk.load.direct.path=false
oracle.bulk.load.sqlldr.options=silent=(header,discards) direct=false readsize=4096000 bindsize=4096000 rows=2000 errors=0

# Specify the commit size for SQL*Loader when using conventional path bulk loading.
# For bulk loading with SQL*Loader, specify how to connect to the database with an ezconnect name.
# If blank, the connection is determined using the db.url parameter.
#
# Tags: other
# DatabaseOverridable: false
oracle.bulk.load.commit.size=10000
oracle.bulk.load.ezconnect=

# Use to map the version string a zseries jdbc driver returns to the 'zseries' dialect
# Tags: other
Expand Down

0 comments on commit f81ab6d

Please sign in to comment.