Skip to content

Commit

Permalink
0001202: If a wild card trigger matches more than one table, some tri…
Browse files Browse the repository at this point in the history
…ggers may not be created.
  • Loading branch information
abrougher committed May 6, 2013
1 parent 2dbb704 commit ed23682
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 46 deletions.
@@ -1,22 +1,22 @@
/*
* Licensed to JumpMind Inc under one or more contributor
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU Lesser General Public License (the
* "License"); you may not use this file except in compliance
* with the License.
*
* with the License.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/
package org.jumpmind.symmetric.service.impl;

Expand Down Expand Up @@ -86,11 +86,11 @@ public class TriggerRouterService extends AbstractService implements ITriggerRou
private TriggerFailureListener failureListener = new TriggerFailureListener();

private IStatisticManager statisticManager;

private IGroupletService groupletService;

private List<String> extraConfigTables = new ArrayList<String>();

private Date lastUpdateTime;

/**
Expand All @@ -109,13 +109,13 @@ public TriggerRouterService(ISymmetricEngine engine) {
setSqlMap(new TriggerRouterServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
}

public boolean refreshFromDatabase() {
Date date1 = sqlTemplate.queryForObject(getSql("selectMaxTriggerLastUpdateTime"), Date.class);
Date date2 = sqlTemplate.queryForObject(getSql("selectMaxRouterLastUpdateTime"), Date.class);
Date date3 = sqlTemplate.queryForObject(getSql("selectMaxTriggerRouterLastUpdateTime"), Date.class);
Date date = maxDate(date1, date2, date3);

if (date != null) {
if (lastUpdateTime == null || lastUpdateTime.before(date)) {
if (lastUpdateTime != null) {
Expand All @@ -133,7 +133,7 @@ public List<Trigger> getTriggers() {
return sqlTemplate.query("select "
+ getSql("selectTriggersColumnList", "selectTriggersSql"), new TriggerMapper());
}

public boolean isTriggerBeingUsed(String triggerId) {
return sqlTemplate.queryForInt(getSql("countTriggerRoutersByTriggerIdSql"), triggerId) > 0;
}
Expand Down Expand Up @@ -263,7 +263,7 @@ public TriggerHistory getTriggerHistory(int histId) {
}
return history;
}

protected List<TriggerHistory> getActiveTriggerHistories(Trigger trigger) {
List<TriggerHistory> active = getActiveTriggerHistories();
List<TriggerHistory> list = new ArrayList<TriggerHistory>();
Expand All @@ -278,15 +278,20 @@ protected List<TriggerHistory> getActiveTriggerHistories(Trigger trigger) {
public TriggerHistory getNewestTriggerHistoryForTrigger(String triggerId) {
return sqlTemplate.queryForObject(getSql("latestTriggerHistSql"),
new TriggerHistoryMapper(), triggerId);
}
}

public TriggerHistory getNewestTriggerHistoryForTriggerAndTable(String triggerId, String tableName) {
return sqlTemplate.queryForObject(getSql("latestTriggerHistSqlForIdAndName"),
new TriggerHistoryMapper(), triggerId, tableName);
}

/**
* Get a list of trigger histories that are currently active
*/
public List<TriggerHistory> getActiveTriggerHistories() {
return sqlTemplate.query(getSql("allTriggerHistSql", "activeTriggerHistSql"),
new TriggerHistoryMapper());
}
}

public List<TriggerHistory> getActiveTriggerHistories(String tableName) {
return sqlTemplate.query(getSql("allTriggerHistSql", "triggerHistBySourceTableWhereSql"),
Expand Down Expand Up @@ -363,7 +368,7 @@ protected Trigger buildTriggerForSymmetricTable(String tableName) {
} else {
trigger.setChannelId(Constants.CHANNEL_CONFIG);
}

if (!TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE_HOST)
.equals(tableName) &&
!TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE)
Expand Down Expand Up @@ -525,25 +530,25 @@ protected boolean doesTriggerRouterExistInList(List<TriggerRouter> triggerRouter
}
return false;
}

public TriggerRouter getTriggerRouterForCurrentNode(String triggerId, String routerId, boolean refreshCache) {
TriggerRouter triggerRouter = null;
List<TriggerRouter> triggerRouters = getTriggerRoutersForCurrentNode(refreshCache).get(triggerId);
if (triggerRouters != null) {
for (TriggerRouter testTriggerRouter : triggerRouters) {
if (ConfigurationChangedDataRouter.ROUTER_TYPE.equals(testTriggerRouter.getRouter().getRouterType()) ||
if (ConfigurationChangedDataRouter.ROUTER_TYPE.equals(testTriggerRouter.getRouter().getRouterType()) ||
testTriggerRouter.getRouter().getRouterId().equals(routerId)
|| routerId.equals(Constants.UNKNOWN_ROUTER_ID)) {
triggerRouter = testTriggerRouter;
break;
}
}
}

if (triggerRouter == null) {
log.warn("Could not find trigger router [{}:{}] in list {}", new Object[] {triggerId, routerId, triggerRouters == null ? 0 : triggerRouters.toString()});
}

return triggerRouter;
}

Expand Down Expand Up @@ -709,7 +714,7 @@ public TriggerRouter findTriggerRouterById(String triggerId, String routerId) {
return null;
}
}

public List<TriggerRouter> getTriggerRoutersFor(String tableName, String sourceNodeGroupId) {
List<TriggerRouter> results = new ArrayList<TriggerRouter>();
List<TriggerRouter> all = getTriggerRouters();
Expand Down Expand Up @@ -820,7 +825,7 @@ public void saveRouter(Router router) {
router.getRouterId() }, new int[] {
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.SMALLINT,
Types.SMALLINT, Types.VARCHAR, Types.TIMESTAMP,
Types.SMALLINT, Types.VARCHAR, Types.TIMESTAMP,
Types.VARCHAR })) {
router.setCreateTime(router.getLastUpdateTime());
sqlTemplate.update(
Expand All @@ -832,7 +837,7 @@ public void saveRouter(Router router) {
router.getRouterType(), router.getRouterExpression(),
router.isSyncOnUpdate() ? 1 : 0, router.isSyncOnInsert() ? 1 : 0,
router.isSyncOnDelete() ? 1 : 0, router.getCreateTime(),
router.getLastUpdateBy(), router.getLastUpdateTime(), router.getRouterId() },
router.getLastUpdateBy(), router.getLastUpdateTime(), router.getRouterId() },
new int[] {
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT,
Expand Down Expand Up @@ -862,13 +867,13 @@ public void saveTrigger(Trigger trigger) {
trigger.isSyncOnUpdate() ? 1 : 0, trigger.isSyncOnInsert() ? 1 : 0,
trigger.isSyncOnDelete() ? 1 : 0, trigger.isSyncOnIncomingBatch() ? 1 : 0,
trigger.isUseStreamLobs() ? 1 : 0, trigger.isUseCaptureLobs() ? 1 : 0,
trigger.isUseCaptureOldData() ? 1 : 0, trigger.isUseHandleKeyUpdates() ? 1 : 0,
trigger.getNameForUpdateTrigger(), trigger.getNameForInsertTrigger(),
trigger.getNameForDeleteTrigger(), trigger.getSyncOnUpdateCondition(),
trigger.getSyncOnInsertCondition(), trigger.getSyncOnDeleteCondition(),
trigger.getTxIdExpression(), trigger.getExcludedColumnNames(),
trigger.getSyncKeyNames(), trigger.getLastUpdateBy(),
trigger.getLastUpdateTime(), trigger.getExternalSelect(),
trigger.isUseCaptureOldData() ? 1 : 0, trigger.isUseHandleKeyUpdates() ? 1 : 0,
trigger.getNameForUpdateTrigger(), trigger.getNameForInsertTrigger(),
trigger.getNameForDeleteTrigger(), trigger.getSyncOnUpdateCondition(),
trigger.getSyncOnInsertCondition(), trigger.getSyncOnDeleteCondition(),
trigger.getTxIdExpression(), trigger.getExcludedColumnNames(),
trigger.getSyncKeyNames(), trigger.getLastUpdateBy(),
trigger.getLastUpdateTime(), trigger.getExternalSelect(),
trigger.getTriggerId() }, new int[] {
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT,
Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
Expand Down Expand Up @@ -923,7 +928,7 @@ public void syncTriggers(StringBuilder sqlBuffer, boolean force) {
+ ParameterConstants.AUTO_SYNC_TRIGGERS
+ " is set to false, but the sync triggers process will run so that needed changes can be written to a file so they can be applied manually. Once all of the triggers have been successfully applied this process should not show triggers being created";
}

log.info("Synchronizing triggers{}", additionalMessage);

// make sure all tables are freshly read in
Expand Down Expand Up @@ -1010,7 +1015,7 @@ protected void inactivateTriggers(List<Trigger> triggersThatShouldBeActive,
}
}
}

protected void dropTriggers(TriggerHistory history, StringBuilder sqlBuffer) {

if (StringUtils.isNotBlank(history.getNameForInsertTrigger())) {
Expand Down Expand Up @@ -1140,10 +1145,10 @@ public void syncTriggers(Table table, boolean force) {
protected void updateOrCreateDatabaseTriggers(List<Trigger> triggers, StringBuilder sqlBuffer,
boolean force) {
for (Trigger trigger : triggers) {
updateOrCreateDatabaseTrigger(trigger, triggers, sqlBuffer, force);
updateOrCreateDatabaseTrigger(trigger, triggers, sqlBuffer, force);
}
}

protected void updateOrCreateDatabaseTrigger(Trigger trigger, List<Trigger> triggers,
StringBuilder sqlBuffer, boolean force) {
Set<Table> tables = getTablesForTrigger(trigger, triggers);
Expand All @@ -1164,31 +1169,31 @@ protected void updateOrCreateDatabaseTrigger(Trigger trigger, List<Trigger> trig
}
}
}

public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force) {
StringBuilder sqlBuffer = new StringBuilder();
clearCache();
List<Trigger> triggersForCurrentNode = getTriggersForCurrentNode();
try {
if (listener != null) {
addTriggerCreationListeners(listener);
}
}

if (triggersForCurrentNode.contains(trigger)) {
if (!trigger.isSourceTableNameWildCarded()) {
List<TriggerHistory> histories = getActiveTriggerHistories(trigger);
for (TriggerHistory triggerHistory : histories) {
if (!triggerHistory.getFullyQualifiedSourceTableName().equals(trigger.getFullyQualifiedSourceTableName())) {
dropTriggers(triggerHistory, sqlBuffer);
dropTriggers(triggerHistory, sqlBuffer);
}
}
}
}
updateOrCreateDatabaseTrigger(trigger, triggersForCurrentNode, sqlBuffer,
force);
} else {
List<TriggerHistory> histories = getActiveTriggerHistories(trigger);
for (TriggerHistory triggerHistory : histories) {
dropTriggers(triggerHistory, sqlBuffer);
dropTriggers(triggerHistory, sqlBuffer);
}
}
} finally {
Expand Down Expand Up @@ -1221,8 +1226,8 @@ protected void updateOrCreateDatabaseTriggers(Trigger trigger, Table table,
table.makeAllColumnsPrimaryKeys();
}

TriggerHistory latestHistoryBeforeRebuild = getNewestTriggerHistoryForTrigger(
trigger.getTriggerId());
TriggerHistory latestHistoryBeforeRebuild = getNewestTriggerHistoryForTriggerAndTable(
trigger.getTriggerId(), trigger.getSourceTableName());

boolean forceRebuildOfTriggers = false;
if (latestHistoryBeforeRebuild == null) {
Expand Down Expand Up @@ -1341,7 +1346,7 @@ protected TriggerHistory rebuildTriggerIfNecessary(StringBuilder sqlBuffer,
oldTriggerName = newTriggerHist.getTriggerNameForDmlType(dmlType);
oldSourceSchema = trigger.getSourceSchemaName();
oldCatalogName = trigger.getSourceCatalogName();
if (StringUtils.isNotBlank(oldTriggerName)) {
if (StringUtils.isNotBlank(oldTriggerName)) {
triggerExists = symmetricDialect.doesTriggerExist(oldCatalogName, oldSourceSchema,
trigger.getSourceTableName(), oldTriggerName);
}
Expand Down
Expand Up @@ -11,7 +11,7 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform,
super(platform, replacementTokens);

// @formatter:off

putSql("countTriggerRoutersByRouterIdSql",
"select count(*) from $(trigger_router) where router_id=? ");

Expand Down Expand Up @@ -90,6 +90,13 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform,
+ " from $(trigger_hist) where trigger_hist_id = (select max(trigger_hist_id) "
+ " from $(trigger_hist) where trigger_id=?) ");

putSql("latestTriggerHistSqlForIdAndName",
""
+ "select "
+ " trigger_hist_id,trigger_id,source_table_name,table_hash,create_time,pk_column_names,column_names,last_trigger_build_reason,name_for_delete_trigger,name_for_insert_trigger,name_for_update_trigger,source_schema_name,source_catalog_name,trigger_row_hash,error_message "
+ " from $(trigger_hist) where trigger_hist_id = (select max(trigger_hist_id) "
+ " from $(trigger_hist) where trigger_id=? and source_table_name=?) ");

putSql("triggerHistSql",
""
+ "select "
Expand Down Expand Up @@ -170,8 +177,8 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform,
"from $(router) r where r.source_node_group_id=? and r.target_node_group_id=? order by r.router_id ");

putSql("selectTriggerByIdSql", "" + "where t.trigger_id = ? ");
putSql("selectMaxTriggerLastUpdateTime" ,"select max(last_update_time) from $(trigger) where last_update_time is not null" );

putSql("selectMaxTriggerLastUpdateTime" ,"select max(last_update_time) from $(trigger) where last_update_time is not null" );
putSql("selectMaxRouterLastUpdateTime" ,"select max(last_update_time) from $(router) where last_update_time is not null" );
putSql("selectMaxTriggerRouterLastUpdateTime" ,"select max(last_update_time) from $(trigger_router) where last_update_time is not null" );

Expand Down

0 comments on commit ed23682

Please sign in to comment.