diff --git a/symmetric-assemble/src/asciidoc/images/manage/manage-incoming-loads.PNG b/symmetric-assemble/src/asciidoc/images/manage/manage-incoming-loads.PNG new file mode 100644 index 0000000000..7107b99e9a Binary files /dev/null and b/symmetric-assemble/src/asciidoc/images/manage/manage-incoming-loads.PNG differ diff --git a/symmetric-assemble/src/asciidoc/manage.ad b/symmetric-assemble/src/asciidoc/manage.ad index 1cd6214c45..73f8c847ff 100644 --- a/symmetric-assemble/src/asciidoc/manage.ad +++ b/symmetric-assemble/src/asciidoc/manage.ad @@ -183,13 +183,22 @@ include::manage/installed-triggers.ad[] ifdef::pro[] === Outgoing Loads -The Outgoing Loads screen shows the number of loads that have been queued. It also lists loads that have had <> created. The screens shows +The Outgoing Loads screen shows the number of outgoing loads that have been queued. It also lists loads that have had <> created. The screen shows loads that are sourced from the current node. You can cancel a load that is in progress by selecting the load and pressing the _Cancel_ button. image::manage/manage-outgoing-loads.png[] +=== Incoming Loads + +The Incoming Loads screen shows the number of incoming loads that have been queued. It also lists loads that have had <> created. The screen shows +loads that are targeted at the current node. + +You can cancel a load that is in progress by selecting the load and pressing the _Cancel_ button. + +image::manage/manage-incoming-loads.png[] + endif::pro[] === Outgoing Batches diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index 80ee4991d8..aeb72b44e6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -61,11 +61,21 @@ public interface IDataService { public TableReloadRequest getTableReloadRequest(long loadId, String triggerId, String routerId); public List getTableReloadRequestToProcess(final String sourceNodeId); + + public List getTableReloadRequestToProcessByTarget(final String targetNodeId); public List getTableReloadStatus(); + public List getOutgoingTableReloadStatus(); + + public List getIncomingTableReloadStatus(); + public List getActiveTableReloadStatus(); + public List getActiveOutgoingTableReloadStatus(); + + public List getActiveIncomingTableReloadStatus(); + public TableReloadStatus getTableReloadStatusByLoadId(long loadId); public List getTableReloadStatusByTarget(String targetNodeId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 883f67dea4..ae538eb419 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -328,6 +328,30 @@ public TableReloadRequest mapRow(Row rs) { } }, sourceNodeId); } + + public List getTableReloadRequestToProcessByTarget(final String targetNodeId) { + return sqlTemplate.query(getSql("selectTableReloadRequestToProcessByTarget"), + new ISqlRowMapper() { + public TableReloadRequest mapRow(Row rs) { + TableReloadRequest request = new TableReloadRequest(); + request.setSourceNodeId(rs.getString("source_node_id")); + request.setTargetNodeId(targetNodeId); + request.setCreateTable(rs.getBoolean("create_table")); + request.setDeleteFirst(rs.getBoolean("delete_first")); + request.setReloadSelect(rs.getString("reload_select")); + request.setReloadTime(rs.getDateTime("reload_time")); + request.setBeforeCustomSql(rs.getString("before_custom_sql")); + request.setChannelId(rs.getString("channel_id")); + request.setTriggerId(rs.getString("trigger_id")); + request.setRouterId(rs.getString("router_id")); + request.setLoadId(rs.getLong("load_id")); + request.setCreateTime(rs.getDateTime("create_time")); + request.setLastUpdateBy(rs.getString("last_update_by")); + request.setLastUpdateTime(rs.getDateTime("last_update_time")); + return request; + } + }, targetNodeId); + } public List getTableReloadRequests() { return sqlTemplateDirty.query(getSql("selectTableReloadRequests"), @@ -335,15 +359,35 @@ public List getTableReloadRequests() { } public List getTableReloadStatus() { - return sqlTemplateDirty.query(getSql("selectTableReloadStatus"), + return sqlTemplateDirty.query(getSql("selectTableReloadStatus", "orderTableReloadStatus"), new TableReloadStatusMapper()); } + public List getOutgoingTableReloadStatus() { + return sqlTemplateDirty.query(getSql("selectTableReloadStatus", "whereSourceNodeId", "orderTableReloadStatus"), + new TableReloadStatusMapper(), engine.getNodeId()); + } + + public List getIncomingTableReloadStatus() { + return sqlTemplateDirty.query(getSql("selectTableReloadStatus", "whereTargetNodeId", "orderTableReloadStatus"), + new TableReloadStatusMapper(), engine.getNodeId()); + } + public List getActiveTableReloadStatus() { - return sqlTemplateDirty.query(getSql("selectActiveTableReloadStatus"), + return sqlTemplateDirty.query(getSql("selectActiveTableReloadStatus", "orderTableReloadStatus"), new TableReloadStatusMapper()); } + public List getActiveOutgoingTableReloadStatus() { + return sqlTemplateDirty.query(getSql("selectActiveTableReloadStatus", "andSourceNodeId", "orderTableReloadStatus"), + new TableReloadStatusMapper(), engine.getNodeId()); + } + + public List getActiveIncomingTableReloadStatus() { + return sqlTemplateDirty.query(getSql("selectActiveTableReloadStatus", "andTargetNodeId", "orderTableReloadStatus"), + new TableReloadStatusMapper(), engine.getNodeId()); + } + public TableReloadStatus getTableReloadStatusByLoadId(long loadId) { return sqlTemplateDirty.queryForObject(getSql("selectTableReloadStatusByLoadId"), new TableReloadStatusMapper(), loadId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index 952837f887..6716b225a9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -40,6 +40,12 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + " from $(table_reload_request) " + " where source_node_id=? and processed = 0 " + " order by create_time, target_node_id"); + putSql("selectTableReloadRequestToProcessByTarget", "select source_node_id, create_table, delete_first, reload_select, before_custom_sql, " + + " reload_time, channel_id, create_time, last_update_by, " + + " last_update_time, trigger_id, router_id, load_id " + + " from $(table_reload_request) " + + " where target_node_id=? and processed = 0 " + + " order by create_time, source_node_id"); putSql("selectTableReloadRequests", "select source_node_id, target_node_id, load_id, " + " trigger_id, router_id, create_time, create_table, delete_first, reload_select, " + " before_custom_sql, reload_time, processed, channel_id, last_update_by, last_update_time " @@ -69,8 +75,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + " completed, cancelled, full_load, " + " start_time, end_time, last_update_time, last_update_by, " + " error_flag, sql_state, sql_code, sql_message " - + " from $(table_reload_status) " - + " order by load_id desc, completed, last_update_time desc"); + + " from $(table_reload_status) "); putSql("selectActiveTableReloadStatus", "select source_node_id, target_node_id, load_id, " + " end_data_batch_id, start_data_batch_id, " + " setup_batch_count, data_batch_count, finalize_batch_count, " @@ -80,8 +85,12 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace + " start_time, end_time, last_update_time, last_update_by, " + " error_flag, sql_state, sql_code, sql_message " + " from $(table_reload_status) " - + " where completed = 0 and cancelled = 0" - + " order by load_id desc, completed, last_update_time desc"); + + " where completed = 0 and cancelled = 0"); + putSql("orderTableReloadStatus", " order by load_id desc, completed, last_update_time desc"); + putSql("whereSourceNodeId", " where source_node_id = ?"); + putSql("whereTargetNodeId", " where target_node_id = ?"); + putSql("andSourceNodeId", " and source_node_id = ?"); + putSql("andTargetNodeId", " and target_node_id = ?"); putSql("selectTableReloadStatusByLoadId", "select source_node_id, target_node_id, load_id, " + " end_data_batch_id, start_data_batch_id, " + " setup_batch_count, data_batch_count, finalize_batch_count, "