Skip to content
Permalink
Browse files

0003881: Allow manual routing (pre-routing) of batches

  • Loading branch information...
erilong committed Feb 28, 2019
1 parent 1c7aff2 commit 2a7d2bdc8caf4d2aff60cee196b0acb3bc112ab1
@@ -39,6 +39,8 @@
*/
private TriggerHistory triggerHistory;

private boolean isPreRouted;

public Data(long dataId, String pkData, String rowData, DataEventType eventType,
String tableName, Date createTime, TriggerHistory triggerHistory, String channelId,
String transactionId, String sourceNodeId) {
@@ -170,6 +172,14 @@ public String getNodeList() {
return getAttribute(ATTRIBUTE_NODE_LIST);
}

/**
 * Whether this data row was manually routed ahead of time (its data_event
 * and outgoing_batch rows were inserted directly), so the routing job
 * should skip it.
 *
 * @return true when the row has been pre-routed
 */
public boolean isPreRouted() {
    return this.isPreRouted;
}

/**
 * Marks this data row as manually routed (pre-routed) so the routing job
 * will not generate events for it again.
 *
 * @param preRouted true when routing for this row has already been done manually
 */
public void setPreRouted(boolean preRouted) {
    this.isPreRouted = preRouted;
}

/**
 * Returns the creation timestamp stored under ATTRIBUTE_CREATE_TIME.
 *
 * @return the create-time attribute value; may be null when the attribute
 *         was never set — depends on getAttribute semantics (verify)
 */
public Date getCreateTime() {
    final Date createTime = getAttribute(ATTRIBUTE_CREATE_TIME);
    return createTime;
}
@@ -101,6 +101,13 @@ public void addDataEvent(long dataId, long batchId, String routerId) {
}
}

/**
 * Records a routed data id, collapsing consecutive duplicates: the id is
 * appended only when it differs from the most recently recorded one.
 *
 * @param dataId identifier of the data row to track
 */
public void addData(long dataId) {
    if (dataId == lastDataId) {
        return; // same id as the previous call; nothing to record
    }
    dataIds.add(dataId);
    lastDataId = dataId;
}

/**
 * @return the outgoing batches tracked by this context, keyed per node
 *         (presumably by node id — TODO confirm against callers)
 */
public Map<String, OutgoingBatch> getBatchesByNodes() {
    return this.batchesByNodes;
}
@@ -90,6 +90,8 @@
* @return message string indicating success or error
*/
public String sendSQL(String nodeId, String catalogName, String schemaName, String tableName, String sql);

/**
 * Creates a SQL event for the target node so the statement is executed there.
 * The event is attached to the sym_node_host table's trigger history and sent
 * on the config channel (see the implementation in DataService).
 *
 * @param nodeId id of the node that should run the statement
 * @param sql    the SQL text to send
 * @return message string indicating success or error
 */
public String sendSQL(String nodeId, String sql);

public void insertReloadEvents(Node targetNode, boolean reverse, ProcessInfo processInfo, List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters);

@@ -2199,11 +2199,10 @@ protected void restartExtractRequest(List<OutgoingBatch> batches, ExtractRequest

// clear the incoming batch table for the batches at the target node, so the batches won't be skipped
for (ExtractRequest extractRequest : allRequests) {
String symNode = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_NODE);
String symIncomingBatch = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_INCOMING_BATCH);
String sql = "delete from " + symIncomingBatch + " where node_id = '" + nodeService.findIdentityNodeId() +
"' and batch_id between " + extractRequest.getStartBatchId() + " and " + extractRequest.getEndBatchId();
dataService.sendSQL(extractRequest.getNodeId(), null, null, symNode, sql);
dataService.sendSQL(extractRequest.getNodeId(), sql);
}
}

@@ -1885,9 +1885,9 @@ protected long insertData(ISqlTransaction transaction, final Data data) {
data.getOldData(),
data.getTriggerHistory() != null ? data.getTriggerHistory()
.getTriggerHistoryId() : -1, data.getChannelId(),
data.getExternalData(), data.getNodeList() }, new int[] { Types.VARCHAR,
data.getExternalData(), data.getNodeList(), data.isPreRouted() ? 1 : 0 }, new int[] { Types.VARCHAR,
Types.CHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.NUMERIC,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR });
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.NUMERIC });
data.setDataId(id);
return id;
}
@@ -1937,6 +1937,7 @@ public void insertDataAndDataEventAndOutgoingBatch(Data data, String channelId,
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
data.setPreRouted(true);
long dataId = insertData(transaction, data);
for (Node node : nodes) {
insertDataEventAndOutgoingBatch(transaction, dataId, channelId, node.getNodeId(),
@@ -1992,6 +1993,7 @@ public long insertDataAndDataEventAndOutgoingBatch(Data data, String nodeId, Str
*/
public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data, String nodeId, String routerId, boolean isLoad,
long loadId, String createBy, Status status, String overrideChannelId, long estimatedBatchRowCount) {
data.setPreRouted(true);
long dataId = insertData(transaction, data);
String channelId = null;
if (isLoad) {
@@ -2159,6 +2161,54 @@ public String sendSQL(String nodeId, String catalogName, String schemaName, Stri
}
}

/**
 * Creates a SQL event on the config channel so the given statement is executed
 * at the target node.  The event is attached to the trigger history of the
 * sym_node_host table, so the caller does not have to name a table (compare
 * sendSQL(nodeId, catalogName, schemaName, tableName, sql)).
 *
 * @param nodeId id of the node that should run the statement
 * @param sql    the SQL text to send
 * @return message string indicating success or the reason for failure
 */
public String sendSQL(String nodeId, String sql) {
    String tableName = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_NODE_HOST);
    Node sourceNode = engine.getNodeService().findIdentity();
    Node targetNode = engine.getNodeService().findNode(nodeId, true);
    if (targetNode == null) {
        return "Unknown node " + nodeId;
    }

    ITriggerRouterService triggerRouterService = engine.getTriggerRouterService();
    TriggerHistory triggerHistory = triggerRouterService.findTriggerHistory(null, null, tableName);

    // The original code returned the identical message from two separate
    // branches (no history, or history without a trigger) and never used the
    // looked-up Trigger beyond the null check; collapse both into one guard.
    if (triggerHistory == null
            || triggerRouterService.getTriggerById(triggerHistory.getTriggerId()) == null) {
        return "Trigger for table " + tableName + " does not exist from node "
                + sourceNode.getNodeGroupId();
    }

    ISqlTransaction transaction = null;
    try {
        transaction = sqlTemplate.startSqlTransaction();
        Data data = new Data(triggerHistory.getSourceTableName(), DataEventType.SQL,
                CsvUtils.escapeCsvData(sql), null, triggerHistory, Constants.CHANNEL_CONFIG,
                null, null);
        // Restrict routing of this event to the single target node.
        data.setNodeList(targetNode.getNodeId());
        insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(),
                Constants.UNKNOWN_ROUTER_ID, false, -1, null, Status.NE, null, -1);
        transaction.commit();
        // Fixed message grammar: "create" -> "created".
        return "Successfully created SQL event for node " + targetNode.getNodeId();
    } catch (Error | RuntimeException ex) {
        // Multi-catch replaces two byte-identical catch blocks: roll back, rethrow.
        if (transaction != null) {
            transaction.rollback();
        }
        throw ex;
    } finally {
        close(transaction);
    }
}

/**
 * Convenience overload: delegates to the five-argument reloadTable, passing
 * null for the final argument (an override select/condition — TODO confirm
 * the parameter's name against the overload's declaration).
 */
public String reloadTable(String nodeId, String catalogName, String schemaName, String tableName) {
return reloadTable(nodeId, catalogName, schemaName, tableName, null);
}
@@ -2885,6 +2935,7 @@ public Data mapRow(Row row) {
}
}
data.setTriggerHistory(triggerHistory);
data.setPreRouted(row.getBoolean("IS_PREROUTED"));
return data;
}
}
@@ -146,14 +146,14 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
putSql("selectEventDataToExtractSql",
""
+ "select d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, d.is_prerouted, e.router_id from $(data) d inner join "
+ " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id "
+ " where o.batch_id = ? and o.node_id = ? ");

putSql("selectEventDataByBatchIdSql",
""
+ "select d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, e.router_id from $(data) d inner join "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, d.is_prerouted, e.router_id from $(data) d inner join "
+ " $(data_event) e on d.data_id = e.data_id inner join $(outgoing_batch) o on o.batch_id=e.batch_id "
+ " where o.batch_id = ? ");

@@ -165,7 +165,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace

putSql("selectData",
"select data_id, table_name, event_type, row_data, pk_data, old_data, " +
"create_time, trigger_hist_id, channel_id, transaction_id, source_node_id, external_data, node_list, '' as router_id " +
"create_time, trigger_hist_id, channel_id, transaction_id, source_node_id, external_data, node_list, '' as router_id, is_prerouted " +
"from $(data) where data_id = ?");

putSql("selectMaxDataEventDataIdSql", ""
@@ -175,9 +175,8 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ "select count(*) from $(data) where data_id > ? and data_id < ? ");

putSql("insertIntoDataSql",
""
+ "insert into $(data) (data_id, table_name, event_type, row_data, pk_data, "
+ " old_data, trigger_hist_id, channel_id, external_data, node_list, create_time) values(null, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp) ");
"insert into $(data) (data_id, table_name, event_type, row_data, pk_data, " +
"old_data, trigger_hist_id, channel_id, external_data, node_list, is_prerouted, create_time) values(null, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp)");

putSql("insertIntoDataEventSql",
""
@@ -268,11 +268,8 @@ protected void insertInitialLoadEvents() {
Map<String, List<TriggerRouter>> triggerRoutersByTargetNodeGroupId = new HashMap<String, List<TriggerRouter>>();

if (nodeSecurities != null && nodeSecurities.size() > 0) {
gapDetector.setFullGapAnalysis(true);
boolean reverseLoadFirst = parameterService
.is(ParameterConstants.INITIAL_LOAD_REVERSE_FIRST);
boolean isInitialLoadQueued = false;


for (NodeSecurity security : nodeSecurities) {
Node targetNode = engine.getNodeService().findNode(security.getNodeId());
@@ -298,7 +295,6 @@ protected void insertInitialLoadEvents() {
dataService.insertReloadEvents(
targetNode,
false, processInfo, activeHistories, triggerRouters);
isInitialLoadQueued = true;
ts = System.currentTimeMillis() - ts;
if (ts > Constants.LONG_OPERATION_THRESHOLD) {
log.warn("Inserted reload events for node {} took longer than expected. It took {} ms",
@@ -327,9 +323,6 @@ protected void insertInitialLoadEvents() {
}
}
}
if (isInitialLoadQueued) {
gapDetector.setFullGapAnalysis(true);
}
}

processTableRequestLoads(identity, processInfo, triggerRoutersByTargetNodeGroupId);
@@ -349,14 +342,12 @@ public void processTableRequestLoads(Node source, ProcessInfo processInfo, Map<
if (loadsToProcess.size() > 0) {
processInfo.setStatus(ProcessInfo.ProcessStatus.CREATING);
log.info("Found " + loadsToProcess.size() + " table reload requests to process.");
gapDetector.setFullGapAnalysis(true);

boolean useExtractJob = parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB, true);
boolean streamToFile = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED, false);

Map<String, List<TableReloadRequest>> requestsSplitByLoad = new HashMap<String, List<TableReloadRequest>>();
Map<String, ExtractRequest> extractRequests = null;
int extractRequestCount = 0;

for (TableReloadRequest load : loadsToProcess) {
Node targetNode = engine.getNodeService().findNode(load.getTargetNodeId(), true);
@@ -372,7 +363,6 @@ public void processTableRequestLoads(Node source, ProcessInfo processInfo, Map<
.getActiveTriggerHistories(targetNode);

extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, fullLoad, processInfo, activeHistories, triggerRouters, extractRequests);
extractRequestCount += extractRequests == null ? 0 : extractRequests.size();
} else {
NodeSecurity targetNodeSecurity = engine.getNodeService().findNodeSecurity(load.getTargetNodeId());

@@ -408,11 +398,6 @@ public void processTableRequestLoads(Node source, ProcessInfo processInfo, Map<
List<TriggerHistory> activeHistories = extensionService.getExtensionPoint(IReloadGenerator.class).getActiveTriggerHistories(targetNode);

extractRequests = engine.getDataService().insertReloadEvents(targetNode, false, entry.getValue(), processInfo, activeHistories, triggerRouters, extractRequests);
extractRequestCount += extractRequests == null ? 0 : extractRequests.size();
}

if (extractRequestCount == 0) {
gapDetector.setFullGapAnalysis(false);
}
}
}
@@ -928,18 +913,22 @@ protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext c
if (data != null) {
processInfo.setCurrentTableName(data.getTableName());
processInfo.incrementCurrentDataCount();
boolean atTransactionBoundary = false;
if (nextData != null) {
String nextTxId = nextData.getTransactionId();
atTransactionBoundary = nextTxId == null
|| !nextTxId.equals(data.getTransactionId());
if (data.isPreRouted()) {
context.addData(data.getDataId());
} else {
boolean atTransactionBoundary = false;
if (nextData != null) {
String nextTxId = nextData.getTransactionId();
atTransactionBoundary = nextTxId == null
|| !nextTxId.equals(data.getTransactionId());
}
context.setEncountedTransactionBoundary(atTransactionBoundary);
statsDataCount++;
totalDataCount++;
int dataEventsInserted = routeData(processInfo, data, context);
statsDataEventCount += dataEventsInserted;
totalDataEventCount += dataEventsInserted;
}
context.setEncountedTransactionBoundary(atTransactionBoundary);
statsDataCount++;
totalDataCount++;
int dataEventsInserted = routeData(processInfo, data, context);
statsDataEventCount += dataEventsInserted;
totalDataEventCount += dataEventsInserted;
long insertTs = System.currentTimeMillis();
try {
if (maxNumberOfEventsBeforeFlush <= context.getDataEventList().size()
@@ -35,12 +35,12 @@ public RouterServiceSqlMap(IDatabasePlatform platform, Map<String, String> repla

putSql("selectDataUsingGapsSql",
"select $(selectDataUsingGapsSqlHint) d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, d.is_prerouted "
+ " from $(data) d where d.channel_id=? $(dataRange) ");

putSql("selectDataUsingStartDataId",
"select d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list "
+ " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list, d.is_prerouted "
+ " from $(data) d where d.channel_id=? and data_id >= ? ");

putSql("orderByDataId", " order by d.data_id asc ");
@@ -93,6 +93,7 @@
<column name="source_node_id" type="VARCHAR" size="50" description="If the data was inserted by a SymmetricDS data loader, then the id of the source node is recorded so that data is not re-routed back to it." />
<column name="external_data" type="VARCHAR" size="50" description="A field that can be populated by a trigger that uses the EXTERNAL_SELECT" />
<column name="node_list" type="VARCHAR" size="255" description="A field that can be populated with a comma separated subset of node ids which will be the only nodes available to the router" />
<column name="is_prerouted" type="BOOLEANINT" size="1" required="true" default="0" description="Set to true when routing should ignore this row because data_event and outgoing_batch rows are manually entered." />
<column name="create_time" type="TIMESTAMP" description="Timestamp when this entry was created." />
<index name="idx_d_channel_id">
<index-column name="data_id"/>

0 comments on commit 2a7d2bd

Please sign in to comment.
You can’t perform that action at this time.