Skip to content

Commit

Permalink
0005815: DDL Capture Changes - Three enhancements to DDL trigger capture
Browse files Browse the repository at this point in the history
  • Loading branch information
joshahicks committed Apr 28, 2023
1 parent af50e07 commit 95c33f2
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 21 deletions.
Expand Up @@ -425,6 +425,8 @@ public MsSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
"create_function, drop_function, alter_function,\n" +
"create_procedure, drop_procedure, alter_procedure,\n" +
"create_trigger, drop_trigger, alter_trigger,\n" +
"create_type, drop_type, \n" +
"create_schema, drop_schema, \n" +
"create_index, drop_index, alter_index as\n" +
"declare @data xml\n" +
"declare @eventType nvarchar(128)\n" +
Expand All @@ -441,14 +443,14 @@ public MsSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
" set @tableName = @data.value('(/EVENT_INSTANCE/TargetObjectName)[1]', 'nvarchar(128)')\n" +
" select @histId = max(trigger_hist_id) from " + defaultCatalog + "$(defaultSchema)$(prefixName)_trigger_hist where source_table_name = @tableName and inactive_time is null\n" +
" if (@histId is not null) begin\n" +
" select @channelId = channel_id from sym_trigger where source_table_name = @tableName\n" +
" select @channelId = channel_id from " + defaultCatalog + "$(defaultSchema)$(prefixName)_trigger where source_table_name = @tableName\n" +
" if (@channelId is null)\n" +
" set @channelId = 'config'\n" +
" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data\n" +
" (table_name, event_type, trigger_hist_id, row_data, channel_id, source_node_id, create_time)\n" +
" values (@tableName, '" + DataEventType.SQL.getCode() + "', @histId,\n" +
" '\"delimiter " + delimiter + ";' + CHAR(13) + char(10) + replace(replace(@data.value('(/EVENT_INSTANCE/TSQLCommand/CommandText)[1]', 'nvarchar(max)'),'\\','\\\\'),'\"','\\\"') + '\",ddl',\n" +
" @channelId, dbo.$(prefixName)_node_disabled(), " + getCreateTimeExpression() + ")\n" +
" @channelId, " + defaultCatalog + "$(defaultSchema)$(prefixName)_node_disabled(), " + getCreateTimeExpression() + ")\n" +
" end\n" +
"end\n" + "---- go");

Expand All @@ -459,6 +461,8 @@ public MsSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
"create_function, drop_function, alter_function,\n" +
"create_procedure, drop_procedure, alter_procedure,\n" +
"create_trigger, drop_trigger, alter_trigger,\n" +
"create_type, drop_type, \n" +
"create_schema, drop_schema, \n" +
"create_index, drop_index, alter_index as\n" +
"declare @data xml\n" +
"declare @eventType nvarchar(128)\n" +
Expand All @@ -477,14 +481,14 @@ public MsSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
" if (@histId is null)\n" +
" set @tableName = '$(prefixName)_node';\n" +
" select @histId = max(trigger_hist_id) from " + defaultCatalog + "$(defaultSchema)$(prefixName)_trigger_hist where source_table_name = @tableName and inactive_time is null\n" +
" select @channelId = channel_id from sym_trigger where source_table_name = @tableName\n" +
" select @channelId = channel_id from " + defaultCatalog + "$(defaultSchema)$(prefixName)_trigger where source_table_name = @tableName\n" +
" if (@channelId is null)\n" +
" set @channelId = 'config'\n" +
" insert into " + defaultCatalog + "$(defaultSchema)$(prefixName)_data\n" +
" (table_name, event_type, trigger_hist_id, row_data, channel_id, source_node_id, create_time)\n" +
" values (@tableName, '" + DataEventType.SQL.getCode() + "', @histId,\n" +
" '\"delimiter " + delimiter + ";' + CHAR(13) + char(10) + replace(replace(@data.value('(/EVENT_INSTANCE/TSQLCommand/CommandText)[1]', 'nvarchar(max)'),'\\','\\\\'),'\"','\\\"') + '\",ddl',\n" +
" @channelId, dbo.$(prefixName)_node_disabled(), " + getCreateTimeExpression() + ")\n" +
" @channelId, " + defaultCatalog + "$(defaultSchema)$(prefixName)_node_disabled(), " + getCreateTimeExpression() + ")\n" +
"end\n" + "---- go");

sqlTemplates.put("initialLoadSqlTemplate" ,
Expand Down
Expand Up @@ -374,6 +374,7 @@ private ParameterConstants() {
public final static String MSSQL_BULK_LOAD_FIELD_TERMINATOR = "mssql.bulk.load.field.terminator";
public final static String MSSQL_BULK_LOAD_USE_BCP = "mssql.bulk.load.use.bcp";
public final static String MSSQL_BULK_LOAD_BCP_CMD = "mssql.bulk.load.bcp.cmd";
public final static String MSSQL_USE_SNAPSHOT_ISOLATION = "mssql.use.snapshot.isolation";
public final static String SYBASE_ROW_LEVEL_LOCKS_ONLY = "sybase.allow.only.row.level.locks.on.runtime.tables";
public final static String SYBASE_CHANGE_IDENTITY_GAP = "sybase.change.identity.gap.on.runtime.tables";
public final static String SQLITE_TRIGGER_FUNCTION_TO_USE = "sqlite.trigger.function.to.use";
Expand Down
Expand Up @@ -432,9 +432,9 @@ protected String createPostTriggerDDL(DataEventType dml, Trigger trigger, Trigge
platform.getDefaultSchema());
}

public void createDdlTrigger(final String tablePrefix, StringBuilder sqlBuffer, String triggerName) {
public void createDdlTrigger(final String tablePrefix, StringBuilder sqlBuffer, String triggerName, String runtimeCatalog, String runtimeSchema) {
if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
String triggerSql = triggerTemplate.createDdlTrigger(tablePrefix, platform.getDefaultCatalog(), platform.getDefaultSchema(),
String triggerSql = triggerTemplate.createDdlTrigger(tablePrefix, runtimeCatalog, runtimeSchema,
triggerName);
log.info("Creating DDL trigger " + triggerName);
if (triggerSql != null) {
Expand Down
Expand Up @@ -277,13 +277,17 @@ protected String getSourceTableSchema(TriggerHistory triggerHistory) {
}

protected String replaceDefaultSchemaAndCatalog(String sql) {
String defaultCatalog = symmetricDialect.getPlatform().getDefaultCatalog();
String defaultSchema = symmetricDialect.getPlatform().getDefaultSchema();
return replaceDefaultSchemaAndCatalog(sql, null, null);
}

protected String replaceDefaultSchemaAndCatalog(String sql, String catalog, String schema) {
String defaultCatalog = catalog != null ? catalog : symmetricDialect.getPlatform().getDefaultCatalog();
String defaultSchema = schema != null ? schema : symmetricDialect.getPlatform().getDefaultSchema();
sql = replaceDefaultSchema(sql, defaultSchema);
sql = replaceDefaultCatalog(sql, defaultCatalog);
return sql;
}

public String createCsvDataSql(Trigger trigger, TriggerHistory triggerHistory, Table originalTable,
Channel channel, String whereClause) {
Table table = originalTable.copyAndFilterColumns(triggerHistory.getParsedColumnNames(),
Expand Down Expand Up @@ -376,7 +380,7 @@ public String createDdlTrigger(String tablePrefix, String defaultCatalog, String
}
ddl = FormatUtils.replace("triggerName", triggerName, ddl);
ddl = FormatUtils.replace("prefixName", tablePrefix, ddl);
ddl = replaceDefaultSchemaAndCatalog(ddl);
ddl = replaceDefaultSchemaAndCatalog(ddl, defaultCatalog, defaultSchema);
return ddl;
}

Expand Down
Expand Up @@ -54,7 +54,7 @@ public void createTrigger(StringBuilder sqlBuffer, DataEventType dml,
Trigger trigger, TriggerHistory hist, Channel channel,
String tablePrefix, Table table);

public void createDdlTrigger(String tablePrefix, StringBuilder sqlBuffer, String triggerName);
public void createDdlTrigger(String tablePrefix, StringBuilder sqlBuffer, String triggerName, String runtimeCatalog, String runtimeSchema);

/*
* Get the name of this symmetric instance. This can be set in symmetric.properties using the engine.name property.
Expand Down
Expand Up @@ -1803,32 +1803,32 @@ protected void updateOrCreateDdlTriggers(StringBuilder sqlBuffer) {
String filteredDdlTriggerName = tablePrefix + "_on_filtered_ddl";
boolean isCapture = parameterService.is(ParameterConstants.TRIGGER_CAPTURE_DDL_CHANGES, false);
boolean isFiltered = parameterService.is(ParameterConstants.TRIGGER_CAPTURE_DDL_CHECK_TRIGGER_HIST, true);
boolean allDdlTriggerExists = symmetricDialect.doesDdlTriggerExist(platform.getDefaultCatalog(),
platform.getDefaultSchema(), allDdlTriggerName);
boolean filteredDdlTriggerExists = symmetricDialect.doesDdlTriggerExist(platform.getDefaultCatalog(),
platform.getDefaultSchema(), filteredDdlTriggerName);
boolean allDdlTriggerExists = getTargetDialect().doesDdlTriggerExist(getTargetPlatform().getDefaultCatalog(),
getTargetPlatform().getDefaultSchema(), allDdlTriggerName);
boolean filteredDdlTriggerExists = getTargetDialect().doesDdlTriggerExist(getTargetPlatform().getDefaultCatalog(),
getTargetPlatform().getDefaultSchema(), filteredDdlTriggerName);
if (isCapture) {
if (isFiltered) {
if (allDdlTriggerExists) {
symmetricDialect.removeDdlTrigger(sqlBuffer, platform.getDefaultCatalog(), platform.getDefaultSchema(), allDdlTriggerName);
getTargetDialect().removeDdlTrigger(sqlBuffer, getTargetPlatform().getDefaultCatalog(), getTargetPlatform().getDefaultSchema(), allDdlTriggerName);
}
if (!filteredDdlTriggerExists) {
symmetricDialect.createDdlTrigger(tablePrefix, sqlBuffer, filteredDdlTriggerName);
getTargetDialect().createDdlTrigger(tablePrefix, sqlBuffer, filteredDdlTriggerName, platform.getDefaultCatalog(), platform.getDefaultSchema());
}
} else {
if (!allDdlTriggerExists) {
symmetricDialect.createDdlTrigger(tablePrefix, sqlBuffer, allDdlTriggerName);
getTargetDialect().createDdlTrigger(tablePrefix, sqlBuffer, allDdlTriggerName, platform.getDefaultCatalog(), platform.getDefaultSchema());
}
if (filteredDdlTriggerExists) {
symmetricDialect.removeDdlTrigger(sqlBuffer, platform.getDefaultCatalog(), platform.getDefaultSchema(), filteredDdlTriggerName);
getTargetDialect().removeDdlTrigger(sqlBuffer, getTargetPlatform().getDefaultCatalog(), getTargetPlatform().getDefaultSchema(), filteredDdlTriggerName);
}
}
} else {
if (allDdlTriggerExists) {
symmetricDialect.removeDdlTrigger(sqlBuffer, platform.getDefaultCatalog(), platform.getDefaultSchema(), allDdlTriggerName);
getTargetDialect().removeDdlTrigger(sqlBuffer, platform.getDefaultCatalog(), platform.getDefaultSchema(), allDdlTriggerName);
}
if (filteredDdlTriggerExists) {
symmetricDialect.removeDdlTrigger(sqlBuffer, platform.getDefaultCatalog(), platform.getDefaultSchema(), filteredDdlTriggerName);
getTargetDialect().removeDdlTrigger(sqlBuffer, platform.getDefaultCatalog(), platform.getDefaultSchema(), filteredDdlTriggerName);
}
}
}
Expand Down
Expand Up @@ -2618,6 +2618,13 @@ mssql.bulk.load.use.bcp=false
# Tags: other
mssql.bulk.load.bcp.cmd=


# Specifies if snapshot isolation should be automatically turned on.
#
# DatabaseOverridable: true
# Tags: other
mssql.use.snapshot.isolation=false

# Automatically update data, data_event and outgoing_batch tables to allow only
# row level locking.
#
Expand Down

0 comments on commit 95c33f2

Please sign in to comment.