Skip to content

Commit

Permalink
0003853: Defer indexes and foreign key constraints for table creation
Browse files Browse the repository at this point in the history
until after initial load
  • Loading branch information
erilong committed Jan 10, 2019
1 parent 45ef15b commit f38f4cc
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 7 deletions.
Expand Up @@ -164,6 +164,7 @@ private ParameterConstants() {
public final static String INITIAL_LOAD_TRANSPORT_MAX_BYTES_TO_SYNC = "initial.load.transport.max.bytes.to.sync";
public final static String INITIAL_LOAD_USE_ESTIMATED_COUNTS = "initial.load.use.estimated.counts";
public final static String INITIAL_LOAD_PURGE_STAGE_IMMEDIATE_THRESHOLD_ROWS = "initial.load.purge.stage.immediate.threshold.rows";
public final static String INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS = "initial.load.defer.create.constraints";

public final static String CREATE_TABLE_WITHOUT_DEFAULTS = "create.table.without.defaults";
public final static String CREATE_TABLE_WITHOUT_FOREIGN_KEYS = "create.table.without.foreign.keys";
Expand Down
Expand Up @@ -51,7 +51,9 @@ public interface IDataService {

public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key);

public TableReloadRequest getTableReloadRequest(int loadId);
public TableReloadRequest getTableReloadRequest(long loadId);

public TableReloadRequest getTableReloadRequest(long loadId, String triggerId, String routerId);

public List<TableReloadRequest> getTableReloadRequestToProcess(final String sourceNodeId);

Expand Down Expand Up @@ -135,6 +137,8 @@ public void insertScriptEvent(String channelId, Node targetNode, String script,
public void insertScriptEvent(ISqlTransaction transaction, String channelId,
Node targetNode, String script, boolean isLoad, long loadId, String createBy);

public void insertCreateEvent(Node targetNode, TriggerHistory triggerHistory, String routerId, String createBy);

public void insertCreateEvent(Node targetNode, TriggerHistory triggerHistory, String routerId, boolean isLoad, long loadId, String createBy);

public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, TriggerHistory triggerHistory, String channelId, String routerId, boolean isLoad, long loadId, String createBy);
Expand Down
Expand Up @@ -138,6 +138,7 @@
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.TableReloadRequest;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
Expand Down Expand Up @@ -2027,6 +2028,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
}
}

checkSendDeferredConstraints(request, targetNode, firstBatch);
} else {
log.info("Batches already had an OK status for request {}, batches {} to {}. Not extracting", new Object[] { request.getRequestId(), request.getStartBatchId(),
request.getEndBatchId() });
Expand Down Expand Up @@ -2123,6 +2125,41 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
}
}

protected void checkSendDeferredConstraints(ExtractRequest request, Node targetNode, OutgoingBatch batch) {
if (parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false)) {
TableReloadRequest reloadRequest = dataService.getTableReloadRequest(request.getLoadId(), request.getTriggerId(), request.getRouterId());
if (reloadRequest != null && reloadRequest.isCreateTable()) {
boolean success = false;
Trigger trigger = triggerRouterService.getTriggerById(request.getTriggerId());
if (trigger != null) {
List<TriggerHistory> histories = triggerRouterService.getActiveTriggerHistories(triggerRouterService.getTriggerById(request.getTriggerId()));
if (histories != null && histories.size() > 0) {
dataService.insertCreateEvent(targetNode, histories.get(0), reloadRequest.getRouterId(), Constants.SYSTEM_USER);
success = true;
}
}
if (!success) {
log.warn("Unable to send deferred constraints for trigger '{}' router '{}' in load {}",
reloadRequest.getTriggerId(), reloadRequest.getRouterId(), reloadRequest.getLoadId());
}
} else if (reloadRequest == null && parameterService.is(ParameterConstants.INITIAL_LOAD_CREATE_SCHEMA_BEFORE_RELOAD)) {
boolean success = false;
List<TriggerHistory> histories = triggerRouterService.getActiveTriggerHistories(batch.getSummary().toLowerCase());
if (histories == null || histories.size() == 0) {
histories = triggerRouterService.getActiveTriggerHistories(batch.getSummary().toUpperCase());
}
if (histories != null && histories.size() > 0) {
dataService.insertCreateEvent(targetNode, histories.get(0), request.getRouterId(), Constants.SYSTEM_USER);
success = true;
}
if (!success) {
log.warn("Unable to send deferred constraints for table '{}' router '{}' in load {}",
batch.getSummary(), request.getRouterId(), request.getLoadId());
}
}
}
}

protected boolean isApplicable(NodeCommunication nodeCommunication) {
return nodeCommunication.getCommunicationType() != CommunicationType.FILE_XTRCT;
}
Expand Down Expand Up @@ -2442,6 +2479,7 @@ public CsvData next() {
boolean excludeDefaults = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_DEFAULTS, false);
boolean excludeForeignKeys = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_FOREIGN_KEYS, false);
boolean excludeIndexes = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_INDEXES, false);
boolean deferConstraints = outgoingBatch.isLoadFlag() && parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false);

/*
* Force a reread of table so new columns are picked up. A create
Expand Down Expand Up @@ -2473,10 +2511,10 @@ public CsvData next() {
}
}
}
if (excludeForeignKeys) {
if (excludeForeignKeys || deferConstraints) {
copyTargetTable.removeAllForeignKeys();
}
if (excludeIndexes) {
if (excludeIndexes || deferConstraints) {
copyTargetTable.removeAllIndexes();
}

Expand Down
Expand Up @@ -270,14 +270,25 @@ public TableReloadRequest mapRow(Row rs) {


@Override
public TableReloadRequest getTableReloadRequest(int loadId) {
public TableReloadRequest getTableReloadRequest(long loadId) {
List<TableReloadRequest> requests = sqlTemplate.query(getSql("selectTableReloadRequestsByLoadId"),
new TableReloadRequestMapper(), loadId);

List<TableReloadRequest> collapsedRequests = collapseTableReloadRequestsByLoadId(requests);
return collapsedRequests == null || collapsedRequests.size() == 0 ? null : collapsedRequests.get(0);
}


@Override
public TableReloadRequest getTableReloadRequest(long loadId, String triggerId, String routerId) {
List<TableReloadRequest> requests = sqlTemplate.query(getSql("selectTableReloadRequestsByLoadIdTriggerRouter"),
new TableReloadRequestMapper(), loadId, triggerId, routerId);
if (requests == null || requests.size() == 0) {
requests = sqlTemplate.query(getSql("selectTableReloadRequestsByLoadIdTriggerRouter"),
new TableReloadRequestMapper(), loadId, ParameterConstants.ALL, ParameterConstants.ALL);
}
return requests == null || requests.size() == 0 ? null : requests.get(0);
}

public List<TableReloadRequest> getTableReloadRequestToProcess(final String sourceNodeId) {
return sqlTemplate.query(getSql("selectTableReloadRequestToProcess"),
new ISqlRowMapper<TableReloadRequest>() {
Expand Down Expand Up @@ -1545,6 +1556,7 @@ public int countDataInRange(long firstDataId, long secondDataId) {
return sqlTemplate.queryForInt(getSql("countDataInRangeSql"), firstDataId, secondDataId);
}

@Override
public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHistory, String routerId,
boolean isLoad, long loadId, String createBy) {
ISqlTransaction transaction = null;
Expand All @@ -1568,7 +1580,30 @@ public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHisto
}
}

public void insertCreateEvent(ISqlTransaction transaction, Node targetNode,
@Override
public void insertCreateEvent(Node targetNode, TriggerHistory triggerHistory, String routerId, String createBy) {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
Trigger trigger = engine.getTriggerRouterService().getTriggerById(triggerHistory.getTriggerId(), false);
insertCreateEvent(transaction, targetNode, triggerHistory, trigger.getChannelId(), routerId, false, -1, createBy);
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
close(transaction);
}
}

protected void insertCreateEvent(ISqlTransaction transaction, Node targetNode,
TriggerHistory triggerHistory, String routerId, boolean isLoad, long loadId, String createBy) {
Trigger trigger = engine.getTriggerRouterService().getTriggerById(
triggerHistory.getTriggerId(), false);
Expand All @@ -1578,6 +1613,7 @@ public void insertCreateEvent(ISqlTransaction transaction, Node targetNode,
: Constants.CHANNEL_CONFIG, routerId, isLoad, loadId, createBy);
}

@Override
public void insertCreateEvent(ISqlTransaction transaction, Node targetNode,
TriggerHistory triggerHistory, String channelId, String routerId, boolean isLoad, long loadId, String createBy) {

Expand Down
Expand Up @@ -62,7 +62,17 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ " from $(table_reload_request) "
+ " where load_id = ? "
+ " order by processed, completed, last_update_time desc");


putSql("selectTableReloadRequestsByLoadIdTriggerRouter", "select source_node_id, target_node_id, load_id, "
+ " batch_count, batch_loaded_count, table_count, row_count, row_loaded_count, "
+ " create_table, delete_first, reload_select, channel_id, "
+ " before_custom_sql, processed, completed, cancelled, "
+ " reload_time, channel_id, create_time, last_update_by, "
+ " last_update_time, trigger_id, router_id, error_flag, sql_state, sql_code, sql_message"
+ " from $(table_reload_request) "
+ " where load_id = ? and trigger_id = ? and router_id = ?"
+ " order by processed, completed, last_update_time desc");

putSql("countTableReloadRequestRowsByLoadId", "select sum(row_count) from $(table_reload_request) where load_id = ?");

putSql("updateProcessedTableReloadRequest", "update $(table_reload_request) set last_update_time = ?, batch_count = ?, processed = 1 where load_id = ?");
Expand Down
Expand Up @@ -732,6 +732,14 @@ initial.load.use.estimated.counts=true
# Tags: load
initial.load.purge.stage.immediate.threshold.rows=5000

# If tables are created as part of the initial load, it will defer the creation
# of foreign keys and indexes to improve performance.
# After data is loaded, the constraints will be added after tables have reached consistency.
#
# DatabaseOverridable: true
# Tags: load
initial.load.defer.create.constraints=true

# If this is true, registration is opened automatically for nodes requesting it.
#
# DatabaseOverridable: true
Expand Down

0 comments on commit f38f4cc

Please sign in to comment.