diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 22628b9ef4..b1e7fdaf01 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -164,6 +164,7 @@ private ParameterConstants() { public final static String INITIAL_LOAD_TRANSPORT_MAX_BYTES_TO_SYNC = "initial.load.transport.max.bytes.to.sync"; public final static String INITIAL_LOAD_USE_ESTIMATED_COUNTS = "initial.load.use.estimated.counts"; public final static String INITIAL_LOAD_PURGE_STAGE_IMMEDIATE_THRESHOLD_ROWS = "initial.load.purge.stage.immediate.threshold.rows"; + public final static String INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS = "initial.load.defer.create.constraints"; public final static String CREATE_TABLE_WITHOUT_DEFAULTS = "create.table.without.defaults"; public final static String CREATE_TABLE_WITHOUT_FOREIGN_KEYS = "create.table.without.foreign.keys"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index bd224f9a53..87bf7ba101 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -51,7 +51,9 @@ public interface IDataService { public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key); - public TableReloadRequest getTableReloadRequest(int loadId); + public TableReloadRequest getTableReloadRequest(long loadId); + + public TableReloadRequest getTableReloadRequest(long loadId, String triggerId, String routerId); public List getTableReloadRequestToProcess(final String sourceNodeId); @@ -135,6 +137,8 @@ public void insertScriptEvent(String channelId, Node targetNode, String script, public void insertScriptEvent(ISqlTransaction transaction, String channelId, Node targetNode, String script, boolean isLoad, long loadId, String createBy); + public void insertCreateEvent(Node targetNode, TriggerHistory triggerHistory, String routerId, String createBy); + public void insertCreateEvent(Node targetNode, TriggerHistory triggerHistory, String routerId, boolean isLoad, long loadId, String createBy); public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, TriggerHistory triggerHistory, String channelId, String routerId, boolean isLoad, long loadId, String createBy); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 7328c4cdb8..a69ba72cc6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -138,6 +138,7 @@ import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.model.Router; +import org.jumpmind.symmetric.model.TableReloadRequest; import org.jumpmind.symmetric.model.Trigger; import org.jumpmind.symmetric.model.TriggerHistory; import org.jumpmind.symmetric.model.TriggerRouter; @@ -2027,6 +2028,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status } } + checkSendDeferredConstraints(request, targetNode, firstBatch); } else { log.info("Batches already had an OK status for request {}, batches {} to {}. Not extracting", new Object[] { request.getRequestId(), request.getStartBatchId(), request.getEndBatchId() }); @@ -2123,6 +2125,41 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status } } + protected void checkSendDeferredConstraints(ExtractRequest request, Node targetNode, OutgoingBatch batch) { + if (parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false)) { + TableReloadRequest reloadRequest = dataService.getTableReloadRequest(request.getLoadId(), request.getTriggerId(), request.getRouterId()); + if (reloadRequest != null && reloadRequest.isCreateTable()) { + boolean success = false; + Trigger trigger = triggerRouterService.getTriggerById(request.getTriggerId()); + if (trigger != null) { + List histories = triggerRouterService.getActiveTriggerHistories(triggerRouterService.getTriggerById(request.getTriggerId())); + if (histories != null && histories.size() > 0) { + dataService.insertCreateEvent(targetNode, histories.get(0), reloadRequest.getRouterId(), Constants.SYSTEM_USER); + success = true; + } + } + if (!success) { + log.warn("Unable to send deferred constraints for trigger '{}' router '{}' in load {}", + reloadRequest.getTriggerId(), reloadRequest.getRouterId(), reloadRequest.getLoadId()); + } + } else if (reloadRequest == null && parameterService.is(ParameterConstants.INITIAL_LOAD_CREATE_SCHEMA_BEFORE_RELOAD)) { + boolean success = false; + List histories = triggerRouterService.getActiveTriggerHistories(batch.getSummary().toLowerCase()); + if (histories == null || histories.size() == 0) { + histories = triggerRouterService.getActiveTriggerHistories(batch.getSummary().toUpperCase()); + } + if (histories != null && histories.size() > 0) { + dataService.insertCreateEvent(targetNode, histories.get(0), request.getRouterId(), Constants.SYSTEM_USER); + success = true; + } + if (!success) { + log.warn("Unable to send deferred constraints for table '{}' router '{}' in load {}", + batch.getSummary(), request.getRouterId(), request.getLoadId()); + } + } + } + } + protected boolean isApplicable(NodeCommunication nodeCommunication) { return nodeCommunication.getCommunicationType() != CommunicationType.FILE_XTRCT; } @@ -2442,6 +2479,7 @@ public CsvData next() { boolean excludeDefaults = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_DEFAULTS, false); boolean excludeForeignKeys = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_FOREIGN_KEYS, false); boolean excludeIndexes = parameterService.is(ParameterConstants.CREATE_TABLE_WITHOUT_INDEXES, false); + boolean deferConstraints = outgoingBatch.isLoadFlag() && parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false); /* * Force a reread of table so new columns are picked up. A create @@ -2473,10 +2511,10 @@ public CsvData next() { } } } - if (excludeForeignKeys) { + if (excludeForeignKeys || deferConstraints) { copyTargetTable.removeAllForeignKeys(); } - if (excludeIndexes) { + if (excludeIndexes || deferConstraints) { copyTargetTable.removeAllIndexes(); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 3ce65168b1..fad34f5657 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -270,14 +270,25 @@ public TableReloadRequest mapRow(Row rs) { @Override - public TableReloadRequest getTableReloadRequest(int loadId) { + public TableReloadRequest getTableReloadRequest(long loadId) { List requests = sqlTemplate.query(getSql("selectTableReloadRequestsByLoadId"), new TableReloadRequestMapper(), loadId); List collapsedRequests = collapseTableReloadRequestsByLoadId(requests); return collapsedRequests == null || collapsedRequests.size() == 0 ? null : collapsedRequests.get(0); } - + + @Override + public TableReloadRequest getTableReloadRequest(long loadId, String triggerId, String routerId) { + List requests = sqlTemplate.query(getSql("selectTableReloadRequestsByLoadIdTriggerRouter"), + new TableReloadRequestMapper(), loadId, triggerId, routerId); + if (requests == null || requests.size() == 0) { + requests = sqlTemplate.query(getSql("selectTableReloadRequestsByLoadIdTriggerRouter"), + new TableReloadRequestMapper(), loadId, ParameterConstants.ALL, ParameterConstants.ALL); + } + return requests == null || requests.size() == 0 ? null : requests.get(0); + } + public List getTableReloadRequestToProcess(final String sourceNodeId) { return sqlTemplate.query(getSql("selectTableReloadRequestToProcess"), new ISqlRowMapper() { @@ -1545,6 +1556,7 @@ public int countDataInRange(long firstDataId, long secondDataId) { return sqlTemplate.queryForInt(getSql("countDataInRangeSql"), firstDataId, secondDataId); } + @Override public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHistory, String routerId, boolean isLoad, long loadId, String createBy) { ISqlTransaction transaction = null; @@ -1568,7 +1580,30 @@ public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHisto } } - public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, + @Override + public void insertCreateEvent(Node targetNode, TriggerHistory triggerHistory, String routerId, String createBy) { + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + Trigger trigger = engine.getTriggerRouterService().getTriggerById(triggerHistory.getTriggerId(), false); + insertCreateEvent(transaction, targetNode, triggerHistory, trigger.getChannelId(), routerId, false, -1, createBy); + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + close(transaction); + } + } + + protected void insertCreateEvent(ISqlTransaction transaction, Node targetNode, TriggerHistory triggerHistory, String routerId, boolean isLoad, long loadId, String createBy) { Trigger trigger = engine.getTriggerRouterService().getTriggerById( triggerHistory.getTriggerId(), false); @@ -1578,6 +1613,7 @@ public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, : Constants.CHANNEL_CONFIG, routerId, isLoad, loadId, createBy); } + @Override public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, TriggerHistory triggerHistory, String channelId, String routerId, boolean isLoad, long loadId, String createBy) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index fc163e9c8f..816b8bed58 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -62,7 +62,17 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + " from $(table_reload_request) " + " where load_id = ? " + " order by processed, completed, last_update_time desc"); - + + putSql("selectTableReloadRequestsByLoadIdTriggerRouter", "select source_node_id, target_node_id, load_id, " + + " batch_count, batch_loaded_count, table_count, row_count, row_loaded_count, " + + " create_table, delete_first, reload_select, channel_id, " + + " before_custom_sql, processed, completed, cancelled, " + + " reload_time, channel_id, create_time, last_update_by, " + + " last_update_time, trigger_id, router_id, error_flag, sql_state, sql_code, sql_message" + + " from $(table_reload_request) " + + " where load_id = ? and trigger_id = ? and router_id = ?" + + " order by processed, completed, last_update_time desc"); + putSql("countTableReloadRequestRowsByLoadId", "select sum(row_count) from $(table_reload_request) where load_id = ?"); putSql("updateProcessedTableReloadRequest", "update $(table_reload_request) set last_update_time = ?, batch_count = ?, processed = 1 where load_id = ?"); diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index 11fb8cad84..82366905fc 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -732,6 +732,14 @@ initial.load.use.estimated.counts=true # Tags: load initial.load.purge.stage.immediate.threshold.rows=5000 +# If tables are created as part of the initial load, it will defer the creation +# of foreign keys and indexes to improve performance. +# After data is loaded, the constraints will be added after tables have reached consistency. +# +# DatabaseOverridable: true +# Tags: load +initial.load.defer.create.constraints=true + # If this is true, registration is opened automatically for nodes requesting it. # # DatabaseOverridable: true