Skip to content

Commit

Permalink
0004565: New Group Link should send node info to nodes in source node
Browse files Browse the repository at this point in the history
group ID and to nodes in target node group ID
  • Loading branch information
Philip Marzullo committed Sep 22, 2020
1 parent e6bf850 commit c1a6e16
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class ChannelRouterContext extends SimpleRouterContext {
private long uncommittedDataEventCount = 0;
private long committedDataEventCount = 0;
private IBatchAlgorithm batchAlgorithm;
private Map<Long, DataMetaData> configDataIdsProcessed = new HashMap<Long, DataMetaData>();

public ChannelRouterContext(String nodeId, NodeChannel channel, ISqlTransaction transaction, IBatchAlgorithm batchAlgorithm)
throws SQLException {
Expand Down Expand Up @@ -134,6 +135,14 @@ public void removeLastData() {
}
}
}

public void addConfigDataMetaData(DataMetaData dataMetaData) {
configDataIdsProcessed.put(dataMetaData.getData().getDataId(), dataMetaData);
}

public DataMetaData getConfigDataMetaData(Long dataId) {
return configDataIdsProcessed.get(dataId);
}

public long getCommittedDataEventCount() {
return this.committedDataEventCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
Expand All @@ -30,15 +31,18 @@

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.job.IJobManager;
import org.jumpmind.symmetric.load.ConfigurationChangedDatabaseWriterFilter;
import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.NetworkedNode;
import org.jumpmind.symmetric.model.Node;
Expand All @@ -47,6 +51,7 @@
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.ITriggerRouterService;

public class ConfigurationChangedDataRouter extends AbstractDataRouter implements IDataRouter, IBuiltInExtensionPoint {
Expand Down Expand Up @@ -229,6 +234,14 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData

if (tableMatches(dataMetaData, TableConstants.SYM_NODE_GROUP_LINK)) {
routingContext.put(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED, Boolean.TRUE);
if(dataMetaData.getData().getDataEventType().equals(DataEventType.INSERT)) {
if(! initialLoad) {
if (! isConfigDataMetaDataAlreadyHandled(dataMetaData, routingContext)) {
buildReloadEvents(dataMetaData, columnValues);
addConfigDataMetaData(dataMetaData, routingContext);
}
}
}
}

if (tableMatches(dataMetaData, TableConstants.SYM_JOB)
Expand All @@ -241,6 +254,114 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
return nodeIds;
}

private boolean isConfigDataMetaDataAlreadyHandled(DataMetaData dataMetaData, SimpleRouterContext routingContext) {
boolean ret = false;
if(routingContext instanceof ChannelRouterContext) {
ChannelRouterContext channelRoutingContext = (ChannelRouterContext) routingContext;
if (channelRoutingContext.getConfigDataMetaData(dataMetaData.getData().getDataId()) != null) {
ret = true;
}
}
return ret;
}

private void addConfigDataMetaData(DataMetaData dataMetaData, SimpleRouterContext routingContext) {
if(routingContext instanceof ChannelRouterContext) {
ChannelRouterContext channelRoutingContext = (ChannelRouterContext) routingContext;
channelRoutingContext.addConfigDataMetaData(dataMetaData);
}
}

private void buildReloadEvents(DataMetaData dataMetaData, Map<String, String> columnValues) {
String symTablePrefix = engine.getTablePrefix();
String tableName = dataMetaData.getTable().getName();
if (TableConstants.getTableName(symTablePrefix, TableConstants.SYM_NODE_GROUP_LINK).equalsIgnoreCase(tableName))
{
if (engine.getParameterService().isRegistrationServer()) {
if(dataMetaData.getData().getDataEventType().equals(DataEventType.INSERT)) {
Node me = engine.getNodeService().findIdentity();
String targetNodeGroupId = columnValues.get("TARGET_NODE_GROUP_ID");
String sourceNodeGroupId = columnValues.get("SOURCE_NODE_GROUP_ID");
log.info("Inserting reload events for sym_node and sym_node_security for source_node_group_id=" + sourceNodeGroupId + " and target_node_group_id=" + targetNodeGroupId);
Collection<Node> targetNodes = engine.getNodeService().findEnabledNodesFromNodeGroup(targetNodeGroupId);
Collection<Node> sourceNodes = engine.getNodeService().findEnabledNodesFromNodeGroup(sourceNodeGroupId);
NodeGroupLink nodeGroupLink = new NodeGroupLink(sourceNodeGroupId, targetNodeGroupId);
Date createTime = new Date();
List<TriggerRouter> triggerRouterList =
engine.getTriggerRouterService().buildTriggerRoutersForSymmetricTables(
Version.version(), nodeGroupLink);
// send sym_node
TriggerRouter triggerRouter = findTriggerRouter(triggerRouterList, TableConstants.SYM_NODE, symTablePrefix);
if(triggerRouter != null) {
// send nodes in sourcenodegroupid to target nodes
String initialLoadSelect =
String.format(engine.getDataService().findNodeIdsByNodeGroupId(), "'" + sourceNodeGroupId + "'");
insertReloadEvents(triggerRouter, initialLoadSelect, sourceNodeGroupId, targetNodeGroupId,
createTime, me, targetNodes);
// send nodes in targetnodegroupid to source nodes
initialLoadSelect =
String.format(engine.getDataService().findNodeIdsByNodeGroupId(), "'" + targetNodeGroupId + "'");
insertReloadEvents(triggerRouter, initialLoadSelect, sourceNodeGroupId, targetNodeGroupId,
createTime, me, sourceNodes);
}
// send sym_node_security
triggerRouter = findTriggerRouter(triggerRouterList, TableConstants.SYM_NODE_SECURITY, symTablePrefix);
if(triggerRouter != null) {
// send source nodes in sourcenodegroupid to target nodes
String initialLoadSelect =
String.format(engine.getDataService().findNodeIdsByNodeGroupId(), "'" + sourceNodeGroupId + "'");
insertReloadEvents(triggerRouter, initialLoadSelect, sourceNodeGroupId, targetNodeGroupId,
createTime, me, targetNodes);
// send target nodes in targetnodegroupid to source nodes
initialLoadSelect =
String.format(engine.getDataService().findNodeIdsByNodeGroupId(), "'" + targetNodeGroupId + "'");
insertReloadEvents(triggerRouter, initialLoadSelect, sourceNodeGroupId, targetNodeGroupId,
createTime, me, sourceNodes);
}
}
}
}
}

private void insertReloadEvents(
TriggerRouter triggerRouter, String initialLoadSelect, String sourceNodeGroupId,
String targetNodeGroupId, Date createTime, Node me, Collection<Node> targetNodes)
{
IDataService dataService = engine.getDataService();
ITriggerRouterService triggerRouterService = engine.getTriggerRouterService();
List<TriggerHistory> triggerHistories =
triggerRouterService.getActiveTriggerHistories(triggerRouter.getTrigger());
TriggerHistory triggerHistory = triggerHistories.get(0);
ISqlTransaction transaction = null;
try {
transaction = engine.getDatabasePlatform().getSqlTemplate().startSqlTransaction();
for(Node targetNode : targetNodes) {
if(me.getNodeId().equalsIgnoreCase(targetNode.getNodeId())) {
continue;
}
dataService.insertReloadEvent(transaction, targetNode,
triggerRouter, triggerHistory,
initialLoadSelect, false, -1l, "configRouter",
Status.NE, 0l);
}
transaction.commit();
} catch(Exception e) {
log.error("Failed to insert reload events for table " + triggerRouter.getTrigger().getSourceTableName(), e);
if(transaction != null) {
transaction.rollback();
}
}
}

private TriggerRouter findTriggerRouter(List<TriggerRouter> triggerRouters, String tableName, String symTablePrefix) {
for(TriggerRouter triggerRouter : triggerRouters) {
if (TableConstants.getTableName(symTablePrefix, tableName).equalsIgnoreCase(triggerRouter.getTrigger().getSourceTableName())) {
return triggerRouter;
}
}
return null;
}

protected Set<Node> filterOutNodesByDeploymentType(DataMetaData dataMetaData, Set<Node> possibleTargetNodes) {
if (tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_USER)
|| tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_USER_HIST)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,6 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId,
public ISqlReadCursor<Data> selectDataFor(Long batchId, String targetNodeId, boolean isContainsBigLob);

public Map<String, Date> getLastDataCaptureByChannel();

public String findNodeIdsByNodeGroupId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3061,6 +3061,11 @@ public boolean fixLastDataGap() {
}
return fixed;
}

@Override
public String findNodeIdsByNodeGroupId() {
return getSql("findNodeIdsByNodeGroupIdSql");
}

protected void checkInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace

putSql("findLastCaptureTimeByChannelSql",
"select max(create_time) as create_time, channel_id from $(data) group by channel_id ");

// Used by ConfigurationChangedDataRouter for table reload request of sym_node_security
// ConfigurationChangedDataRouter appends node_group_id value
putSql("findNodeIdsByNodeGroupIdSql", " node_id in (select node_id from $(node) where node_group_id = %s)");

}

Expand Down

0 comments on commit c1a6e16

Please sign in to comment.