Skip to content

Commit

Permalink
0001214: reloadTable race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed May 14, 2013
1 parent 11b69ec commit e25b890
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 32 deletions.
Expand Up @@ -26,13 +26,15 @@ public class TableReloadRequestKey {
protected String sourceNodeId;
protected String triggerId;
protected String routerId;
protected String receivedFromNodeId;

public TableReloadRequestKey(String targetNodeId, String sourceNodeId, String triggerId,
String routerId) {
String routerId, String receivedFromNodeId) {
this.targetNodeId = targetNodeId;
this.sourceNodeId = sourceNodeId;
this.triggerId = triggerId;
this.routerId = routerId;
this.receivedFromNodeId = receivedFromNodeId;
}

public String getRouterId() {
Expand All @@ -50,5 +52,13 @@ public String getTargetNodeId() {
public String getTriggerId() {
return triggerId;
}

public void setReceivedFromNodeId(String receivedFromNodeId) {
this.receivedFromNodeId = receivedFromNodeId;
}

public String getReceivedFromNodeId() {
return receivedFromNodeId;
}

}
Expand Up @@ -184,7 +184,7 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
String triggerId = columnValues.get("TRIGGER_ID");

list.add(new TableReloadRequestKey(targetNodeId, sourceNodeId, triggerId,
routerId));
routerId, dataMetaData.getData().getSourceNodeId()));
}
} else {
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
Expand Down Expand Up @@ -396,10 +396,8 @@ public void contextCommitted(SimpleRouterContext routingContext) {
jobManager.stopJobs();
jobManager.startJobs();
}

}
}

}

protected void insertReloadEvents(SimpleRouterContext routingContext) {
Expand All @@ -410,7 +408,7 @@ protected void insertReloadEvents(SimpleRouterContext routingContext) {
for (TableReloadRequestKey reloadRequestKey : reloadRequestKeys) {
TableReloadRequest request = engine.getDataService()
.getTableReloadRequest(reloadRequestKey);
if (engine.getDataService().insertReloadEvent(request)) {
if (engine.getDataService().insertReloadEvent(request, reloadRequestKey.getReceivedFromNodeId() != null)) {
log.info(
"Inserted table reload request from config data router for node {} and trigger {}",
reloadRequestKey.getTargetNodeId(), reloadRequestKey.getTriggerId());
Expand Down
Expand Up @@ -67,9 +67,7 @@ public interface IDataService {

public void insertReloadEvents(Node targetNode, boolean reverse);

public boolean insertReloadEvent(TableReloadRequest request, boolean updateTableReloadRequest);

public boolean insertReloadEvent(TableReloadRequest request);
public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient);

public void insertReloadEvent(ISqlTransaction transaction, Node targetNode,
TriggerRouter triggerRouter, TriggerHistory triggerHistory, String overrideInitialLoadSelect, long loadId, String createBy);
Expand Down
Expand Up @@ -101,11 +101,7 @@ public DataService(ISymmetricEngine engine) {

protected Map<IHeartbeatListener, Long> lastHeartbeatTimestamps = new HashMap<IHeartbeatListener, Long>();

public boolean insertReloadEvent(TableReloadRequest request) {
return insertReloadEvent(request, true);
}

public boolean insertReloadEvent(TableReloadRequest request, boolean updateTableReloadRequest) {
public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient) {
boolean successful = false;
if (request.isReloadEnabled()) {
ITriggerRouterService triggerRouterService = engine.getTriggerRouterService();
Expand Down Expand Up @@ -140,14 +136,14 @@ public boolean insertReloadEvent(TableReloadRequest request, boolean updateTable
insertReloadEvent(transaction, targetNode, triggerRouter,
triggerHistory, request.getReloadSelect(), false, -1, null);

if (updateTableReloadRequest) {
if (!targetNode.requires13Compatiblity() && deleteAtClient) {
insertSqlEvent(
transaction,
triggerHistory,
trigger.getChannelId(),
targetNode,
String.format(
"update %s set reload_enabled=0, reload_time=current_timestamp where target_node_id='%s' and source_node_id='%s' and trigger_id='%s' and router_id='%s'",
"delete from %s where target_node_id='%s' and source_node_id='%s' and trigger_id='%s' and router_id='%s'",
TableConstants
.getTableName(
tablePrefix,
Expand All @@ -157,15 +153,11 @@ public boolean insertReloadEvent(TableReloadRequest request, boolean updateTable
.getTriggerId(), request.getRouterId()),
false, -1, null);
}

deleteTableReloadRequest(transaction, request);

transaction.commit();

request.setReloadEnabled(false);
request.setReloadTime(new Date());
request.setLastUpdateBy("symmetricds");
if (updateTableReloadRequest) {
saveTableReloadRequest(request);
}
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
Expand Down Expand Up @@ -207,6 +199,15 @@ public boolean insertReloadEvent(TableReloadRequest request, boolean updateTable

}

protected void deleteTableReloadRequest(ISqlTransaction sqlTransaction, TableReloadRequest request) {
sqlTransaction.prepareAndExecute(
getSql("deleteTableReloadRequest"),
new Object[] { request.getSourceNodeId(),
request.getTargetNodeId(), request.getTriggerId(), request.getRouterId() },
new int[] { Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR });
}

public void saveTableReloadRequest(TableReloadRequest request) {
Date time = new Date();
request.setLastUpdateTime(time);
Expand Down Expand Up @@ -959,17 +960,18 @@ public String reloadTable(String nodeId, String catalogName, String schemaName,
.getTriggerHistoryId());
if (triggerRouters != null && triggerRouters.size() > 0) {
for (TriggerRouter triggerRouter : triggerRouters) {
if (parameterService
.is(ParameterConstants.INITIAL_LOAD_CREATE_SCHEMA_BEFORE_RELOAD)) {
String xml = symmetricDialect.getCreateTableXML(triggerHistory,
triggerRouter);
insertCreateEvent(targetNode, triggerRouter, triggerHistory, xml, true, -1, null);
} else if (parameterService
.is(ParameterConstants.INITIAL_LOAD_DELETE_BEFORE_RELOAD)) {
insertPurgeEvent(targetNode, triggerRouter, triggerHistory, true, -1, null);
}
eventCount++;
insertReloadEvent(targetNode, triggerRouter, overrideInitialLoadSelect);
if (!triggerRouter.getTrigger().isSourceTableNameWildCarded()) {
TableReloadRequest request = new TableReloadRequest();
request.setTriggerId(triggerRouter.getTrigger().getTriggerId());
request.setRouterId(triggerRouter.getRouter().getRouterId());
request.setSourceNodeId(sourceNode.getNodeId());
request.setTargetNodeId(targetNode.getNodeId());
request.setReloadTime(null);
request.setReloadEnabled(true);
request.setReloadSelect(overrideInitialLoadSelect);
saveTableReloadRequest(request);
eventCount++;
}
}
}
}
Expand Down
Expand Up @@ -14,6 +14,8 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, reload_delete_stmt, reload_enabled, reload_time, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id) values (?,?,?,?,?,?,?,?,?,?,?)");

putSql("updateTableReloadRequest", "update $(table_reload_request) set reload_select=?, reload_delete_stmt=?, reload_enabled=?, reload_time=?, create_time=?, last_update_by=?, last_update_time=? where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?");

putSql("deleteTableReloadRequest", "delete from $(table_reload_request) where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?");

// Note that the order by data_id is done appended in code
putSql("selectEventDataToExtractSql",
Expand Down

0 comments on commit e25b890

Please sign in to comment.