Skip to content

Commit

Permalink
0003258: DB2 support for transaction id
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Oct 3, 2017
1 parent 2503c6d commit 7687ff7
Showing 1 changed file with 56 additions and 55 deletions.
Expand Up @@ -34,13 +34,13 @@
*/
public class Db2SymmetricDialect extends AbstractSymmetricDialect implements ISymmetricDialect {

// DB2 Variables
public static final String VAR_SOURCE_NODE_ID = "_source_node_id";
public static final String VAR_TRIGGER_DISABLED = "_trigger_disabled";
public static final String FUNCTION_TRANSACTION_ID = "_transactionid";
static final String SQL_DROP_FUNCTION = "DROP FUNCTION $(functionName)";
// DB2 Variables
public static final String VAR_SOURCE_NODE_ID = "_source_node_id";
public static final String VAR_TRIGGER_DISABLED = "_trigger_disabled";

public static final String FUNCTION_TRANSACTION_ID = "_transactionid";
static final String SQL_DROP_FUNCTION = "DROP FUNCTION $(functionName)";

public Db2SymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
this.triggerTemplate = new Db2TriggerTemplate(this);
Expand All @@ -50,79 +50,82 @@ public boolean createOrAlterTablesIfNecessary(String... tables) {
boolean tablesCreated = super.createOrAlterTablesIfNecessary(tables);
if (tablesCreated) {
log.info("Resetting auto increment columns for {}", parameterService.getTablePrefix() + "_data");
long dataId = platform.getSqlTemplate().queryForLong("select max(data_id) from " + parameterService.getTablePrefix()
+ "_data") + 1;
platform.getSqlTemplate().update("alter table " + parameterService.getTablePrefix()
+ "_data alter column data_id restart with " + dataId);
long dataId = platform.getSqlTemplate().queryForLong("select max(data_id) from " + parameterService.getTablePrefix() + "_data")
+ 1;
platform.getSqlTemplate()
.update("alter table " + parameterService.getTablePrefix() + "_data alter column data_id restart with " + dataId);
}
return tablesCreated;
}

@Override
protected boolean doesTriggerExistOnPlatform(String catalog, String schema, String tableName,
String triggerName) {
schema = schema == null ? (platform.getDefaultSchema() == null ? null : platform
.getDefaultSchema()) : schema;
protected boolean doesTriggerExistOnPlatform(String catalog, String schema, String tableName, String triggerName) {
schema = schema == null ? (platform.getDefaultSchema() == null ? null : platform.getDefaultSchema()) : schema;
return platform.getSqlTemplate().queryForInt(
"SELECT COUNT(*) FROM " + getSystemSchemaName() + ".SYSTRIGGERS WHERE NAME = ? AND SCHEMA = ?",
new Object[] { triggerName.toUpperCase(), schema.toUpperCase() }) > 0;
}

@Override
public String massageDataExtractionSql(String sql, Channel channel) {
/* Remove tranaction_id from the sql because DB2 doesn't support transactions. In fact,
* DB2 iSeries does return results because the query asks for every column in the table PLUS
* the router_id. We max out the size of the table on iSeries so when you try to return the
/*
* Remove tranaction_id from the sql because DB2 doesn't support
* transactions. In fact, DB2 iSeries does return results because the
* query asks for every column in the table PLUS the router_id. We max
* out the size of the table on iSeries so when you try to return the
* entire table + additional columns we go past the max size for a row
*/
sql = sql.replace("d.transaction_id, ", "");
return super.massageDataExtractionSql(sql, channel);
}

protected String getSystemSchemaName() {
return "SYSIBM";
return "SYSIBM";
}

@Override
public void createRequiredDatabaseObjects() {
public void createRequiredDatabaseObjects() {
String sql = "select " + getSourceNodeExpression() + " from " + parameterService.getTablePrefix() + "_node_identity";
try {
platform.getSqlTemplate().query(sql);
try {
platform.getSqlTemplate().query(sql);
} catch (Exception e) {
log.debug("Failed checking for variable (usually means it doesn't exist yet) '" + sql + "'", e);
platform.getSqlTemplate().update("create variable " + getSourceNodeExpression() + " varchar(50)");
}
catch (Exception e) {
log.debug("Failed checking for variable (usually means it doesn't exist yet) '" + sql + "'", e);
platform.getSqlTemplate().update("create variable " + getSourceNodeExpression() + " varchar(50)");
}
sql = "select " + parameterService.getTablePrefix() + VAR_TRIGGER_DISABLED + " from " + parameterService.getTablePrefix() + "_node_identity";
try {
platform.getSqlTemplate().query(sql);
sql = "select " + parameterService.getTablePrefix() + VAR_TRIGGER_DISABLED + " from " + parameterService.getTablePrefix()
+ "_node_identity";
try {
platform.getSqlTemplate().query(sql);
} catch (Exception e) {
log.debug("Failed checking for variable (usually means it doesn't exist yet) '" + sql + "'", e);
platform.getSqlTemplate().update("create variable " + parameterService.getTablePrefix() + VAR_TRIGGER_DISABLED + " varchar(50)");
}
catch (Exception e) {
log.debug("Failed checking for variable (usually means it doesn't exist yet) '" + sql + "'", e);
platform.getSqlTemplate().update("create variable " + parameterService.getTablePrefix() + VAR_TRIGGER_DISABLED + " varchar(50)");
}

String transactionIdFunction = this.parameterService.getTablePrefix() + FUNCTION_TRANSACTION_ID;

sql = "CREATE OR REPLACE FUNCTION $(functionName)() "
+ " RETURNS VARCHAR(100) "
+ " LANGUAGE SQL "
+ " READS SQL DATA "
+ " RETURN "
+ " select c.application_id || '_' || u.uow_id "
+ " from sysibmadm.mon_connection_summary c ,sysibmadm.mon_current_uow u "
+ " where u.application_handle = c.application_handle and c.application_id = application_id() ";


String transactionIdFunction = this.parameterService.getTablePrefix() + FUNCTION_TRANSACTION_ID;

sql = "CREATE OR REPLACE FUNCTION $(functionName)() "
+ " RETURNS VARCHAR(100) "
+ " LANGUAGE SQL "
+ " READS SQL DATA "
+ " RETURN "
+ " select c.application_id || '_' || u.uow_id "
+ " from sysibmadm.mon_connection_summary c ,sysibmadm.mon_current_uow u "
+ " where u.application_handle = c.application_handle and c.application_id = application_id() ";

try {
install(sql, transactionIdFunction);
}
catch (Exception e) {
log.warn("Unable to install function " + this.parameterService.getTablePrefix() + FUNCTION_TRANSACTION_ID);
}
}

@Override
public void dropRequiredDatabaseObjects() {
String transactionIdFunction = this.parameterService.getTablePrefix() + FUNCTION_TRANSACTION_ID;
try {
uninstall(SQL_DROP_FUNCTION, transactionIdFunction);
}
catch (Exception e) {
} catch (Exception e) {
log.warn("Unable to uninstall function " + this.parameterService.getTablePrefix() + FUNCTION_TRANSACTION_ID);
}
}
Expand All @@ -148,8 +151,8 @@ public void enableSyncTriggers(ISqlTransaction transaction) {
}

public void disableSyncTriggers(ISqlTransaction transaction, String nodeId) {
transaction.prepareAndExecute("set " + parameterService.getTablePrefix() + VAR_TRIGGER_DISABLED + " = 1");
if (nodeId != null) {
transaction.prepareAndExecute("set " + parameterService.getTablePrefix() + VAR_TRIGGER_DISABLED + " = 1");
if (nodeId != null) {
transaction.prepareAndExecute("set " + getSourceNodeExpression() + " = '" + nodeId + "'");
}
}
Expand All @@ -159,8 +162,7 @@ public String getSyncTriggersExpression() {
}

@Override
public String getTransactionTriggerExpression(String defaultCatalog, String defaultSchema,
Trigger trigger) {
public String getTransactionTriggerExpression(String defaultCatalog, String defaultSchema, Trigger trigger) {
return "sym_transactionid()";
}

Expand All @@ -169,7 +171,6 @@ public boolean supportsTransactionId() {
return true;
}


public void cleanDatabase() {
}

Expand Down

0 comments on commit 7687ff7

Please sign in to comment.