Skip to content

Commit

Permalink
0003830: Tibero bulk loader using tbLoader
Browse files Browse the repository at this point in the history
  • Loading branch information
elong committed Dec 14, 2018
1 parent 09f40f7 commit 475beae
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 6 deletions.
Expand Up @@ -51,6 +51,9 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
} else if (DatabaseNamesConstants.ORACLE.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
return new OracleBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (DatabaseNamesConstants.TIBERO.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
return new TiberoBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (DatabaseNamesConstants.POSTGRESQL.equals(engine.getSymmetricDialect().getTargetPlatform().getName())
|| DatabaseNamesConstants.POSTGRESQL95.equals(engine.getSymmetricDialect().getTargetPlatform().getName())
|| DatabaseNamesConstants.GREENPLUM.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
Expand Down
@@ -0,0 +1,86 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.ext;

import java.util.List;

import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.security.SecurityConstants;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.OracleBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.load.DefaultDataLoaderFactory;
import org.jumpmind.symmetric.service.IParameterService;

public class TiberoBulkDataLoaderFactory extends DefaultDataLoaderFactory {

private ISymmetricEngine engine;

public TiberoBulkDataLoaderFactory(ISymmetricEngine engine) {
this.engine = engine;
this.parameterService = engine.getParameterService();
}

public String getTypeName() {
return "tibero_bulk";
}

public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetricDialect,
TransformWriter transformWriter, List<IDatabaseWriterFilter> filters,
List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

IParameterService parmService = engine.getParameterService();
String dbUrl = parmService.getString(BasicDataSourcePropertyConstants.DB_POOL_URL);
String dbUser = parmService.getString(BasicDataSourcePropertyConstants.DB_POOL_USER);
if (dbUser != null && dbUser.startsWith(SecurityConstants.PREFIX_ENC)) {
dbUser = engine.getSecurityService().decrypt(dbUser.substring(SecurityConstants.PREFIX_ENC.length()));
}

String dbPassword = parmService.getString(BasicDataSourcePropertyConstants.DB_POOL_PASSWORD);
if (dbPassword != null && dbPassword.startsWith(SecurityConstants.PREFIX_ENC)) {
dbPassword = engine.getSecurityService().decrypt(dbPassword.substring(SecurityConstants.PREFIX_ENC.length()));
}

String tbLoaderCommand = parmService.getString(ParameterConstants.DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_CMD);
String tbLoaderOptions = parmService.getString(ParameterConstants.DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_OPTIONS);
String ezConnectString = parmService.getString(ParameterConstants.DBDIALECT_TIBERO_BULK_LOAD_EZCONNECT);

return new OracleBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
engine.getStagingManager(), engine.getTablePrefix(), tbLoaderCommand, tbLoaderOptions,
dbUser, dbPassword, dbUrl, ezConnectString,
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
return DatabaseNamesConstants.TIBERO.equals(platform.getName());
}

}
Expand Up @@ -93,7 +93,10 @@ public OracleBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePl
this.sqlLoaderOptions.add(option);
}
}

init();
}

protected void init() {
if (StringUtils.isBlank(this.sqlLoaderCommand)) {
String oracleHome = System.getenv("ORACLE_HOME");
if (StringUtils.isNotBlank(oracleHome)) {
Expand Down Expand Up @@ -133,11 +136,12 @@ protected void createStagingFile() {
try {
OutputStream out = controlResource.getOutputStream();
out.write(("LOAD DATA\n").getBytes());
out.write(("INFILE '" + dataResource.getFile().getName() + "' \"str '" + LINE_TERMINATOR + "'\"\n")
.getBytes());
out.write(getInfileControl().getBytes());
out.write(("APPEND INTO TABLE " + targetTable.getName() + "\n").getBytes());

out.write(("FIELDS TERMINATED BY '" + FIELD_TERMINATOR + "'\n").getBytes());
out.write(getLineTerminatedByControl().getBytes());

out.write("TRAILING NULLCOLS\n".getBytes());

StringBuilder columns = new StringBuilder("(");
Expand Down Expand Up @@ -169,6 +173,14 @@ protected void createStagingFile() {
}
}

protected String getInfileControl() {
return "INFILE '" + dataResource.getFile().getName() + "' \"str '" + LINE_TERMINATOR + "'\"\n";
}

protected String getLineTerminatedByControl() {
return "";
}

@Override
public void end(Table table) {
try {
Expand Down Expand Up @@ -235,7 +247,7 @@ protected void flush() {
File parentDir = controlResource.getFile().getParentFile();
ArrayList<String> cmd = new ArrayList<String>();
cmd.add(sqlLoaderCommand);
cmd.add(dbUser + "/" + dbPassword + ezConnectString);
cmd.add("userid=" + dbUser + "/" + dbPassword + ezConnectString);
cmd.add("control=" + controlResource.getFile().getName());
cmd.addAll(sqlLoaderOptions);
if (logger.isDebugEnabled()) {
Expand All @@ -250,7 +262,7 @@ protected void flush() {
String line = null;
while ((line = reader.readLine()) != null) {
if (!line.equals("")) {
logger.info("SQL*Loader: {}", line);
logger.info("{}: {}", getLoaderName(), line);
}
}

Expand Down Expand Up @@ -330,5 +342,9 @@ protected String getTnsVariable(String dbUrl, String name) {
}
return value;
}

protected String getLoaderName() {
return "SQL*Loader";
}

}
@@ -0,0 +1,69 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.io;

import java.io.File;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TiberoBulkDatabaseWriter extends OracleBulkDatabaseWriter {

protected final Logger logger = LoggerFactory.getLogger(getClass());

public TiberoBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform,
IStagingManager stagingManager, String tablePrefix, String tbLoaderCommand, String tbLoaderOptions,
String dbUser, String dbPassword, String dbUrl, String ezConnectString, DatabaseWriterSettings settings) {
super(symmetricPlatform, targetPlatform, stagingManager, tablePrefix, tbLoaderCommand, tbLoaderOptions,
dbUser, dbPassword, dbUrl, ezConnectString, settings);
}

@Override
protected void init() {
if (StringUtils.isBlank(this.sqlLoaderCommand)) {
String oracleHome = System.getenv("TB_HOME");
if (StringUtils.isNotBlank(oracleHome)) {
this.sqlLoaderCommand = oracleHome + File.separator + "bin" + File.separator + "tbldr";
} else {
this.sqlLoaderCommand = "tbldr";
}
}
}

@Override
protected String getInfileControl() {
return "INFILE '" + dataResource.getFile().getName() + "'\n";
}

@Override
protected String getLineTerminatedByControl() {
return "LINES TERMINATED BY '" + LINE_TERMINATOR + "'";
}

protected String getLoaderName() {
return "TBLoader";
}

}
Expand Up @@ -239,7 +239,10 @@ private ParameterConstants() {
public final static String DBDIALECT_TIBERO_USE_TRANSACTION_VIEW = "tibero.use.transaction.view";
public final static String DBDIALECT_TIBERO_TEMPLATE_NUMBER_SPEC = "tibero.template.precision";
public final static String DBDIALECT_TIBERO_USE_HINTS = "tibero.use.hints";

public final static String DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_CMD = "tibero.bulk.load.tbloader.cmd";
public final static String DBDIALECT_TIBERO_BULK_LOAD_TBLOADER_OPTIONS = "tibero.bulk.load.tbloader.options";
public final static String DBDIALECT_TIBERO_BULK_LOAD_EZCONNECT = "tibero.bulk.load.ezconnect";

public final static String DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS = "oracle.transaction.view.clock.sync.threshold.ms";

public final static String DATA_ID_INCREMENT_BY = "data.id.increment.by";
Expand Down
22 changes: 22 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -1611,6 +1611,28 @@ oracle.bulk.load.sqlldr.options=silent=(header,discards) direct=false readsize=4
# DatabaseOverridable: false
oracle.bulk.load.ezconnect=

# Path to the tbloader executable for running Tibero tbLoader.
# If blank, it will check for TB_HOME environment variable and find tbLoader there.
# Otherwise, it will run "tbloader" and expect the operating system to find it.
#
# Tags: other
# DatabaseOverridable: false
tibero.bulk.load.tbloader.cmd=

# Options passed to Tibero tbLoader
#
# Tags: other
# Type: boolean
# DatabaseOverridable: false
tibero.bulk.load.tbloader.options=direct=N parallel=1 disable_idx=N readsize=2097152 bindsize=2097152 rows=2000 errors=0

# For bulk loading with SQL*Loader, specify how to connect to the database with an ezconnect name.
# If blank, the connection is determined using the db.url parameter.
#
# Tags: other
# DatabaseOverridable: false
tibero.bulk.load.ezconnect=

# Use to map the version string a zseries jdbc driver returns to the 'zseries' dialect
# Tags: other
db2.zseries.version=DSN08015
Expand Down

0 comments on commit 475beae

Please sign in to comment.