Skip to content

Commit

Permalink
0003853: Defer indexes and foreign key constraints for table creation
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Feb 25, 2019
1 parent b57c522 commit 9a9cc30
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 18 deletions.
Expand Up @@ -140,6 +140,7 @@
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.TableReloadRequest;
import org.jumpmind.symmetric.model.TableReloadStatus;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
Expand Down Expand Up @@ -2217,34 +2218,24 @@ public void releaseMissedExtractRequests() {
protected void checkSendDeferredConstraints(ExtractRequest request, Node targetNode, OutgoingBatch batch) {
if (parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false)) {
TableReloadRequest reloadRequest = dataService.getTableReloadRequest(request.getLoadId(), request.getTriggerId(), request.getRouterId());
if (reloadRequest != null && reloadRequest.isCreateTable()) {
if ((reloadRequest != null && reloadRequest.isCreateTable()) ||
(reloadRequest == null && parameterService.is(ParameterConstants.INITIAL_LOAD_CREATE_SCHEMA_BEFORE_RELOAD))) {
boolean success = false;
Trigger trigger = triggerRouterService.getTriggerById(request.getTriggerId());
if (trigger != null) {
List<TriggerHistory> histories = triggerRouterService.getActiveTriggerHistories(triggerRouterService.getTriggerById(request.getTriggerId()));
if (histories != null && histories.size() > 0) {
dataService.insertCreateEvent(targetNode, histories.get(0), reloadRequest.getRouterId(), Constants.SYSTEM_USER);
Data data = new Data(histories.get(0).getSourceTableName(), DataEventType.CREATE, null, String.valueOf(request.getLoadId()),
histories.get(0), trigger.getChannelId(), null, null);
data.setNodeList(targetNode.getNodeId());
dataService.insertData(data);
success = true;
}
}
if (!success) {
log.warn("Unable to send deferred constraints for trigger '{}' router '{}' in load {}",
reloadRequest.getTriggerId(), reloadRequest.getRouterId(), reloadRequest.getLoadId());
}
} else if (reloadRequest == null && parameterService.is(ParameterConstants.INITIAL_LOAD_CREATE_SCHEMA_BEFORE_RELOAD)) {
boolean success = false;
List<TriggerHistory> histories = triggerRouterService.getActiveTriggerHistories(batch.getSummary().toLowerCase());
if (histories == null || histories.size() == 0) {
histories = triggerRouterService.getActiveTriggerHistories(batch.getSummary().toUpperCase());
}
if (histories != null && histories.size() > 0) {
dataService.insertCreateEvent(targetNode, histories.get(0), request.getRouterId(), Constants.SYSTEM_USER);
success = true;
}
if (!success) {
log.warn("Unable to send deferred constraints for table '{}' router '{}' in load {}",
batch.getSummary(), request.getRouterId(), request.getLoadId());
}
}
}
}
Expand Down Expand Up @@ -2573,6 +2564,16 @@ public CsvData next() {
boolean excludeIndexes = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_INDEXES, false);
boolean deferConstraints = outgoingBatch.isLoadFlag() && parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false);

String[] pkData = data.getParsedData(CsvData.PK_DATA);
if (pkData != null && pkData.length > 0) {
outgoingBatch.setLoadId(Long.parseLong(pkData[0]));
TableReloadStatus tableReloadStatus = dataService.getTableReloadStatusByLoadId(outgoingBatch.getLoadId());
if (tableReloadStatus != null && tableReloadStatus.isCompleted()) {
// Ignore create table (indexes and foreign keys) at end of load if it was cancelled
return null;
}
}

/*
* Force a reread of table so new columns are picked up. A create
* event is usually sent after there is a change to the table so
Expand Down
Expand Up @@ -843,9 +843,13 @@ public Map<String, ExtractRequest> insertReloadEvents(Node targetNode, boolean r
triggerHistories, triggerRoutersByHistoryId,
mapReloadRequests, isFullLoad, symNodeSecurityReloadChannel);

setupBatchCount += insertCreateBatchesForReload(targetNode, loadId, createBy,
int createTableBatchCount = insertCreateBatchesForReload(targetNode, loadId, createBy,
triggerHistories, triggerRoutersByHistoryId, transactional,
transaction, mapReloadRequests);
setupBatchCount += createTableBatchCount;
if (parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false)) {
finalizeBatchCount += createTableBatchCount;
}

setupBatchCount += insertDeleteBatchesForReload(targetNode, loadId, createBy,
triggerHistories, triggerRoutersByHistoryId, transactional,
Expand Down
Expand Up @@ -117,7 +117,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ " set completed = case when ("
+ " data_batch_count = (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end) and "
+ " setup_batch_count = (case when ? < start_data_batch_id then setup_batch_loaded + ? else setup_batch_loaded end) and "
+ " finalize_batch_loaded = (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) "
+ " finalize_batch_count = (case when ? > end_data_batch_id then finalize_batch_loaded + ? else finalize_batch_loaded end)) "
+ " then 1 else 0 end, "
+ " end_time = case when ("
+ " data_batch_count = (case when ? between start_data_batch_id and end_data_batch_id then data_batch_loaded + ? else data_batch_loaded end) and "
Expand Down
Expand Up @@ -1134,6 +1134,8 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
if (context.getChannel().isReloadFlag()) {
context.setNeedsCommitted(true);
}
} else if (dataMetaData.getData().getDataEventType() == DataEventType.CREATE) {
context.setNeedsCommitted(true);
} else {
context.setLastLoadId(-1);
}
Expand Down

0 comments on commit 9a9cc30

Please sign in to comment.