Commit

0003095: Add basic DDL replication
erilong committed May 5, 2017
1 parent 5c87941 commit 22fdc81
Showing 9 changed files with 160 additions and 3 deletions.
@@ -363,6 +363,49 @@ public Boolean execute(Connection con) throws SQLException {
});
}

@Override
public boolean doesDdlTriggerExist(final String catalogName, final String schema, final String triggerName) {
return ((JdbcSqlTemplate) platform.getSqlTemplate())
.execute(new IConnectionCallback<Boolean>() {
public Boolean execute(Connection con) throws SQLException {
String previousCatalog = con.getCatalog();
PreparedStatement stmt = con
.prepareStatement("select count(*) from sys.triggers where name = ?");
try {
if (catalogName != null) {
con.setCatalog(catalogName);
}
stmt.setString(1, triggerName);
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
int count = rs.getInt(1);
return count > 0;
}
} finally {
if (catalogName != null) {
con.setCatalog(previousCatalog);
}
stmt.close();
}
return Boolean.FALSE;
}
});
}

@Override
public void removeDdlTrigger(StringBuilder sqlBuffer, String catalogName, String schemaName, String triggerName) {
String sql = "drop trigger " + triggerName + " on database";
logSql(sql, sqlBuffer);
if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
try {
log.info("Removing DDL trigger " + triggerName);
this.platform.getSqlTemplate().update(sql);
} catch (Exception e) {
log.warn("Tried to remove DDL trigger using: {} and failed because: {}", sql, e.getMessage());
}
}
}

public void disableSyncTriggers(ISqlTransaction transaction, String nodeId) {
if (supportsDisableTriggers()) {
if (nodeId == null) {
@@ -177,6 +177,27 @@ public MsSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
" end \n" +
"---- go");

sqlTemplates.put("ddlTriggerTemplate",
"create trigger $(triggerName) on database\n" +
"for create_table, drop_table, alter_table,\n" +
"create_view, drop_view, alter_view,\n" +
"create_function, drop_function, alter_function,\n" +
"create_procedure, drop_procedure, alter_procedure,\n" +
"create_trigger, drop_trigger, alter_trigger,\n" +
"create_index, drop_index, alter_index as\n" +
"declare @data xml\n" +
"declare @histId int\n" +
"set @data = eventdata()\n" +
"if (@data.value('(/EVENT_INSTANCE/ObjectName)[1]', 'nvarchar(128)') not like '$(prefixName)%') begin\n" +
" select @histId = max(trigger_hist_id) from " + defaultCatalog + "$(defaultSchema)$(prefixName)_trigger_hist where source_table_name = '$(prefixName)_node'\n" +
" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data\n" +
" (table_name, event_type, trigger_hist_id, row_data, channel_id, source_node_id, create_time)\n" +
" values ('$(prefixName)_node', '" + DataEventType.SQL.getCode() + "', @histId,\n" +
" '\"' + replace(replace(@data.value('(/EVENT_INSTANCE/TSQLCommand/CommandText)[1]', 'nvarchar(max)'),'\\','\\\\'),'\"','\\\"') + '\",ddl',\n" +
" 'config', dbo.$(prefixName)_node_disabled(), current_timestamp)\n" +
"end\n" +
"---- go");

sqlTemplates.put("initialLoadSqlTemplate" ,
"select $(columns) from $(schemaName)$(tableName) t where $(whereClause) " );

@@ -243,6 +243,7 @@ private ParameterConstants() {

public final static String TRIGGER_UPDATE_CAPTURE_CHANGED_DATA_ONLY = "trigger.update.capture.changed.data.only.enabled";
public final static String TRIGGER_CREATE_BEFORE_INITIAL_LOAD = "trigger.create.before.initial.load.enabled";
public final static String TRIGGER_CAPTURE_DDL_CHANGES = "trigger.capture.ddl.changes";

public final static String DB_METADATA_IGNORE_CASE = "db.metadata.ignore.case";
public final static String DB_NATIVE_EXTRACTOR = "db.native.extractor";
@@ -202,7 +202,11 @@ final public boolean doesTriggerExist(String catalogName, String schema, String
return false;
}
}


public boolean doesDdlTriggerExist(String catalogName, String schema, String triggerName) {
return false;
}

public abstract void dropRequiredDatabaseObjects();

public abstract void createRequiredDatabaseObjects();
@@ -314,6 +318,10 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc
}
}

public void removeDdlTrigger(StringBuilder sqlBuffer, String catalogName, String schemaName,
String triggerName) {
}

final protected void logSql(String sql, StringBuilder sqlBuffer) {
if (sqlBuffer != null && StringUtils.isNotBlank(sql)) {
sqlBuffer.append(sql);
@@ -403,6 +411,38 @@ protected String createPostTriggerDDL(DataEventType dml, Trigger trigger, Trigge
table, platform.getDefaultCatalog(), platform.getDefaultSchema());
}

public void createDdlTrigger(final String tablePrefix, StringBuilder sqlBuffer, String triggerName) {
if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
String triggerSql = triggerTemplate.createDdlTrigger(tablePrefix, platform.getDefaultCatalog(), platform.getDefaultSchema(),
triggerName);
log.info("Creating DDL trigger " + triggerName);

if (triggerSql != null) {
ISqlTransaction transaction = null;
try {
transaction = this.platform.getSqlTemplate().startSqlTransaction(
platform.getDatabaseInfo().isRequiresAutoCommitForDdl());

try {
log.debug("Running: {}", triggerSql);
logSql(triggerSql, sqlBuffer);
transaction.execute(triggerSql);
} catch (SqlException ex) {
log.info("Failed to create DDL trigger: {}", triggerSql);
throw ex;
}

transaction.commit();
} catch (SqlException ex) {
transaction.rollback();
throw ex;
} finally {
transaction.close();
}
}
}
}

public String getCreateSymmetricDDL() {
Database database = readSymmetricSchemaFromXml();
prefixConfigDatabase(database);
@@ -379,6 +379,19 @@ public String createPostTriggerDDL(DataEventType dml, Trigger trigger, TriggerHi
defaultCatalog, defaultSchema, ddl);
}

public String createDdlTrigger(String tablePrefix, String defaultCatalog, String defaultSchema, String triggerName) {
String ddl = sqlTemplates.get("ddlTriggerTemplate");
if (ddl == null) {
return null;
}

ddl = FormatUtils.replace("triggerName", triggerName, ddl);
ddl = FormatUtils.replace("prefixName", tablePrefix, ddl);
ddl = replaceDefaultSchemaAndCatalog(ddl);

return ddl;
}

protected String getDefaultTargetTableName(Trigger trigger, TriggerHistory history) {
String targetTableName = null;
if (history != null) {
@@ -51,6 +51,8 @@ public void createTrigger(StringBuilder sqlBuffer, DataEventType dml,
Trigger trigger, TriggerHistory hist, Channel channel,
String tablePrefix, Table table);

public void createDdlTrigger(String tablePrefix, StringBuilder sqlBuffer, String triggerName);

/*
* Get the name of this symmetric instance. This can be set in
* symmetric.properties using the engine.name property.
@@ -59,9 +61,13 @@ public void createTrigger(StringBuilder sqlBuffer, DataEventType dml,

public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String schemaName, String triggerName,
String tableName);

public void removeDdlTrigger(StringBuilder sqlBuffer, String catalogName, String schemaName, String triggerName);

public boolean doesTriggerExist(String catalogName, String schema, String tableName, String triggerName);

public boolean doesDdlTriggerExist(String catalogName, String schema, String triggerName);

public void verifyDatabaseIsCompatible();

public void initTablesAndDatabaseObjects();
@@ -35,6 +35,7 @@
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.job.IJobManager;
import org.jumpmind.symmetric.load.ConfigurationChangedDatabaseWriterFilter;
@@ -125,7 +126,14 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
if (me != null) {
NetworkedNode rootNetworkedNode = getRootNetworkNodeFromContext(routingContext);

if (tableMatches(dataMetaData, TableConstants.SYM_NODE)
&& dataMetaData.getData().getDataEventType().equals(DataEventType.SQL)
&& dataMetaData.getData().getParsedData(CsvData.ROW_DATA).length > 1
&& dataMetaData.getData().getParsedData(CsvData.ROW_DATA)[0].toUpperCase().contains("TABLE")) {
routingContext.put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE);
routeNodeTables(nodeIds, columnValues, rootNetworkedNode, me, routingContext,
dataMetaData, possibleTargetNodes, initialLoad);
} else if (tableMatches(dataMetaData, TableConstants.SYM_NODE)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_SECURITY)
|| tableMatches(dataMetaData, TableConstants.SYM_NODE_HOST)
|| tableMatches(dataMetaData, TableConstants.SYM_MONITOR_EVENT)) {
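
For orientation: the ddlTriggerTemplate above writes the captured statement into sym_data as row_data of the form "&lt;escaped T-SQL&gt;",ddl on the config channel, and the new router branch inspects that first field through CsvData.getParsedData(CsvData.ROW_DATA). A minimal, self-contained sketch of that payload layout (the class name and sample statement are hypothetical; the real parsing is done by CsvData, not by hand):

public class DdlRowDataExample {
    public static void main(String[] args) {
        // shape of row_data produced by the SQL Server DDL trigger:
        // quoted, escaped command text followed by the literal "ddl" marker
        String rowData = "\"CREATE TABLE demo_table (id int)\",ddl";
        int lastComma = rowData.lastIndexOf(',');
        String ddlText = rowData.substring(1, lastComma - 1); // strip the surrounding quotes
        String marker = rowData.substring(lastComma + 1);     // "ddl"
        // the router flags a resync only when the captured statement mentions TABLE
        boolean tableChange = ddlText.toUpperCase().contains("TABLE");
        System.out.println(marker + ": " + ddlText + " -> table change: " + tableChange);
    }
}
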
@@ -1210,6 +1210,10 @@ public void syncTriggers(StringBuilder sqlBuffer, boolean force) {
updateOrCreateDatabaseTriggers(triggersForCurrentNode, sqlBuffer, force,
true, activeTriggerHistories, true);
resetTriggerRouterCacheByNodeGroupId();

if (createTriggersForTables) {
updateOrCreateDdlTriggers(sqlBuffer);
}
} finally {
clusterService.unlock(ClusterConstants.SYNCTRIGGERS);
log.info("Done synchronizing triggers");
@@ -1510,6 +1514,18 @@ public void syncTriggers(Table table, boolean force) {
}
}

protected void updateOrCreateDdlTriggers(StringBuilder sqlBuffer) {
String triggerName = tablePrefix + "_on_ddl";
boolean isCapture = parameterService.is(ParameterConstants.TRIGGER_CAPTURE_DDL_CHANGES, false);
boolean exists = symmetricDialect.doesDdlTriggerExist(platform.getDefaultCatalog(), platform.getDefaultSchema(), triggerName);

if (isCapture && !exists) {
symmetricDialect.createDdlTrigger(tablePrefix, sqlBuffer, triggerName);
} else if (!isCapture && exists) {
symmetricDialect.removeDdlTrigger(sqlBuffer, platform.getDefaultCatalog(), platform.getDefaultSchema(), triggerName);
}
}

protected void updateOrCreateDatabaseTriggers(final List<Trigger> triggers, final StringBuilder sqlBuffer,
final boolean force, final boolean verifyInDatabase, final List<TriggerHistory> activeTriggerHistories,
final boolean useTableCache) {
@@ -1809,7 +1825,7 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer,

return hist;
}

protected static String replaceCharsToShortenName(String triggerName) {
return triggerName.replaceAll("[^a-zA-Z0-9_]|[a|e|i|o|u|A|E|I|O|U]", "");
}
Expand Up @@ -1458,6 +1458,15 @@ trigger.update.capture.changed.data.only.enabled=false
# Type: boolean
trigger.create.before.initial.load.enabled=true

# Whether to install a database-level DDL trigger that captures schema changes
# (tables, views, triggers, functions, and stored procedures) and syncs them
# to all nodes on the configured group links. Supported on MS SQL Server only.
#
# DatabaseOverridable: true
# Tags: other
# Type: boolean
trigger.capture.ddl.changes=false

# This is a setting that instructs the data capture and data load to
# treat JDBC TIME, DATE, and TIMESTAMP columns as if they were VARCHAR
# columns. This means that the columns will be captured and loaded in
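
As a usage sketch for the new parameter (the engine file name below is hypothetical; the setting can also be overridden in the database, per the DatabaseOverridable flag above):

# engines/corp-000.properties (hypothetical) -- capture DDL changes on a SQL Server node
trigger.capture.ddl.changes=true

On the next sync-triggers run, updateOrCreateDdlTriggers() installs a database-level trigger named <tablePrefix>_on_ddl; setting the parameter back to false removes that trigger on a later run.
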
