Skip to content

Commit

Permalink
0003830: Tibero bulk loader using tbLoader
Browse files Browse the repository at this point in the history
  • Loading branch information
elong committed Dec 17, 2018
1 parent a1e670d commit d5d0f58
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 22 deletions.
Expand Up @@ -29,7 +29,7 @@
import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.OracleBulkDatabaseWriter; import org.jumpmind.symmetric.io.TiberoBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter; import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict; import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler; import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
Expand Down Expand Up @@ -71,11 +71,11 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri


String tbLoaderCommand = parmService.getString(ParameterConstants.DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_CMD); String tbLoaderCommand = parmService.getString(ParameterConstants.DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_CMD);
String tbLoaderOptions = parmService.getString(ParameterConstants.DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_OPTIONS); String tbLoaderOptions = parmService.getString(ParameterConstants.DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_OPTIONS);
String ezConnectString = parmService.getString(ParameterConstants.DBDIALECT_TIBERO_BULK_LOAD_EZCONNECT); String dbName = parmService.getString(ParameterConstants.DBDIALECT_TIBERO_BULK_LOAD_DBNAME);


return new OracleBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(), return new TiberoBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
engine.getStagingManager(), engine.getTablePrefix(), tbLoaderCommand, tbLoaderOptions, engine.getStagingManager(), engine.getTablePrefix(), tbLoaderCommand, tbLoaderOptions,
dbUser, dbPassword, dbUrl, ezConnectString, dbUser, dbPassword, dbUrl, dbName,
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData)); buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));
} }


Expand Down
Expand Up @@ -50,7 +50,7 @@ public class OracleBulkDatabaseWriter extends AbstractBulkDatabaseWriter {


protected final static String LINE_TERMINATOR = "|>"; protected final static String LINE_TERMINATOR = "|>";


protected final Logger logger = LoggerFactory.getLogger(getClass()); protected Logger logger;


protected IStagingManager stagingManager; protected IStagingManager stagingManager;


Expand Down Expand Up @@ -80,12 +80,13 @@ public OracleBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePl
IStagingManager stagingManager, String tablePrefix, String sqlLoaderCommand, String sqlLoaderOptions, IStagingManager stagingManager, String tablePrefix, String sqlLoaderCommand, String sqlLoaderOptions,
String dbUser, String dbPassword, String dbUrl, String ezConnectString, DatabaseWriterSettings settings) { String dbUser, String dbPassword, String dbUrl, String ezConnectString, DatabaseWriterSettings settings) {
super(symmetricPlatform, targetPlatform, tablePrefix, settings); super(symmetricPlatform, targetPlatform, tablePrefix, settings);
logger = LoggerFactory.getLogger(getClass());
this.stagingManager = stagingManager; this.stagingManager = stagingManager;
this.sqlLoaderCommand = sqlLoaderCommand; this.sqlLoaderCommand = sqlLoaderCommand;
this.dbUser = dbUser; this.dbUser = dbUser;
this.dbPassword = dbPassword; this.dbPassword = dbPassword;
this.dbUrl = dbUrl; this.dbUrl = dbUrl;
this.ezConnectString = StringUtils.defaultIfBlank(ezConnectString, getEzConnectString(dbUrl)); this.ezConnectString = StringUtils.defaultIfBlank(ezConnectString, getConnectString(dbUrl));


this.sqlLoaderOptions = new ArrayList<String>(); this.sqlLoaderOptions = new ArrayList<String>();
if (StringUtils.isNotBlank(sqlLoaderOptions)) { if (StringUtils.isNotBlank(sqlLoaderOptions)) {
Expand Down Expand Up @@ -307,7 +308,7 @@ protected void cleanup(boolean inError) {
} }
} }


protected String getEzConnectString(String dbUrl) { protected String getConnectString(String dbUrl) {
String ezConnect = null; String ezConnect = null;
int index = dbUrl.indexOf("@//"); int index = dbUrl.indexOf("@//");
if (index != -1) { if (index != -1) {
Expand Down
Expand Up @@ -26,28 +26,26 @@
import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings; import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


public class TiberoBulkDatabaseWriter extends OracleBulkDatabaseWriter { public class TiberoBulkDatabaseWriter extends OracleBulkDatabaseWriter {


protected final Logger logger = LoggerFactory.getLogger(getClass());

public TiberoBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform, public TiberoBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform,
IStagingManager stagingManager, String tablePrefix, String tbLoaderCommand, String tbLoaderOptions, IStagingManager stagingManager, String tablePrefix, String tbLoaderCommand, String tbLoaderOptions,
String dbUser, String dbPassword, String dbUrl, String ezConnectString, DatabaseWriterSettings settings) { String dbUser, String dbPassword, String dbUrl, String dbName, DatabaseWriterSettings settings) {
super(symmetricPlatform, targetPlatform, stagingManager, tablePrefix, tbLoaderCommand, tbLoaderOptions, super(symmetricPlatform, targetPlatform, stagingManager, tablePrefix, tbLoaderCommand, tbLoaderOptions,
dbUser, dbPassword, dbUrl, ezConnectString, settings); dbUser, dbPassword, dbUrl, dbName, settings);
logger = LoggerFactory.getLogger(getClass());
} }


@Override @Override
protected void init() { protected void init() {
if (StringUtils.isBlank(this.sqlLoaderCommand)) { if (StringUtils.isBlank(this.sqlLoaderCommand)) {
String oracleHome = System.getenv("TB_HOME"); String tiberoHome = System.getenv("TB_HOME");
if (StringUtils.isNotBlank(oracleHome)) { if (StringUtils.isNotBlank(tiberoHome)) {
this.sqlLoaderCommand = oracleHome + File.separator + "bin" + File.separator + "tbldr"; this.sqlLoaderCommand = tiberoHome + File.separator + "client" + File.separator + "bin" + File.separator + "tbloader";
} else { } else {
this.sqlLoaderCommand = "tbldr"; this.sqlLoaderCommand = "tbloader";
} }
} }
} }
Expand All @@ -59,11 +57,22 @@ protected String getInfileControl() {


@Override @Override
protected String getLineTerminatedByControl() { protected String getLineTerminatedByControl() {
return "LINES TERMINATED BY '" + LINE_TERMINATOR + "'"; return "LINES TERMINATED BY '" + LINE_TERMINATOR + "'\n";
} }


@Override
protected String getLoaderName() { protected String getLoaderName() {
return "TBLoader"; return "TBLoader";
} }


@Override
protected String getConnectString(String dbUrl) {
String connectStr = "";
int index = dbUrl.lastIndexOf(":");
if (index != -1) {
connectStr = "@" + dbUrl.substring(index + 1);
}
return connectStr;
}

} }
Expand Up @@ -241,7 +241,7 @@ private ParameterConstants() {
public final static String DBDIALECT_TIBERO_USE_HINTS = "tibero.use.hints"; public final static String DBDIALECT_TIBERO_USE_HINTS = "tibero.use.hints";
public final static String DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_CMD = "tibero.bulk.load.tbloader.cmd"; public final static String DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_CMD = "tibero.bulk.load.tbloader.cmd";
public final static String DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_OPTIONS = "tibero.bulk.load.tbloader.options"; public final static String DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_OPTIONS = "tibero.bulk.load.tbloader.options";
public final static String DBDIALECT_TIBERO_BULK_LOAD_EZCONNECT = "tibero.bulk.load.ezconnect"; public final static String DBDIALECT_TIBERO_BULK_LOAD_DBNAME = "tibero.bulk.load.dbname";


public final static String DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS = "oracle.transaction.view.clock.sync.threshold.ms"; public final static String DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS = "oracle.transaction.view.clock.sync.threshold.ms";


Expand Down
Expand Up @@ -1624,14 +1624,14 @@ tibero.bulk.load.tbloader.cmd=
# Tags: other # Tags: other
# Type: boolean # Type: boolean
# DatabaseOverridable: false # DatabaseOverridable: false
tibero.bulk.load.tbloader.options=direct=N parallel=1 disable_idx=N readsize=2097152 bindsize=2097152 rows=2000 errors=0 tibero.bulk.load.tbloader.options=direct=N dpl_parallel=1 disable_idx=N readsize=2097152 bindsize=2097152 rows=2000 errors=0


# For bulk loading with SQL*Loader, specify how to connect to the database with an ezconnect name. # For bulk loading with tbLoader, specify the database name.
# If blank, the connection is determined using the db.url parameter. # If blank, the database name is determined using the db.url parameter.
# #
# Tags: other # Tags: other
# DatabaseOverridable: false # DatabaseOverridable: false
tibero.bulk.load.ezconnect= tibero.bulk.load.dname=


# Use to map the version string a zseries jdbc driver returns to the 'zseries' dialect # Use to map the version string a zseries jdbc driver returns to the 'zseries' dialect
# Tags: other # Tags: other
Expand Down

0 comments on commit d5d0f58

Please sign in to comment.