Skip to content

Commit

Permalink
0002341: Add the ability to use parameter values in catalog and schem…
Browse files Browse the repository at this point in the history
…a for trigger, router, and transform configuration.
  • Loading branch information
chenson42 committed Jul 3, 2015
1 parent e4c99ce commit 0a04d4a
Show file tree
Hide file tree
Showing 17 changed files with 591 additions and 698 deletions.
Expand Up @@ -147,7 +147,7 @@ public static File createSnapshot(ISymmetricEngine engine) {
}
}

List<Trigger> triggers = triggerRouterService.getTriggers();
List<Trigger> triggers = triggerRouterService.getTriggers(true);
for (Trigger trigger : triggers) {
Table table = engine.getDatabasePlatform().getTableFromCache(trigger.getSourceCatalogName(),
trigger.getSourceSchemaName(), trigger.getSourceTableName(),
Expand Down
Expand Up @@ -608,7 +608,7 @@ public synchronized void uninstall() {
groupletService.deleteGrouplet(grouplet);
}

List<TriggerRouter> triggerRouters = triggerRouterService.getTriggerRouters(true);
List<TriggerRouter> triggerRouters = triggerRouterService.getTriggerRouters(false, true);
for (TriggerRouter triggerRouter : triggerRouters) {
triggerRouterService.deleteTriggerRouter(triggerRouter);
}
Expand All @@ -618,7 +618,7 @@ public synchronized void uninstall() {
fileSyncService.deleteFileTriggerRouter(fileTriggerRouter);
}

List<Router> routers = triggerRouterService.getRouters();
List<Router> routers = triggerRouterService.getRouters(true);
for (Router router : routers) {
triggerRouterService.deleteRouter(router);
}
Expand Down Expand Up @@ -652,7 +652,7 @@ public synchronized void uninstall() {
table = platform.readTableFromDatabase(null, null, TableConstants.getTableName(
parameterService.getTablePrefix(), TableConstants.SYM_ROUTER));
if (table != null) {
List<Router> objects = triggerRouterService.getRouters();
List<Router> objects = triggerRouterService.getRouters(true);
for (Router router : objects) {
triggerRouterService.deleteRouter(router);
}
Expand Down
Expand Up @@ -330,14 +330,14 @@ protected void queueSyncTriggers(SimpleRouterContext routingContext, DataMetaDat
String triggerId = columnValues.get("TRIGGER_ID");
if (tableMatches(dataMetaData, TableConstants.SYM_TRIGGER_ROUTER)) {
String routerId = columnValues.get("ROUTER_ID");
TriggerRouter tr = triggerRouterService.findTriggerRouterById(triggerId,
TriggerRouter tr = triggerRouterService.findTriggerRouterById(true, triggerId,
routerId);
if (tr != null) {
trigger = tr.getTrigger();
lastUpdateTime = tr.getLastUpdateTime();
}
} else {
trigger = triggerRouterService.getTriggerById(triggerId);
trigger = triggerRouterService.getTriggerById(true, triggerId);
if (trigger != null) {
lastUpdateTime = trigger.getLastUpdateTime();
}
Expand Down Expand Up @@ -493,7 +493,7 @@ public void contextCommitted(SimpleRouterContext routingContext) {
log.info("About to sync the "
+ trigger.getTriggerId()
+ " trigger because a change was detected by the config data router");
engine.getTriggerRouterService().syncTrigger(trigger, null, false);
engine.getTriggerRouterService().syncTrigger(trigger.getTriggerId(), null, false);
}
}
}
Expand Down
Expand Up @@ -48,7 +48,7 @@ public interface ITriggerRouterService {

public List<TriggerHistory> getActiveTriggerHistories(String tableName);

public List<TriggerRouter> getTriggerRouters(boolean refreshCache);
public List<TriggerRouter> getTriggerRouters(boolean substituteParameters, boolean refreshCache);

/**
* Return a list of triggers used when extraction configuration data during
Expand All @@ -59,21 +59,17 @@ public interface ITriggerRouterService {
public List<TriggerRouter> buildTriggerRoutersForSymmetricTables(String version, NodeGroupLink nodeGroupLink, String... tablesToExclude);

public String buildSymmetricTableRouterId(String triggerId, String sourceNodeGroupId, String targetNodeGroupId);

public Trigger getTriggerForCurrentNodeById(String triggerId);


public TriggerRouter getTriggerRouterForCurrentNode(String triggerId, String routerId, boolean refreshCache);

/**
* Returns a list of triggers that should be active for the current node.
* Returns a list of triggers that should be active for the current node.
* @param refreshCache Indicates that the cache should be refreshed
*/
public List<Trigger> getTriggersForCurrentNode(boolean refreshCache);

public Map<String, List<TriggerRouter>> getTriggerRoutersByChannel(String nodeGroupId);


/**
* Returns a map of trigger routers keyed by trigger id.
* Returns a map of trigger routers keyed by trigger id.
* @param refreshCache Indicates that the cache should be refreshed
*/
public Map<String, List<TriggerRouter>> getTriggerRoutersForCurrentNode(boolean refreshCache);
Expand All @@ -85,11 +81,11 @@ public interface ITriggerRouterService {
*/
public Router getActiveRouterByIdForCurrentNode(String routerId, boolean refreshCache);

public Router getRouterById(String routerId);
public Router getRouterById(boolean substituteParameters, String routerId);

public Router getRouterById(String routerId, boolean refreshCache);
public Router getRouterById(boolean substituteParameters, String routerId, boolean refreshCache);

public List<Router> getRouters();
public List<Router> getRouters(boolean substituteParameters);

/**
* Get a list of routers for a specific node group link.
Expand All @@ -102,12 +98,10 @@ public interface ITriggerRouterService {

public void saveRouter(Router router);

public List<TriggerRouter> getAllTriggerRoutersForCurrentNode(String sourceNodeGroupId);

/**
* Get a list of all the triggers that have been defined for the system.
*/
public List<Trigger> getTriggers();
public List<Trigger> getTriggers(boolean substituteParameters);

public void saveTrigger(Trigger trigger);

Expand All @@ -130,11 +124,11 @@ public List<Trigger> createTriggersOnChannelForTablesWithReturn(String channelId

public List<TriggerRouter> getAllTriggerRoutersForReloadForCurrentNode(String sourceNodeGroupId, String targetNodeGroupId);

public Set<TriggerRouter> getTriggerRouterForTableForCurrentNode(NodeGroupLink link, String catalogName, String schemaName, String tableName, boolean refreshCache);
public Set<TriggerRouter> getTriggerRouterForTableForCurrentNode(boolean substituteParameters, NodeGroupLink link, String catalogName, String schemaName, String tableName, boolean refreshCache);

public Set<TriggerRouter> getTriggerRouterForTableForCurrentNode(String catalog, String schema, String tableName, boolean refreshCache);
public Set<TriggerRouter> getTriggerRouterForTableForCurrentNode(boolean substituteParameters, String catalog, String schema, String tableName, boolean refreshCache);

public TriggerRouter findTriggerRouterById(String triggerId, String routerId);
public TriggerRouter findTriggerRouterById(boolean substituteParameters, String triggerId, String routerId);

public void inactivateTriggerHistory(TriggerHistory history);

Expand All @@ -147,9 +141,9 @@ public TriggerHistory getNewestTriggerHistoryForTrigger(String triggerId, String

public TriggerHistory findTriggerHistory(String catalogName, String schemaName, String tableName);

public Trigger getTriggerById(String triggerId);
public Trigger getTriggerById(boolean substituteParameters, String triggerId);

public Trigger getTriggerById(String triggerId, boolean refreshCache);
public Trigger getTriggerById(boolean substituteParameters, String triggerId, boolean refreshCache);

public void insert(TriggerHistory newAuditRecord);

Expand All @@ -163,9 +157,9 @@ public TriggerHistory getNewestTriggerHistoryForTrigger(String triggerId, String

public void saveTriggerRouter(TriggerRouter triggerRouter);

public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force);
public void syncTrigger(String triggerId, ITriggerCreationListener listener, boolean force);

public void syncTrigger(Trigger trigger, ITriggerCreationListener listener, boolean force, boolean verifyTrigger);
public void syncTrigger(String triggerId, ITriggerCreationListener listener, boolean force, boolean verifyTrigger);

public void syncTriggers(Table table, boolean genAlways);

Expand Down
Expand Up @@ -955,7 +955,7 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
table.setPrimaryKeys(triggerHistory.getParsedPkColumnNames());
}

Router router = triggerRouterService.getRouterById(routerId, false);
Router router = triggerRouterService.getRouterById(true, routerId, false);
if (router != null && setTargetTableName) {
if (router.isUseSourceCatalogSchema()) {
table.setCatalog(catalogName);
Expand Down Expand Up @@ -1175,7 +1175,7 @@ public ExtractRequest mapRow(Row row) {
request.setStatus(ExtractStatus.valueOf(row.getString("status").toUpperCase()));
request.setCreateTime(row.getDateTime("create_time"));
request.setLastUpdateTime(row.getDateTime("last_update_time"));
request.setTriggerRouter(triggerRouterService.findTriggerRouterById(
request.setTriggerRouter(triggerRouterService.findTriggerRouterById(true,
row.getString("trigger_id"), row.getString("router_id")));
return request;
}
Expand Down Expand Up @@ -1411,7 +1411,7 @@ public CsvData next() {
return next();
}
} else {
Trigger trigger = triggerRouterService.getTriggerById(
Trigger trigger = triggerRouterService.getTriggerById(true,
triggerHistory.getTriggerId(), false);
if (trigger != null) {
if (lastTriggerHistory == null || lastTriggerHistory
Expand Down
Expand Up @@ -111,8 +111,8 @@ public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtCli
INodeService nodeService = engine.getNodeService();
Node targetNode = nodeService.findNode(request.getTargetNodeId());
if (targetNode != null) {
TriggerRouter triggerRouter = triggerRouterService.getTriggerRouterForCurrentNode(
request.getTriggerId(), request.getRouterId(), false);
TriggerRouter triggerRouter = triggerRouterService.getTriggerRouterForCurrentNode(request.getTriggerId(),
request.getRouterId(), false);
if (triggerRouter != null) {
Trigger trigger = triggerRouter.getTrigger();
Router router = triggerRouter.getRouter();
Expand Down Expand Up @@ -678,8 +678,8 @@ private void insertFileSyncBatchForReload(Node targetNode, long loadId, String c
fileSyncSnapshotHistory.getTriggerId(), parameterService.getNodeGroupId(),
targetNode.getNodeGroupId());
TriggerRouter fileSyncSnapshotTriggerRouter = triggerRouterService
.getTriggerRouterForCurrentNode(fileSyncSnapshotHistory.getTriggerId(),
routerid, true);
.getTriggerRouterForCurrentNode(fileSyncSnapshotHistory.getTriggerId(), routerid,
true);

List<Channel> channels = engine.getConfigurationService().getFileSyncChannels();
for (Channel channel : channels) {
Expand Down Expand Up @@ -748,7 +748,7 @@ public void insertSqlEvent(Node targetNode, String sql, boolean isLoad, long loa
String createBy) {
TriggerHistory history = engine.getTriggerRouterService()
.findTriggerHistoryForGenericSync();
Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId(),
Trigger trigger = engine.getTriggerRouterService().getTriggerById(true, history.getTriggerId(),
false);
String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine
.getConfigurationService().getChannels(false));
Expand Down Expand Up @@ -776,7 +776,7 @@ public void insertSqlEvent(ISqlTransaction transaction, Node targetNode, String
protected void insertSqlEvent(ISqlTransaction transaction, TriggerHistory history,
String channelId, Node targetNode, String sql, boolean isLoad, long loadId,
String createBy) {
Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId(),
Trigger trigger = engine.getTriggerRouterService().getTriggerById(true, history.getTriggerId(),
false);
String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine
.getConfigurationService().getChannels(false));
Expand All @@ -796,7 +796,7 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId,
Node targetNode, String script, boolean isLoad, long loadId, String createBy) {
TriggerHistory history = engine.getTriggerRouterService()
.findTriggerHistoryForGenericSync();
Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId(),
Trigger trigger = engine.getTriggerRouterService().getTriggerById(true, history.getTriggerId(),
false);
String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine
.getConfigurationService().getChannels(false));
Expand Down Expand Up @@ -853,7 +853,7 @@ public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHisto
public void insertCreateEvent(ISqlTransaction transaction, Node targetNode,
TriggerHistory triggerHistory, String routerId, boolean isLoad, long loadId, String createBy) {

Trigger trigger = engine.getTriggerRouterService().getTriggerById(
Trigger trigger = engine.getTriggerRouterService().getTriggerById(true,
triggerHistory.getTriggerId(), false);
String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine
.getConfigurationService().getChannels(false));
Expand Down Expand Up @@ -1027,7 +1027,7 @@ public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction,
if (isLoad) {
TriggerHistory history = data.getTriggerHistory();
if (history != null && channelId == null) {
Trigger trigger = engine.getTriggerRouterService().getTriggerById(
Trigger trigger = engine.getTriggerRouterService().getTriggerById(true,
history.getTriggerId());
channelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService()
.getChannels(false));
Expand Down Expand Up @@ -1148,7 +1148,7 @@ public String sendSQL(String nodeId, String catalogName, String schemaName, Stri
return "Trigger for table " + tableName + " does not exist from node "
+ sourceNode.getNodeGroupId();
} else {
Trigger trigger = triggerRouterService.getTriggerById(triggerHistory.getTriggerId());
Trigger trigger = triggerRouterService.getTriggerById(true, triggerHistory.getTriggerId());
if (trigger != null) {
ISqlTransaction transaction = null;
try {
Expand Down Expand Up @@ -1256,7 +1256,7 @@ public void insertHeartbeatEvent(Node node, boolean isReload) {
for (NodeGroupLink nodeGroupLink : links) {
if (nodeGroupLink.getDataEventAction() == NodeGroupLinkAction.P) {
Set<TriggerRouter> triggerRouters = engine.getTriggerRouterService()
.getTriggerRouterForTableForCurrentNode(nodeGroupLink, null, null,
.getTriggerRouterForTableForCurrentNode(true, nodeGroupLink, null, null,
tableName, false);
if (triggerRouters != null && triggerRouters.size() > 0) {
Data data = createData(transaction, triggerRouters.iterator().next()
Expand Down Expand Up @@ -1323,7 +1323,7 @@ public Data createData(ISqlTransaction transaction, String catalogName, String s
String tableName, String whereClause) {
Data data = null;
Set<TriggerRouter> triggerRouters = engine.getTriggerRouterService()
.getTriggerRouterForTableForCurrentNode(catalogName, schemaName, tableName, false);
.getTriggerRouterForTableForCurrentNode(true, catalogName, schemaName, tableName, false);
if (triggerRouters != null && triggerRouters.size() > 0) {
data = createData(transaction, triggerRouters.iterator().next().getTrigger(),
whereClause);
Expand Down
Expand Up @@ -903,7 +903,7 @@ public FileTriggerRouter mapRow(Row rs) {
fileTriggerRouter.setTargetBaseDir((rs.getString("target_base_dir") == null) ? null
: rs.getString("target_base_dir").replace('\\', '/'));
fileTriggerRouter.setRouter(engine.getTriggerRouterService().getRouterById(
rs.getString("router_id")));
true, rs.getString("router_id")));
return fileTriggerRouter;
}
}
Expand Down
Expand Up @@ -459,7 +459,7 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
long ts = System.currentTimeMillis();
int dataCount = -1;
try {
List<TriggerRouter> triggerRouters = engine.getTriggerRouterService().getTriggerRouters(false);
List<TriggerRouter> triggerRouters = engine.getTriggerRouterService().getTriggerRouters(true, false);
boolean producesCommonBatches = producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(),
triggerRouters);
boolean onlyDefaultRoutersAssigned = onlyDefaultRoutersAssigned(nodeChannel.getChannel(),
Expand Down

0 comments on commit 0a04d4a

Please sign in to comment.