Skip to content

Commit

Permalink
Added support for trickling across schemas/catalogs (at least from my…
Browse files Browse the repository at this point in the history
…sql to oracle)
  • Loading branch information
chenson42 committed Oct 1, 2007
1 parent 5f2322a commit 191bfe0
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 189 deletions.
Expand Up @@ -6,6 +6,7 @@
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -28,6 +29,8 @@
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator;
Expand Down Expand Up @@ -146,7 +149,7 @@ public Table findTable(String schema, String tableName) throws Exception {
c = platform.borrowConnection();
DatabaseMetaDataWrapper metaData = new DatabaseMetaDataWrapper();
metaData.setMetaData(c.getMetaData());
metaData.setCatalog(null);
metaData.setCatalog(schema);
metaData.setSchemaPattern(schema);
metaData.setTableTypes(null);
ResultSet tableData = metaData.getTables(tableName);
Expand Down Expand Up @@ -297,12 +300,39 @@ protected String readPrimaryKeyName(DatabaseMetaDataWrapper metaData,
return (String) values.get("COLUMN_NAME");
}

public void initTrigger(DataEventType dml, Trigger trigger,
TriggerHistory audit, String tablePrefix, Table table) {
logger.info("Creating " + dml.toString() + " trigger for "
+ trigger.getSourceTableName());
jdbcTemplate.update(createTriggerDDL(dml, trigger, audit, tablePrefix,
table));
/**
* Create the configured trigger. The catalog will be changed to the source schema if the source schema
* is configured.
*/
public void initTrigger(final DataEventType dml, final Trigger trigger,
final TriggerHistory audit, final String tablePrefix,
final Table table) {
jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection con) throws SQLException,
DataAccessException {
String catalog = trigger.getSourceSchemaName();
logger.info("Creating " + dml.toString() + " trigger for "
+ (catalog != null ? (catalog + ".") : "")
+ trigger.getSourceTableName());
String previousCatalog = con.getCatalog();
try {
if (catalog != null) {
con.setCatalog(catalog);
}
Statement stmt = con.createStatement();
stmt.executeUpdate(createTriggerDDL(dml, trigger, audit,
tablePrefix, table));
stmt.close();
} finally {
if (catalog != null
&& !catalog.equalsIgnoreCase(previousCatalog)) {
con.setCatalog(previousCatalog);
}
}

return null;
}
});
}

public String createTriggerDDL(DataEventType dml, Trigger config,
Expand Down
171 changes: 89 additions & 82 deletions symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java
@@ -1,19 +1,18 @@

package org.jumpmind.symmetric.db;

import java.sql.Types;
import java.util.Map;

import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.StringUtils;
import org.apache.ddlutils.model.Column;
import org.apache.ddlutils.model.Table;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.DataEventType;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;

public class SqlTemplate
{
public class SqlTemplate {

static final String INSERT_TRIGGER_TEMPLATE = "insertTriggerTemplate";

Expand All @@ -39,13 +38,17 @@ public class SqlTemplate

String oldTriggerValue;

public String createInitalLoadSql(Node node, IDbDialect dialect, Trigger config, Table metaData)
{
public String createInitalLoadSql(Node node, IDbDialect dialect,
Trigger config, Table metaData) {
String sql = sqlTemplates.get(INITIAL_LOAD_SQL_TEMPLATE);
sql = replace("tableName", config.getSourceTableName(), sql);
sql = replace("schemaName", config.getSourceSchemaName() != null ? config.getSourceSchemaName() + "." : "", sql);
sql = replace("initialLoadCondition", config.getInitialLoadSelect() == null ? "1=1" : config
.getInitialLoadSelect(), sql);
sql = replace("schemaName",
config.getSourceSchemaName() != null ? config
.getSourceSchemaName()
+ "." : "", sql);
sql = replace("initialLoadCondition",
config.getInitialLoadSelect() == null ? "1=1" : config
.getInitialLoadSelect(), sql);

// Replace these parameters to give the initiaLoadContition a chance to reference domainNames and domainIds
sql = replace("groupId", node.getNodeGroupId(), sql);
Expand All @@ -57,34 +60,47 @@ public String createInitalLoadSql(Node node, IDbDialect dialect, Trigger config,
return sql;
}

public String createTriggerDDL(IDbDialect dialect, DataEventType dml, Trigger config, TriggerHistory audit,
String tablePrefix, Table metaData, String defaultSchema)
{
public String createTriggerDDL(IDbDialect dialect, DataEventType dml,
Trigger trigger, TriggerHistory history, String tablePrefix,
Table metaData, String defaultSchema) {

String ddl = sqlTemplates.get(dml.name().toLowerCase() + "TriggerTemplate");
if (ddl == null)
{
throw new NotImplementedException(dml.name() + " trigger is not implemented for "
+ dialect.getPlatform().getName());
String ddl = sqlTemplates.get(dml.name().toLowerCase()
+ "TriggerTemplate");
if (ddl == null) {
throw new NotImplementedException(dml.name()
+ " trigger is not implemented for "
+ dialect.getPlatform().getName());
}
ddl = replace("tableName", config.getSourceTableName().toUpperCase(), ddl);
ddl = replace("schemaName", config.getSourceSchemaName() != null ? config.getSourceSchemaName().toUpperCase() + "." : "",
ddl);
ddl = replace("defaultSchema", defaultSchema != null && defaultSchema.length() > 0 ? defaultSchema + "." : "",
ddl);
ddl = replace("triggerName", config.getTriggerName(dml).toUpperCase(), ddl);
ddl = replace("tableName", trigger.getSourceTableName().toUpperCase(),
ddl);
ddl = replace("targetTableName", getTargetTableName(trigger).toUpperCase(),
ddl);
ddl = replace("schemaName",
trigger.getSourceSchemaName() != null ? trigger
.getSourceSchemaName().toUpperCase()
+ "." : "", ddl);
ddl = replace("defaultSchema", defaultSchema != null
&& defaultSchema.length() > 0 ? defaultSchema + "." : "", ddl);
ddl = replace("triggerName", trigger.getTriggerName(dml).toUpperCase(),
ddl);
ddl = replace("prefixName", tablePrefix, ddl);
ddl = replace("targetGroupId", config.getTargetGroupId(), ddl);
ddl = replace("channelName", config.getChannelId(), ddl);
ddl = replace("triggerHistoryId", Integer.toString(audit.getTriggerHistoryId()), ddl);
ddl = replace("txIdExpression", config.getTxIdExpression() == null ? dialect
.getTransactionTriggerExpression() : config.getTxIdExpression(), ddl);
ddl = replace("nodeSelectWhere", config.getNodeSelect(), ddl);
ddl = replace("syncOnInsertCondition", config.getSyncOnInsertCondition(), ddl);
ddl = replace("syncOnUpdateCondition", config.getSyncOnUpdateCondition(), ddl);
ddl = replace("syncOnDeleteCondition", config.getSyncOnDeleteCondition(), ddl);

Column[] columns = config.orderColumnsForTable(metaData);
ddl = replace("targetGroupId", trigger.getTargetGroupId(), ddl);
ddl = replace("channelName", trigger.getChannelId(), ddl);
ddl = replace("triggerHistoryId", Integer.toString(history
.getTriggerHistoryId()), ddl);
ddl = replace("txIdExpression",
trigger.getTxIdExpression() == null ? dialect
.getTransactionTriggerExpression() : trigger
.getTxIdExpression(), ddl);
ddl = replace("nodeSelectWhere", trigger.getNodeSelect(), ddl);
ddl = replace("syncOnInsertCondition", trigger
.getSyncOnInsertCondition(), ddl);
ddl = replace("syncOnUpdateCondition", trigger
.getSyncOnUpdateCondition(), ddl);
ddl = replace("syncOnDeleteCondition", trigger
.getSyncOnDeleteCondition(), ddl);

Column[] columns = trigger.orderColumnsForTable(metaData);
String columnsText = buildColumnString(newTriggerValue, columns);
ddl = replace("columns", columnsText, ddl);

Expand All @@ -95,14 +111,19 @@ public String createTriggerDDL(IDbDialect dialect, DataEventType dml, Trigger co
return ddl;
}

private String buildColumnString(String tableAlias, Column[] columns)
{
private String getTargetTableName(Trigger trigger) {
if (StringUtils.isBlank(trigger.getTargetTableName())) {
return trigger.getSourceTableName();
} else {
return trigger.getTargetTableName();
}
}

private String buildColumnString(String tableAlias, Column[] columns) {
String columnsText = "";
for (Column column : columns)
{
for (Column column : columns) {
String templateToUse = null;
switch (column.getTypeCode())
{
switch (column.getTypeCode()) {
case Types.BIT:
case Types.TINYINT:
case Types.SMALLINT:
Expand Down Expand Up @@ -141,98 +162,84 @@ private String buildColumnString(String tableAlias, Column[] columns)
case Types.STRUCT:
case Types.REF:
case Types.DATALINK:
throw new NotImplementedException(column.getName() + " is of type " + column.getType());
throw new NotImplementedException(column.getName()
+ " is of type " + column.getType());
}

if (templateToUse != null)
{
if (templateToUse != null) {
templateToUse = templateToUse.trim();
}
else
{
} else {
throw new NotImplementedException();
}

columnsText = columnsText + "\n " + replace("columnName", column.getName(), templateToUse);
columnsText = columnsText + "\n "
+ replace("columnName", column.getName(), templateToUse);

}

String LAST_COMMAN_TOKEN = triggerConcatCharacter + "','" + triggerConcatCharacter;
String LAST_COMMAN_TOKEN = triggerConcatCharacter + "','"
+ triggerConcatCharacter;

if (columnsText.endsWith(LAST_COMMAN_TOKEN))
{
columnsText = columnsText.substring(0, columnsText.length() - LAST_COMMAN_TOKEN.length());
if (columnsText.endsWith(LAST_COMMAN_TOKEN)) {
columnsText = columnsText.substring(0, columnsText.length()
- LAST_COMMAN_TOKEN.length());
}

return replace("tableAlias", tableAlias, columnsText);

}

private String replace(String prop, String replaceWith, String sourceString)
{
private String replace(String prop, String replaceWith, String sourceString) {
String replaceString = "\\$\\(" + prop + "\\)";
if (sourceString.contains("$(" + prop + ")"))
{
return sourceString.replaceAll(replaceString, String.valueOf(replaceWith));
}
else
{
if (sourceString.contains("$(" + prop + ")")) {
return sourceString.replaceAll(replaceString, String
.valueOf(replaceWith));
} else {
return sourceString;
}
}

public void setStringColumnTemplate(String columnTemplate)
{
public void setStringColumnTemplate(String columnTemplate) {
this.stringColumnTemplate = columnTemplate;
}

public void setDatetimeColumnTemplate(String datetimeColumnTemplate)
{
public void setDatetimeColumnTemplate(String datetimeColumnTemplate) {
this.datetimeColumnTemplate = datetimeColumnTemplate;
}

public void setNumberColumnTemplate(String numberColumnTemplate)
{
public void setNumberColumnTemplate(String numberColumnTemplate) {
this.numberColumnTemplate = numberColumnTemplate;
}

public void setSqlTemplates(Map<String, String> sqlTemplates)
{
public void setSqlTemplates(Map<String, String> sqlTemplates) {
this.sqlTemplates = sqlTemplates;
}

public String getClobColumnTemplate()
{
public String getClobColumnTemplate() {
return clobColumnTemplate;
}

public void setClobColumnTemplate(String clobColumnTemplate)
{
public void setClobColumnTemplate(String clobColumnTemplate) {
this.clobColumnTemplate = clobColumnTemplate;
}

public void setTriggerConcatCharacter(String triggerConcatCharacter)
{
public void setTriggerConcatCharacter(String triggerConcatCharacter) {
this.triggerConcatCharacter = triggerConcatCharacter;
}

public String getNewTriggerValue()
{
public String getNewTriggerValue() {
return newTriggerValue;
}

public void setNewTriggerValue(String newTriggerValue)
{
public void setNewTriggerValue(String newTriggerValue) {
this.newTriggerValue = newTriggerValue;
}

public String getOldTriggerValue()
{
public String getOldTriggerValue() {
return oldTriggerValue;
}

public void setOldTriggerValue(String oldTriggerValue)
{
public void setOldTriggerValue(String oldTriggerValue) {
this.oldTriggerValue = oldTriggerValue;
}
}
Expand Up @@ -75,7 +75,7 @@ public void enableSyncTriggers() {
}

public String getTransactionTriggerExpression() {
return TRANSACTION_ID_FUNCTION_NAME + "()";
return getDefaultSchema() + "." + TRANSACTION_ID_FUNCTION_NAME + "()";
}

public boolean isCharSpacePadded()
Expand Down

0 comments on commit 191bfe0

Please sign in to comment.