diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java index 64da4e245b..30635a1d0e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java @@ -77,8 +77,13 @@ public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String public List getPendingTablesForExtractByLoadIdAndNodeId(long loadId, String nodeId); + public List getPendingTablesForExtractByLoadIdNodeIdSourceId(long loadId, String nodeId, String sourceId); + public List getCompletedTablesForExtractByLoadIdAndNodeId(long loadId, String nodeId); + public List getCompletedTablesForExtractByLoadIdNodeIdSourceId(long loadId, String nodeId, + String sourceId); + public void updateExtractRequestLoadTime(ISqlTransaction transaction, Date loadTime, OutgoingBatch batch); public void updateExtractRequestTransferred(OutgoingBatch batch, long transferMillis); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index d5d515b353..878516cfde 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -58,6 +58,8 @@ public interface IDataService { public TableReloadRequest getTableReloadRequest(long loadId); + public TableReloadRequest getTableReloadRequestByLoadIdAndSourceNodeId(long loadId, String sourceNodeId); + public List getTableReloadRequests(long loadId); public TableReloadRequest getTableReloadRequest(long loadId, String triggerId, String routerId); @@ -81,6 +83,8 @@ public interface IDataService { public List getActiveIncomingTableReloadStatus(); public TableReloadStatus getTableReloadStatusByLoadId(long loadId); + + public TableReloadStatus getTableReloadStatusByLoadIdAndSourceId(long loadId, String sourceId); public List getTableReloadStatusByTarget(String targetNodeId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index d8881aa0ad..3162a7ea7b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -1413,11 +1413,22 @@ public List getPendingTablesForExtractByLoadIdAndNodeId(long loa return sqlTemplate.query(getSql("selectIncompleteTablesForExtractByLoadIdAndNodeId"), new ExtractRequestMapper(), loadId, nodeId); } + @Override + public List getPendingTablesForExtractByLoadIdNodeIdSourceId(long loadId, String nodeId, String sourceId) { + return sqlTemplate.query(getSql("selectIncompleteTablesForExtractByLoadIdNodeIdSourceId"), new ExtractRequestMapper(), loadId, nodeId, sourceId); + } + @Override public List getCompletedTablesForExtractByLoadIdAndNodeId(long loadId, String nodeId) { return sqlTemplate.query(getSql("selectCompletedTablesForExtractByLoadIdAndNodeId"), new ExtractRequestMapper(), loadId, nodeId); } + @Override + public List getCompletedTablesForExtractByLoadIdNodeIdSourceId(long loadId, String nodeId, + String sourceId) { + return sqlTemplate.query(getSql("selectCompletedTablesForExtractByLoadIdNodeIdSourceId"), new ExtractRequestMapper(), loadId, nodeId, sourceId); + } + @Override public void updateExtractRequestLoadTime(ISqlTransaction transaction, Date loadTime, OutgoingBatch outgoingBatch) { if (platform.supportsParametersInSelect()) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java index f2a2faa5e3..9b5f7eb9bc 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorServiceSqlMap.java @@ -78,9 +78,13 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform, putSql("selectCompletedTablesForExtractByLoadId", "select * from $(extract_request) where load_id = ? and loaded_time is not null and source_node_id = ? order by request_id"); putSql("selectIncompleteTablesForExtractByLoadIdAndNodeId", "select * from $(extract_request) where load_id = ? and loaded_time is null and node_id = ? order by request_id"); - + + putSql("selectIncompleteTablesForExtractByLoadIdNodeIdSourceId", "select * from $(extract_request) where load_id = ? and loaded_time is null and node_id = ? and source_node_id = ? order by request_id"); + putSql("selectCompletedTablesForExtractByLoadIdAndNodeId", "select * from $(extract_request) where load_id = ? and loaded_time is not null and node_id = ? order by request_id"); - + + putSql("selectCompletedTablesForExtractByLoadIdNodeIdSourceId", "select * from $(extract_request) where load_id = ? and loaded_time is not null and node_id = ? and source_node_id = ? order by request_id"); + putSql("updateExtractRequestStatuses", "update $(extract_request) set status=?, last_update_time=? " + "where load_id=? and source_node_id=? and status=?"); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 3ff76a8e41..80b754c135 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -301,6 +301,13 @@ public List getTableReloadRequests(long loadId) { return collapsedRequests; } + @Override + public TableReloadRequest getTableReloadRequestByLoadIdAndSourceNodeId(long loadId, String sourceNodeId) { + List requests = sqlTemplate.query(getSql("selectTableReloadRequestsByLoadIdAndSourceNodeId"), + new TableReloadRequestMapper(), loadId, sourceNodeId); + return requests == null || requests.size() == 0 ? null : requests.get(0); + } + @Override public TableReloadRequest getTableReloadRequest(long loadId, String triggerId, String routerId) { List requests = sqlTemplate.query(getSql("selectTableReloadRequestsByLoadIdTriggerRouter"), @@ -400,6 +407,11 @@ public TableReloadStatus getTableReloadStatusByLoadId(long loadId) { new TableReloadStatusMapper(), loadId); } + public TableReloadStatus getTableReloadStatusByLoadIdAndSourceId(long loadId, String sourceId) { + return sqlTemplateDirty.queryForObject(getSql("selectTableReloadStatusByLoadIdAndSourceId"), + new TableReloadStatusMapper(), loadId, sourceId); + } + public List getTableReloadStatusByTarget(String targetNodeId) { return sqlTemplateDirty.query(getSql("selectTableReloadStatusByTargetNodeId"), new TableReloadStatusMapper(), targetNodeId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index 14c4bc8eec..1e9f8341f1 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -59,6 +59,14 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + " from $(table_reload_request) " + " where load_id = ? " + " order by processed, last_update_time desc"); + putSql("selectTableReloadRequestsByLoadIdAndSourceNodeId", "select source_node_id, target_node_id, load_id, " + + " create_table, delete_first, reload_select, channel_id, " + + " before_custom_sql, processed, " + + " reload_time, channel_id, create_time, last_update_by, " + + " last_update_time, trigger_id, router_id, reload_select" + + " from $(table_reload_request) " + + " where load_id = ? and source_node_id = ?" + + " order by processed, last_update_time desc"); putSql("selectTableReloadRequestsByLoadIdTriggerRouter", "select source_node_id, target_node_id, load_id, " + " create_table, delete_first, reload_select, channel_id, " + " before_custom_sql, processed, " @@ -101,6 +109,16 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + " error_flag, sql_state, sql_code, sql_message, batch_bulk_load_count " + " from $(table_reload_status) " + " where load_id = ?"); + putSql("selectTableReloadStatusByLoadIdAndSourceId", "select source_node_id, target_node_id, load_id, " + + " end_data_batch_id, start_data_batch_id, " + + " setup_batch_count, data_batch_count, finalize_batch_count, " + + " setup_batch_loaded, data_batch_loaded, finalize_batch_loaded, " + + " table_count, rows_loaded, rows_count, " + + " completed, cancelled, full_load, " + + " start_time, end_time, last_update_time, last_update_by, " + + " error_flag, sql_state, sql_code, sql_message, batch_bulk_load_count " + + " from $(table_reload_status) " + + " where load_id = ? and source_node_id = ?"); putSql("selectTableReloadStatusByTargetNodeId", "select source_node_id, target_node_id, load_id, " + " end_data_batch_id, start_data_batch_id, " + " setup_batch_count, data_batch_count, finalize_batch_count, "