Skip to content

Commit

Permalink
fixed a connection leak issue on the mysql side. the trigger on sym_n…
Browse files Browse the repository at this point in the history
…ode was causing updates to sym_node to lock up at times. removed the trigger as it didn't work on oracle either and articially generate heartbeat events.
  • Loading branch information
chenson42 committed Oct 3, 2007
1 parent 5485431 commit ce5ac97
Show file tree
Hide file tree
Showing 19 changed files with 321 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ddlutils.Platform;
Expand Down Expand Up @@ -51,6 +50,8 @@ abstract public class AbstractDbDialect implements IDbDialect {

private Map<Integer, String> _defaultSizes;

protected String tablePrefix;

protected AbstractDbDialect() {
_defaultSizes = new HashMap<Integer, String>();
_defaultSizes.put(new Integer(1), "254");
Expand Down Expand Up @@ -83,11 +84,22 @@ public void initConfigDb(String tablePrefix) {
addPrefixAndCreateTableIfNecessary(getConfigDdlDatabase(), tablePrefix);
}

protected boolean isSkipTriggerCreation(String table) {
return table.toLowerCase().equals(tablePrefix + "_node");
}

public boolean doesTriggerExist(String schema, String tableName,
String triggerName) {
throw new NotImplementedException();
if (!isSkipTriggerCreation(tableName)) {
return doesTriggerExistOnPlatform(schema, tableName, triggerName);
} else {
return true;
}
}

abstract protected boolean doesTriggerExistOnPlatform(String schema,
String tableName, String triggerName);

public String getTransactionTriggerExpression() {
return "null";
}
Expand All @@ -100,7 +112,21 @@ public String createInitalLoadSqlFor(Node node, Trigger config) {
getMetaDataFor(config.getSourceSchemaName(), config
.getSourceTableName(), true)).trim();
}


public String createCsvDataSql(Trigger trig, String whereClause) {
return sqlTemplate.createCsvDataSql(
trig,
getMetaDataFor(trig.getSourceSchemaName(), trig
.getSourceTableName(), true), whereClause).trim();
}

public String createCsvPrimaryKeySql(Trigger trig, String whereClause) {
return sqlTemplate.createCsvPrimaryKeySql(
trig,
getMetaDataFor(trig.getSourceSchemaName(), trig
.getSourceTableName(), true), whereClause).trim();
}

/**
* This method uses the ddlutil's model reader which uses the jdbc metadata to lookup up
* table metadata.
Expand Down Expand Up @@ -307,32 +333,40 @@ protected String readPrimaryKeyName(DatabaseMetaDataWrapper metaData,
public void initTrigger(final DataEventType dml, final Trigger trigger,
final TriggerHistory audit, final String tablePrefix,
final Table table) {
jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection con) throws SQLException,
DataAccessException {
String catalog = trigger.getSourceSchemaName();
logger.info("Creating " + dml.toString() + " trigger for "
+ (catalog != null ? (catalog + ".") : "")
+ trigger.getSourceTableName());
String previousCatalog = con.getCatalog();
try {
if (catalog != null) {
con.setCatalog(catalog);
}
Statement stmt = con.createStatement();
stmt.executeUpdate(createTriggerDDL(dml, trigger, audit,
tablePrefix, table));
stmt.close();
} finally {
if (catalog != null
&& !catalog.equalsIgnoreCase(previousCatalog)) {
con.setCatalog(previousCatalog);
if (!isSkipTriggerCreation(trigger.getSourceTableName())) {
jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection con)
throws SQLException, DataAccessException {
String catalog = trigger.getSourceSchemaName();
logger.info("Creating " + dml.toString() + " trigger for "
+ (catalog != null ? (catalog + ".") : "")
+ trigger.getSourceTableName());
String previousCatalog = con.getCatalog();
try {
if (catalog != null) {
con.setCatalog(catalog);
}
Statement stmt = con.createStatement();
stmt.executeUpdate(createTriggerDDL(dml, trigger,
audit, tablePrefix, table));
stmt.close();
} finally {
if (catalog != null
&& !catalog.equalsIgnoreCase(previousCatalog)) {
con.setCatalog(previousCatalog);
}
}

return null;
}
});
} else {
logger
.warn("Not creating trigger for "
+ trigger.getSourceTableName()
+ " because of a current bug we have with a trigger not being able to select from the table it fired for.");
}

return null;
}
});
}

public String createTriggerDDL(DataEventType dml, Trigger config,
Expand All @@ -350,7 +384,8 @@ protected void addPrefixAndCreateTableIfNecessary(Database targetTables,

boolean createTables = false;
for (Table table : tables) {
table.setName(tablePrefix.toUpperCase() + table.getName().toUpperCase());
table.setName(tablePrefix.toUpperCase()
+ table.getName().toUpperCase());
fixForeignKeys(table, tablePrefix, false);

if (getMetaDataFor(getDefaultSchema(), table.getName()
Expand Down Expand Up @@ -436,4 +471,8 @@ public void setSqlTemplate(SqlTemplate sqlTemplate) {
public SQLErrorCodeSQLExceptionTranslator getSqlErrorTranslator() {
return sqlErrorTranslator;
}

public void setTablePrefix(String tablePrefix) {
this.tablePrefix = tablePrefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public void initTrigger(DataEventType dml, Trigger config,
public String getTransactionTriggerExpression();

public String createInitalLoadSqlFor(Node client, Trigger config);

public String createCsvDataSql(Trigger trig, String whereClause);

public String createCsvPrimaryKeySql(Trigger trig, String whereClause);

public boolean isCharSpacePadded();

Expand Down
42 changes: 36 additions & 6 deletions symmetric/src/main/java/org/jumpmind/symmetric/db/SqlTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,56 @@ public class SqlTemplate {
String oldTriggerValue;

public String createInitalLoadSql(Node node, IDbDialect dialect,
Trigger config, Table metaData) {
Trigger trig, Table metaData) {
String sql = sqlTemplates.get(INITIAL_LOAD_SQL_TEMPLATE);
sql = replace("tableName", config.getSourceTableName(), sql);
sql = replace("tableName", trig.getSourceTableName(), sql);
sql = replace("schemaName",
config.getSourceSchemaName() != null ? config
trig.getSourceSchemaName() != null ? trig
.getSourceSchemaName()
+ "." : "", sql);
sql = replace("initialLoadCondition",
config.getInitialLoadSelect() == null ? "1=1" : config
sql = replace("whereClause",
trig.getInitialLoadSelect() == null ? "1=1" : trig
.getInitialLoadSelect(), sql);

// Replace these parameters to give the initiaLoadContition a chance to reference domainNames and domainIds
sql = replace("groupId", node.getNodeGroupId(), sql);
sql = replace("externalId", node.getExternalId(), sql);

Column[] columns = config.orderColumnsForTable(metaData);
Column[] columns = trig.orderColumnsForTable(metaData);
String columnsText = buildColumnString("t", columns);
sql = replace("columns", columnsText, sql);
return sql;
}

public String createCsvDataSql(Trigger trig, Table metaData, String whereClause) {
String sql = sqlTemplates.get(INITIAL_LOAD_SQL_TEMPLATE);
sql = replace("tableName", trig.getSourceTableName(), sql);
sql = replace("schemaName",
trig.getSourceSchemaName() != null ? trig
.getSourceSchemaName()
+ "." : "", sql);
sql = replace("whereClause", whereClause, sql);

Column[] columns = trig.orderColumnsForTable(metaData);
String columnsText = buildColumnString("t", columns);
sql = replace("columns", columnsText, sql);
return sql;
}

public String createCsvPrimaryKeySql(Trigger trig, Table metaData, String whereClause) {
String sql = sqlTemplates.get(INITIAL_LOAD_SQL_TEMPLATE);
sql = replace("tableName", trig.getSourceTableName(), sql);
sql = replace("schemaName",
trig.getSourceSchemaName() != null ? trig
.getSourceSchemaName()
+ "." : "", sql);
sql = replace("whereClause", whereClause, sql);

Column[] columns = metaData.getPrimaryKeyColumns();
String columnsText = buildColumnString("t", columns);
sql = replace("columns", columnsText, sql);
return sql;
}

public String createTriggerDDL(IDbDialect dialect, DataEventType dml,
Trigger trigger, TriggerHistory history, String tablePrefix,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
public class MySqlDbDialect extends AbstractDbDialect implements IDbDialect {

static final Log logger = LogFactory.getLog(MySqlDbDialect.class);

static final String TRANSACTION_ID_FUNCTION_NAME = "fn_transaction_id";

static final String SYNC_TRIGGERS_DISABLED_USER_VARIABLE = "@sync_triggers_disabled";

protected void initForSpecificDialect() {
try {
if (!isFunctionUpToDate(TRANSACTION_ID_FUNCTION_NAME)) {
logger.info("Creating function " + TRANSACTION_ID_FUNCTION_NAME);
logger
.info("Creating function "
+ TRANSACTION_ID_FUNCTION_NAME);
new SqlScript(getTransactionIdSqlUrl(), getPlatform()
.getDataSource(), '/').execute();
}
Expand All @@ -36,68 +38,73 @@ private URL getTransactionIdSqlUrl() {
public boolean isFunctionUpToDate(String name) throws Exception {
long lastModified = getTransactionIdSqlUrl().openConnection()
.getLastModified();
String checkSchema = (getDefaultSchema() != null && getDefaultSchema().length() > 0) ? " and routine_schema='"+getDefaultSchema()+"'" : "";
String checkSchema = (getDefaultSchema() != null && getDefaultSchema()
.length() > 0) ? " and routine_schema='" + getDefaultSchema()
+ "'" : "";
return jdbcTemplate
.queryForInt(
"select count(*) from information_schema.routines where created >= ? and routine_name=?" + checkSchema,
new Object[] { new Date(lastModified), name }) > 0;
"select count(*) from information_schema.routines where created >= ? and routine_name=?"
+ checkSchema, new Object[] {
new Date(lastModified), name }) > 0;
}

@Override
public boolean doesTriggerExist(String schema, String tableName, String triggerName) {
schema = schema == null ? (getDefaultSchema() == null ? null : getDefaultSchema()) : schema;
String checkSchema = (schema != null && schema.length() > 0) ? " and trigger_schema='"+schema+"'" : "";
protected boolean doesTriggerExistOnPlatform(String schema,
String tableName, String triggerName) {
schema = schema == null ? (getDefaultSchema() == null ? null
: getDefaultSchema()) : schema;
String checkSchema = (schema != null && schema.length() > 0) ? " and trigger_schema='"
+ schema + "'"
: "";
return jdbcTemplate
.queryForInt(
"select count(*) from information_schema.triggers where trigger_name like ? and event_object_table like ?" + checkSchema,
new Object[] { triggerName, tableName }) > 0;
"select count(*) from information_schema.triggers where trigger_name like ? and event_object_table like ?"
+ checkSchema, new Object[] { triggerName,
tableName }) > 0;
}


// TODO this belongs in SqlTemplate
public void removeTrigger(String schemaName, String triggerName) {
schemaName = schemaName == null ? "" : (schemaName + ".");
try {
jdbcTemplate
.update("drop trigger " + schemaName + triggerName);
}
catch (Exception e) {
logger.warn("Trigger does not exist: " + schemaName + triggerName, e);
jdbcTemplate.update("drop trigger " + schemaName + triggerName);
} catch (Exception e) {
logger.warn("Trigger does not exist");
}
}

public void disableSyncTriggers() {
jdbcTemplate.update("set " + SYNC_TRIGGERS_DISABLED_USER_VARIABLE + "=1");
jdbcTemplate.update("set " + SYNC_TRIGGERS_DISABLED_USER_VARIABLE
+ "=1");
}

public void enableSyncTriggers() {
jdbcTemplate.update("set " + SYNC_TRIGGERS_DISABLED_USER_VARIABLE + "=null");
jdbcTemplate.update("set " + SYNC_TRIGGERS_DISABLED_USER_VARIABLE
+ "=null");
}

public String getTransactionTriggerExpression() {
return getDefaultSchema() + "." + TRANSACTION_ID_FUNCTION_NAME + "()";
}

public boolean isCharSpacePadded()
{

public boolean isCharSpacePadded() {
return false;
}

public boolean isCharSpaceTrimmed()
{

public boolean isCharSpaceTrimmed() {
return true;
}

public boolean isEmptyStringNulled()
{

public boolean isEmptyStringNulled() {
return false;
}

public void purge() {
}

public String getDefaultSchema() {
return (String)jdbcTemplate.queryForObject("select database()", String.class);
return (String) jdbcTemplate.queryForObject("select database()",
String.class);
}

}
Loading

0 comments on commit ce5ac97

Please sign in to comment.