From a4786a3afd522cf4e3b37e36087c2f322b5673bf Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Fri, 22 Jul 2016 14:43:03 -0400 Subject: [PATCH] 0002691: Partial initial loads --- .../db/AbstractSymmetricDialect.java | 21 +- .../symmetric/db/ISymmetricDialect.java | 2 + .../symmetric/model/TableReloadRequest.java | 36 ++ .../route/ConfigurationChangedDataRouter.java | 22 +- .../symmetric/service/IDataService.java | 4 + .../symmetric/service/impl/DataService.java | 218 +++++++-- .../service/impl/DataServiceSqlMap.java | 9 + .../symmetric/service/impl/RouterService.java | 454 ++++++++++-------- .../src/main/resources/symmetric-schema.xml | 9 +- 9 files changed, 485 insertions(+), 290 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java index 5c8642cd67..9124bd3421 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/AbstractSymmetricDialect.java @@ -229,7 +229,13 @@ public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerH @Override public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List transforms) { - String sql = null; + return createPurgeSqlFor(node, triggerRouter, triggerHistory, transforms, null); + } + + @Override + public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, + List transforms, String deleteSql) { + String sql = null; if (StringUtils.isEmpty(triggerRouter.getInitialLoadDeleteStmt())) { List tableNames = new ArrayList(); if (transforms != null) { @@ -243,12 +249,13 @@ public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerH StringBuilder statements = new StringBuilder(128); for (String tableName : tableNames) { - String deleteSql = null; - if (tableName.startsWith(parameterService.getTablePrefix())) { - deleteSql = "delete from %s"; - } else { - deleteSql = parameterService.getString(ParameterConstants.INITIAL_LOAD_DELETE_FIRST_SQL); - } + if (deleteSql == null) { + if (tableName.startsWith(parameterService.getTablePrefix())) { + deleteSql = "delete from %s"; + } else { + deleteSql = parameterService.getString(ParameterConstants.INITIAL_LOAD_DELETE_FIRST_SQL); + } + } statements.append(String.format(deleteSql, tableName)).append(";"); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java index cd78c1ab36..96c88dcb71 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/db/ISymmetricDialect.java @@ -95,6 +95,8 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List transforms); + public String createPurgeSqlFor(Node node, TriggerRouter triggerRouter, TriggerHistory triggerHistory, List transforms, String deleteSql); + public String createCsvDataSql(Trigger trigger, TriggerHistory triggerHistory, Channel channel, String whereClause); public String createCsvPrimaryKeySql(Trigger trigger, TriggerHistory triggerHistory, Channel channel, String whereClause); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java index d2ce651541..7aa4a6def9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/TableReloadRequest.java @@ -22,16 +22,21 @@ import java.util.Date; +import org.jumpmind.symmetric.common.ParameterConstants; + public class TableReloadRequest { protected String targetNodeId; protected String sourceNodeId; protected String triggerId; protected String routerId; + protected boolean createTable; + protected boolean deleteFirst; protected String reloadSelect; protected String reloadDeleteStmt; protected boolean reloadEnabled = true; protected Date reloadTime; + protected String channelId; protected Date createTime = new Date(); protected Date lastUpdateTime = new Date(); protected String lastUpdateBy; @@ -134,4 +139,35 @@ public void setLastUpdateBy(String lastUpdateBy) { this.lastUpdateBy = lastUpdateBy; } + public String getChannelId() { + return channelId; + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + + public boolean isCreateTable() { + return createTable; + } + + public void setCreateTable(boolean createTable) { + this.createTable = createTable; + } + + public boolean isDeleteFirst() { + return deleteFirst; + } + + public void setDeleteFirst(boolean deleteFirst) { + this.deleteFirst = deleteFirst; + } + + public boolean isFullLoadRequest() { + return ParameterConstants.ALL.equals(getTriggerId()) && ParameterConstants.ALL.equals(getRouterId()); + } + + public String getIdentifier() { + return getTriggerId() + getRouterId(); + } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java index 4ad205f7c6..425660d10b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java @@ -542,8 +542,6 @@ public void contextCommitted(SimpleRouterContext routingContext) { engine.getLoadFilterService().clearCache(); } - insertReloadEvents(routingContext); - if (routingContext.get(CTX_KEY_RESTART_JOBMANAGER_NEEDED) != null) { IJobManager jobManager = engine.getJobManager(); if (jobManager != null) { @@ -569,25 +567,7 @@ public void contextCommitted(SimpleRouterContext routingContext) { } } } - - protected void insertReloadEvents(SimpleRouterContext routingContext) { - @SuppressWarnings("unchecked") - List reloadRequestKeys = (List) routingContext - .get(CTX_KEY_TABLE_RELOAD_NEEDED); - if (reloadRequestKeys != null) { - for (TableReloadRequestKey reloadRequestKey : reloadRequestKeys) { - TableReloadRequest request = engine.getDataService().getTableReloadRequest( - reloadRequestKey); - if (engine.getDataService().insertReloadEvent(request, - reloadRequestKey.getReceivedFromNodeId() != null)) { - log.info( - "Inserted table reload request from config data router for node {} and trigger {}", - reloadRequestKey.getTargetNodeId(), reloadRequestKey.getTriggerId()); - } - } - } - } - + private String tableName(String tableName) { return TableConstants.getTableName(engine != null ? engine.getTablePrefix() : "sym", tableName); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index 4b5ea6cbcb..5f1c5eb0d5 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -47,6 +47,8 @@ public interface IDataService { public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key); + public List getTableReloadRequestToProcess(final String sourceNodeId); + public String reloadNode(String nodeId, boolean reverseLoad, String createBy); public String reloadTable(String nodeId, String catalogName, String schemaName, String tableName); @@ -67,6 +69,8 @@ public interface IDataService { public void insertReloadEvents(Node targetNode, boolean reverse); + public void insertReloadEvents(Node targetNode, boolean reverse, List reloadRequests); + public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient); public long insertReloadEvent(ISqlTransaction transaction, Node targetNode, diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 03b155a9fe..55d57ff5b7 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -260,6 +260,30 @@ public TableReloadRequest mapRow(Row rs) { }, key.getSourceNodeId(), key.getTargetNodeId(), key.getTriggerId(), key.getRouterId()); } + + public List getTableReloadRequestToProcess(final String sourceNodeId) { + return sqlTemplate.query(getSql("selectTableReloadRequestToProcess"), + new ISqlRowMapper() { + public TableReloadRequest mapRow(Row rs) { + TableReloadRequest request = new TableReloadRequest(); + request.setSourceNodeId(sourceNodeId); + request.setTargetNodeId(rs.getString("target_node_id")); + request.setCreateTable(rs.getBoolean("create_table")); + request.setDeleteFirst(rs.getBoolean("delete_first")); + request.setReloadSelect(rs.getString("reload_select")); + request.setReloadEnabled(rs.getBoolean("reload_enabled")); + request.setReloadTime(rs.getDateTime("reload_time")); + request.setReloadDeleteStmt(rs.getString("reload_delete_stmt")); + request.setChannelId(rs.getString("channel_id")); + request.setTriggerId(rs.getString("trigger_id")); + request.setRouterId(rs.getString("router_id")); + request.setCreateTime(rs.getDateTime("create_time")); + request.setLastUpdateBy(rs.getString("last_update_by")); + request.setLastUpdateTime(rs.getDateTime("last_update_time")); + return request; + } + }, sourceNodeId); + } /** * @return If isLoad then return the inserted batch id otherwise return the @@ -323,27 +347,36 @@ private String getReloadChannelIdForTrigger(Trigger trigger, Map reloadRequests) { if (engine.getClusterService().lock(ClusterConstants.SYNCTRIGGERS)) { try { synchronized (engine.getTriggerRouterService()) { engine.getClusterService().lock(ClusterConstants.SYNCTRIGGERS); + boolean isFullLoad = reloadRequests == null + || (reloadRequests.size() == 1 && reloadRequests.get(0).isFullLoadRequest()); + if (!reverse) { - log.info("Queueing up an initial load to " + targetNode.getNodeId()); + log.info("Queueing up " + (isFullLoad ? "an initial" : "a" + " load to ") + targetNode.getNodeId()); } else { - log.info("Queueing up a reverse initial load to " + targetNode.getNodeId()); + log.info("Queueing up a reverse " + (isFullLoad ? "initial" : "") + " load to " + targetNode.getNodeId()); } /* * Outgoing data events are pointless because we are * reloading all data */ - engine.getOutgoingBatchService().markAllAsSentForNode(targetNode.getNodeId(), - false); - + if (isFullLoad) { + engine.getOutgoingBatchService().markAllAsSentForNode(targetNode.getNodeId(), + false); + } + INodeService nodeService = engine.getNodeService(); ITriggerRouterService triggerRouterService = engine.getTriggerRouterService(); @@ -367,53 +400,65 @@ public void insertReloadEvents(Node targetNode, boolean reverse) { String createBy = reverse ? nodeSecurity.getRevInitialLoadCreateBy() : nodeSecurity.getInitialLoadCreateBy(); - List triggerHistories = triggerRouterService - .getActiveTriggerHistories(); + List triggerHistories = new ArrayList(); + if (isFullLoad) { + triggerHistories = triggerRouterService.getActiveTriggerHistories(); + } + else { + for (TableReloadRequest reloadRequest : reloadRequests) { + triggerHistories.addAll(engine.getTriggerRouterService() + .getActiveTriggerHistories(new Trigger(reloadRequest.getTriggerId(), null))); + } + } Map> triggerRoutersByHistoryId = triggerRouterService .fillTriggerRoutersByHistIdAndSortHist(sourceNode.getNodeGroupId(), targetNode.getNodeGroupId(), triggerHistories); - callReloadListeners(true, targetNode, transactional, transaction, loadId); + if (isFullLoad) { + callReloadListeners(true, targetNode, transactional, transaction, loadId); - insertCreateSchemaScriptPriorToReload(targetNode, nodeIdRecord, loadId, + insertCreateSchemaScriptPriorToReload(targetNode, nodeIdRecord, loadId, createBy, transactional, transaction); - insertSqlEventsPriorToReload(targetNode, nodeIdRecord, loadId, createBy, + insertSqlEventsPriorToReload(targetNode, nodeIdRecord, loadId, createBy, transactional, transaction, reverse); - + } + insertCreateBatchesForReload(targetNode, loadId, createBy, triggerHistories, triggerRoutersByHistoryId, transactional, - transaction); + transaction, convertReloadListToMap(reloadRequests)); insertDeleteBatchesForReload(targetNode, loadId, createBy, triggerHistories, triggerRoutersByHistoryId, transactional, - transaction); + transaction, convertReloadListToMap(reloadRequests)); insertLoadBatchesForReload(targetNode, loadId, createBy, triggerHistories, - triggerRoutersByHistoryId, transactional, transaction); - - String afterSql = parameterService - .getString(reverse ? ParameterConstants.INITIAL_LOAD_REVERSE_AFTER_SQL - : ParameterConstants.INITIAL_LOAD_AFTER_SQL); - if (isNotBlank(afterSql)) { - insertSqlEvent(transaction, targetNode, afterSql, true, loadId, - createBy); + triggerRoutersByHistoryId, transactional, transaction, reloadRequests); + + if (isFullLoad) { + String afterSql = parameterService + .getString(reverse ? ParameterConstants.INITIAL_LOAD_REVERSE_AFTER_SQL + : ParameterConstants.INITIAL_LOAD_AFTER_SQL); + if (isNotBlank(afterSql)) { + insertSqlEvent(transaction, targetNode, afterSql, true, loadId, + createBy); + } } - insertFileSyncBatchForReload(targetNode, loadId, createBy, transactional, transaction); - callReloadListeners(false, targetNode, transactional, transaction, loadId); - - if (!reverse) { - nodeService.setInitialLoadEnabled(transaction, nodeIdRecord, false, + if (isFullLoad) { + callReloadListeners(false, targetNode, transactional, transaction, loadId); + if (!reverse) { + nodeService.setInitialLoadEnabled(transaction, nodeIdRecord, false, false, loadId, createBy); - } else { - nodeService.setReverseInitialLoadEnabled(transaction, nodeIdRecord, - false, false, loadId, createBy); + } else { + nodeService.setReverseInitialLoadEnabled(transaction, nodeIdRecord, + false, false, loadId, createBy); + } } - + if (!Constants.DEPLOYMENT_TYPE_REST.equals(targetNode.getDeploymentType())) { insertNodeSecurityUpdate(transaction, nodeIdRecord, targetNode.getNodeId(), true, loadId, createBy); @@ -421,6 +466,12 @@ public void insertReloadEvents(Node targetNode, boolean reverse) { engine.getStatisticManager().incrementNodesLoaded(1); + if (reloadRequests != null && reloadRequests.size() > 0) { + log.info("About to mark table reload request for load id " + loadId + " to processed."); + transaction.prepareAndExecute(getSql("updateProcessedTableReloadRequest"), loadId, new Date()); + log.info("Table reload request for load id " + loadId + " marked processed."); + } + transaction.commit(); } catch (Error ex) { if (transaction != null) { @@ -454,6 +505,17 @@ public void insertReloadEvents(Node targetNode, boolean reverse) { } + protected Map convertReloadListToMap(List reloadRequests) { + if (reloadRequests == null) { + return null; + } + Map reloadMap = new HashMap(); + for (TableReloadRequest item : reloadRequests) { + reloadMap.put(item.getIdentifier(), item); + } + return reloadMap; + } + private void callReloadListeners(boolean before, Node targetNode, boolean transactional, ISqlTransaction transaction, long loadId) { for (IReloadListener listener : extensionService.getExtensionPointList(IReloadListener.class)) { @@ -548,13 +610,23 @@ private void insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord, private void insertCreateBatchesForReload(Node targetNode, long loadId, String createBy, List triggerHistories, Map> triggerRoutersByHistoryId, boolean transactional, - ISqlTransaction transaction) { - if (parameterService.is(ParameterConstants.INITIAL_LOAD_CREATE_SCHEMA_BEFORE_RELOAD)) { + ISqlTransaction transaction, Map reloadRequests) { + + if (reloadRequests != null && reloadRequests.size() > 0) { for (TriggerHistory triggerHistory : triggerHistories) { List triggerRouters = triggerRoutersByHistoryId.get(triggerHistory .getTriggerHistoryId()); + + TableReloadRequest currentRequest = reloadRequests.get(ParameterConstants.ALL + ParameterConstants.ALL); + boolean fullLoad = currentRequest == null ? false : true; + for (TriggerRouter triggerRouter : triggerRouters) { - if (triggerRouter.getInitialLoadOrder() >= 0 + if (!fullLoad) { + currentRequest = reloadRequests.get(triggerRouter.getTriggerId() + triggerRouter.getRouterId()); + } + + //Check the create flag on the specific table reload request + if (currentRequest != null && currentRequest.isCreateTable() && engine.getGroupletService().isTargetEnabled(triggerRouter, targetNode)) { insertCreateEvent(transaction, targetNode, triggerHistory, triggerRouter.getRouter().getRouterId(), true, @@ -566,29 +638,51 @@ private void insertCreateBatchesForReload(Node targetNode, long loadId, String c } } } + else { + if (parameterService.is(ParameterConstants.INITIAL_LOAD_CREATE_SCHEMA_BEFORE_RELOAD)) { + for (TriggerHistory triggerHistory : triggerHistories) { + List triggerRouters = triggerRoutersByHistoryId.get(triggerHistory + .getTriggerHistoryId()); + for (TriggerRouter triggerRouter : triggerRouters) { + if (triggerRouter.getInitialLoadOrder() >= 0 + && engine.getGroupletService().isTargetEnabled(triggerRouter, + targetNode)) { + insertCreateEvent(transaction, targetNode, triggerHistory, triggerRouter.getRouter().getRouterId(), true, + loadId, createBy); + if (!transactional) { + transaction.commit(); + } + } + } + } + } + } } private void insertDeleteBatchesForReload(Node targetNode, long loadId, String createBy, List triggerHistories, Map> triggerRoutersByHistoryId, boolean transactional, - ISqlTransaction transaction) { - if (parameterService.is(ParameterConstants.INITIAL_LOAD_DELETE_BEFORE_RELOAD)) { - for (ListIterator triggerHistoryIterator = triggerHistories - .listIterator(triggerHistories.size()); triggerHistoryIterator.hasPrevious();) { - TriggerHistory triggerHistory = triggerHistoryIterator.previous(); + ISqlTransaction transaction, Map reloadRequests) { + + if (reloadRequests != null && reloadRequests.size() > 0) { + for (TriggerHistory triggerHistory : triggerHistories) { List triggerRouters = triggerRoutersByHistoryId.get(triggerHistory .getTriggerHistoryId()); - for (ListIterator iterator = triggerRouters - .listIterator(triggerRouters.size()); iterator.hasPrevious();) { - TriggerRouter triggerRouter = iterator.previous(); - if (triggerRouter.getInitialLoadOrder() >= 0 + + TableReloadRequest currentRequest = reloadRequests.get(ParameterConstants.ALL + ParameterConstants.ALL); + boolean fullLoad = currentRequest == null ? false : true; + + for (TriggerRouter triggerRouter : triggerRouters) { + if (!fullLoad) { + currentRequest = reloadRequests.get(triggerRouter.getTriggerId() + triggerRouter.getRouterId()); + } + + //Check the delete flag on the specific table reload request + if (currentRequest != null && currentRequest.isDeleteFirst() && engine.getGroupletService().isTargetEnabled(triggerRouter, - targetNode) - && (!StringUtils.isBlank(parameterService - .getString(ParameterConstants.INITIAL_LOAD_DELETE_FIRST_SQL)) || !StringUtils - .isEmpty(triggerRouter.getInitialLoadDeleteStmt()))) { + targetNode)) { insertPurgeEvent(transaction, targetNode, triggerRouter, triggerHistory, - true, null, loadId, createBy); + true, currentRequest.getReloadDeleteStmt(), loadId, createBy); if (!transactional) { transaction.commit(); } @@ -596,12 +690,38 @@ private void insertDeleteBatchesForReload(Node targetNode, long loadId, String c } } } + else { + if (parameterService.is(ParameterConstants.INITIAL_LOAD_DELETE_BEFORE_RELOAD)) { + for (ListIterator triggerHistoryIterator = triggerHistories + .listIterator(triggerHistories.size()); triggerHistoryIterator.hasPrevious();) { + TriggerHistory triggerHistory = triggerHistoryIterator.previous(); + List triggerRouters = triggerRoutersByHistoryId.get(triggerHistory + .getTriggerHistoryId()); + for (ListIterator iterator = triggerRouters + .listIterator(triggerRouters.size()); iterator.hasPrevious();) { + TriggerRouter triggerRouter = iterator.previous(); + if (triggerRouter.getInitialLoadOrder() >= 0 + && engine.getGroupletService().isTargetEnabled(triggerRouter, + targetNode) + && (!StringUtils.isBlank(parameterService + .getString(ParameterConstants.INITIAL_LOAD_DELETE_FIRST_SQL)) || !StringUtils + .isEmpty(triggerRouter.getInitialLoadDeleteStmt()))) { + insertPurgeEvent(transaction, targetNode, triggerRouter, triggerHistory, + true, null, loadId, createBy); + if (!transactional) { + transaction.commit(); + } + } + } + } + } + } } private void insertLoadBatchesForReload(Node targetNode, long loadId, String createBy, List triggerHistories, Map> triggerRoutersByHistoryId, boolean transactional, - ISqlTransaction transaction) { + ISqlTransaction transaction, List reloadRequests) { Map channels = engine.getConfigurationService().getChannels(false); DatabaseInfo dbInfo = platform.getDatabaseInfo(); String quote = dbInfo.getDelimiterToken(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index 73d8fd91d5..0344708b6b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -37,6 +37,15 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace putSql("deleteTableReloadRequest", "delete from $(table_reload_request) where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?"); + putSql("selectTableReloadRequestToProcess", "select target_node_id, create_table, delete_first, reload_select, reload_delete_stmt, " + + " reload_enabled, reload_time, channel_id, create_time, last_update_by, " + + " last_update_time, trigger_id, router_id " + + " from $(table_reload_request) " + + " where source_node_id=? and processed = 0 " + + " order by create_time, target_node_id"); + + putSql("updateProcessedTableReloadRequest", "update $(table_reload_request) set load_id = ?, last_update_time = ?, processed = 1"); + // Note that the order by data_id is done appended in code putSql("selectEventDataToExtractSql", "" diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 05548a9d02..4bb67f3254 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -62,6 +63,7 @@ import org.jumpmind.symmetric.model.ProcessInfoKey; import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; import org.jumpmind.symmetric.model.Router; +import org.jumpmind.symmetric.model.TableReloadRequest; import org.jumpmind.symmetric.model.Trigger; import org.jumpmind.symmetric.model.TriggerHistory; import org.jumpmind.symmetric.model.TriggerRouter; @@ -101,21 +103,21 @@ public class RouterService extends AbstractService implements IRouterService { protected Map commonBatchesLastKnownState = new HashMap(); - + protected long commonBatchesCacheTime; protected Map defaultRouterOnlyLastKnownState = new HashMap(); - + protected long defaultRoutersCacheTime; protected transient ExecutorService readThread = null; protected ISymmetricEngine engine; - + protected IExtensionService extensionService; - + protected boolean syncTriggersBeforeInitialLoadAttempted = false; - + protected boolean firstTimeCheckForAbandonedBatches = true; public RouterService(ISymmetricEngine engine) { @@ -128,31 +130,31 @@ public RouterService(ISymmetricEngine engine) { extensionService.addExtensionPoint(NonTransactionalBatchAlgorithm.NAME, new NonTransactionalBatchAlgorithm()); extensionService.addExtensionPoint(TransactionalBatchAlgorithm.NAME, new TransactionalBatchAlgorithm()); - extensionService.addExtensionPoint(ConfigurationChangedDataRouter.ROUTER_TYPE, new ConfigurationChangedDataRouter(engine)); + extensionService.addExtensionPoint(ConfigurationChangedDataRouter.ROUTER_TYPE, + new ConfigurationChangedDataRouter(engine)); extensionService.addExtensionPoint("bsh", new BshDataRouter(engine)); extensionService.addExtensionPoint("subselect", new SubSelectDataRouter(symmetricDialect)); extensionService.addExtensionPoint("lookuptable", new LookupTableDataRouter(symmetricDialect)); extensionService.addExtensionPoint("default", new DefaultDataRouter()); extensionService.addExtensionPoint("audit", new AuditTableDataRouter(engine)); - extensionService.addExtensionPoint("column", new ColumnMatchDataRouter(engine.getConfigurationService(), - engine.getSymmetricDialect())); + extensionService.addExtensionPoint("column", + new ColumnMatchDataRouter(engine.getConfigurationService(), engine.getSymmetricDialect())); extensionService.addExtensionPoint(FileSyncDataRouter.ROUTER_TYPE, new FileSyncDataRouter(engine)); extensionService.addExtensionPoint("dbf", new DBFRouter(engine)); - setSqlMap(new RouterServiceSqlMap(symmetricDialect.getPlatform(), - createSqlReplacementTokens())); + setSqlMap(new RouterServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); } /** * For use in data load events */ - public boolean shouldDataBeRouted(SimpleRouterContext context, DataMetaData dataMetaData, - Node node, boolean initialLoad, boolean initialLoadSelectUsed, TriggerRouter triggerRouter) { + public boolean shouldDataBeRouted(SimpleRouterContext context, DataMetaData dataMetaData, Node node, + boolean initialLoad, boolean initialLoadSelectUsed, TriggerRouter triggerRouter) { IDataRouter router = getDataRouter(dataMetaData.getRouter()); Set oneNodeSet = new HashSet(1); oneNodeSet.add(node); - Collection nodeIds = router.routeToNodes(context, dataMetaData, oneNodeSet, - initialLoad, initialLoadSelectUsed, triggerRouter); + Collection nodeIds = router.routeToNodes(context, dataMetaData, oneNodeSet, initialLoad, + initialLoadSelectUsed, triggerRouter); return nodeIds != null && nodeIds.contains(node.getNodeId()); } @@ -181,19 +183,18 @@ synchronized public long routeData(boolean force) { engine.getOutgoingBatchService().updateAbandonedRoutingBatches(); firstTimeCheckForAbandonedBatches = false; } - + insertInitialLoadEvents(); - + long ts = System.currentTimeMillis(); DataGapDetector gapDetector = null; if (parameterService.is(ParameterConstants.ROUTING_USE_FAST_GAP_DETECTOR)) { - gapDetector = new DataGapFastDetector( - engine.getDataService(), parameterService, engine.getContextService(), symmetricDialect, - this, engine.getStatisticManager(), engine.getNodeService()); + gapDetector = new DataGapFastDetector(engine.getDataService(), parameterService, + engine.getContextService(), symmetricDialect, this, engine.getStatisticManager(), + engine.getNodeService()); } else { - gapDetector = new DataGapDetector( - engine.getDataService(), parameterService, symmetricDialect, - this, engine.getStatisticManager(), engine.getNodeService()); + gapDetector = new DataGapDetector(engine.getDataService(), parameterService, symmetricDialect, + this, engine.getStatisticManager(), engine.getNodeService()); } gapDetector.beforeRouting(); dataCount = routeDataForEachChannel(gapDetector); @@ -292,6 +293,8 @@ protected void insertInitialLoadEvents() { engine.getContextService().save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, "true"); } } + + processTableRequestLoads(identity); } } @@ -302,7 +305,63 @@ protected void insertInitialLoadEvents() { } } - + + public void processTableRequestLoads(Node source) { + List loadsToProcess = engine.getDataService().getTableReloadRequestToProcess(source.getNodeId()); + if (loadsToProcess.size() > 0) { + log.info("Found " + loadsToProcess.size() + " table reload requests to process."); + + Map> requestsSplitByLoad = new HashMap>(); + for (TableReloadRequest load : loadsToProcess) { + if (load.isFullLoadRequest() && isValidLoadTarget(load.getTargetNodeId())) { + List fullLoad = new ArrayList(); + fullLoad.add(load); + + engine.getDataService().insertReloadEvents( + engine.getNodeService().findNode(load.getTargetNodeId()), + false, fullLoad); + } + else { + NodeSecurity targetNodeSecurity = engine.getNodeService().findNodeSecurity(load.getTargetNodeId()); + boolean registered = targetNodeSecurity.getRegistrationTime() != null; + if (registered) { + // Make loads unique to the target and create time + String key = load.getTargetNodeId() + "::" + load.getCreateTime().toString(); + if (!requestsSplitByLoad.containsKey(key)) { + requestsSplitByLoad.put(key, new ArrayList()); + } + requestsSplitByLoad.get(key).add(load); + } + } + } + + for (Map.Entry> entry : requestsSplitByLoad.entrySet()) { + engine.getDataService().insertReloadEvents( + engine.getNodeService().findNode(entry.getKey().split("::")[0]), + false, entry.getValue()); + } + + + } + } + public boolean isValidLoadTarget(String targetNodeId) { + boolean result = false; + NodeSecurity targetNodeSecurity = engine.getNodeService().findNodeSecurity(targetNodeId); + + boolean reverseLoadFirst = parameterService.is(ParameterConstants.INITIAL_LOAD_REVERSE_FIRST); + boolean registered = targetNodeSecurity.getRegistrationTime() != null; + boolean reverseLoadQueued = targetNodeSecurity.isRevInitialLoadEnabled(); + + if (registered && (!reverseLoadFirst || !reverseLoadQueued)) { + result = true; + } else { + log.info("Unable to process load for target node id " + targetNodeId + " [registered: " + registered + + ", reverse load first: " + reverseLoadFirst + ", reverse load queued: " + reverseLoadQueued + + "]"); + } + return result; + } + public List findNodesThatAreReadyForInitialLoad() { INodeService nodeService = engine.getNodeService(); IConfigurationService configurationService = engine.getConfigurationService(); @@ -310,12 +369,9 @@ public List findNodesThatAreReadyForInitialLoad() { List toReturn = new ArrayList(); List securities = nodeService.findNodeSecurityWithLoadEnabled(); for (NodeSecurity nodeSecurity : securities) { - if (((!nodeSecurity.getNodeId().equals(me) - && nodeSecurity - .isInitialLoadEnabled()) - || (!nodeSecurity.getNodeId().equals(me) && configurationService - .isMasterToMaster()) || (nodeSecurity.getNodeId().equals(me) && nodeSecurity - .isRevInitialLoadEnabled()))) { + if (((!nodeSecurity.getNodeId().equals(me) && nodeSecurity.isInitialLoadEnabled()) + || (!nodeSecurity.getNodeId().equals(me) && configurationService.isMasterToMaster()) + || (nodeSecurity.getNodeId().equals(me) && nodeSecurity.isRevInitialLoadEnabled()))) { toReturn.add(nodeSecurity); } } @@ -334,8 +390,7 @@ protected void sendReverseInitialLoad() { } if (!queuedLoad) { - log.info("{} was enabled but no nodes were linked to load", - ParameterConstants.AUTO_RELOAD_REVERSE_ENABLED); + log.info("{} was enabled but no nodes were linked to load", ParameterConstants.AUTO_RELOAD_REVERSE_ENABLED); } } @@ -348,8 +403,8 @@ protected void sendReverseInitialLoad() { protected int routeDataForEachChannel(DataGapDetector gapDetector) { int dataCount = 0; Node sourceNode = engine.getNodeService().findIdentity(); - ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo( - new ProcessInfoKey(sourceNode.getNodeId(), null, ProcessType.ROUTER_JOB)); + ProcessInfo processInfo = engine.getStatisticManager() + .newProcessInfo(new ProcessInfoKey(sourceNode.getNodeId(), null, ProcessType.ROUTER_JOB)); processInfo.setStatus(ProcessInfo.Status.PROCESSING); try { final List channels = engine.getConfigurationService().getNodeChannels(false); @@ -358,17 +413,14 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) { readyChannels = getReadyChannels(gapDetector); } for (NodeChannel nodeChannel : channels) { - if (nodeChannel.isEnabled() && (readyChannels == null || readyChannels.contains(nodeChannel.getChannelId()))) { + if (nodeChannel.isEnabled() + && (readyChannels == null || readyChannels.contains(nodeChannel.getChannelId()))) { processInfo.setCurrentChannelId(nodeChannel.getChannelId()); - dataCount += routeDataForChannel(processInfo, - nodeChannel, - sourceNode - , gapDetector); + dataCount += routeDataForChannel(processInfo, nodeChannel, sourceNode, gapDetector); } else { gapDetector.setIsAllDataRead(false); if (log.isDebugEnabled()) { - log.debug( - "Not routing the {} channel. It is either disabled or suspended.", + log.debug("Not routing the {} channel. It is either disabled or suspended.", nodeChannel.getChannelId()); } } @@ -386,8 +438,8 @@ protected Set getReadyChannels(DataGapDetector gapDetector) { List dataGaps = gapDetector.getDataGaps(); int dataIdSqlType = engine.getSymmetricDialect().getSqlTypeForIds(); int numberOfGapsToQualify = parameterService.getInt(ParameterConstants.ROUTING_MAX_GAPS_TO_QUALIFY_IN_SQL, 100); - int maxGapsBeforeGreaterThanQuery = parameterService.getInt( - ParameterConstants.ROUTING_DATA_READER_THRESHOLD_GAPS_TO_USE_GREATER_QUERY, 100); + int maxGapsBeforeGreaterThanQuery = parameterService + .getInt(ParameterConstants.ROUTING_DATA_READER_THRESHOLD_GAPS_TO_USE_GREATER_QUERY, 100); String sql; Object[] args; int[] types; @@ -397,7 +449,7 @@ protected Set getReadyChannels(DataGapDetector gapDetector) { types = new int[] { dataIdSqlType }; } else { - sql = qualifyUsingDataGaps(dataGaps, numberOfGapsToQualify, getSql("selectChannelsUsingGapsSql")); + sql = qualifyUsingDataGaps(dataGaps, numberOfGapsToQualify, getSql("selectChannelsUsingGapsSql")); int numberOfArgs = 2 * (numberOfGapsToQualify < dataGaps.size() ? numberOfGapsToQualify : dataGaps.size()); args = new Object[numberOfArgs]; types = new int[numberOfArgs]; @@ -420,13 +472,12 @@ protected Set getReadyChannels(DataGapDetector gapDetector) { public String mapRow(Row row) { readyChannels.add(row.getString("channel_id")); return null; - } + } }, args, types); return readyChannels; } - protected String qualifyUsingDataGaps(List dataGaps, int numberOfGapsToQualify, - String sql) { + protected String qualifyUsingDataGaps(List dataGaps, int numberOfGapsToQualify, String sql) { StringBuilder gapClause = new StringBuilder(); for (int i = 0; i < numberOfGapsToQualify && i < dataGaps.size(); i++) { if (i == 0) { @@ -445,10 +496,8 @@ protected boolean producesCommonBatches(Channel channel, String nodeGroupId, Lis Boolean producesCommonBatches = commonBatchesLastKnownState.get(channelId); long cacheTime = parameterService.getLong(ParameterConstants.CACHE_CHANNEL_COMMON_BATCHES_IN_MS); if (producesCommonBatches == null || System.currentTimeMillis() - commonBatchesCacheTime > cacheTime) { - producesCommonBatches = !Constants.CHANNEL_CONFIG.equals(channelId) - && !channel.isFileSyncFlag() - && !channel.isReloadFlag() - && !Constants.CHANNEL_HEARTBEAT.equals(channelId) ? true : false; + producesCommonBatches = !Constants.CHANNEL_CONFIG.equals(channelId) && !channel.isFileSyncFlag() + && !channel.isReloadFlag() && !Constants.CHANNEL_HEARTBEAT.equals(channelId) ? true : false; if (producesCommonBatches && triggerRouters != null) { List testableTriggerRouters = new ArrayList(); for (TriggerRouter triggerRouter : triggerRouters) { @@ -456,36 +505,33 @@ protected boolean producesCommonBatches(Channel channel, String nodeGroupId, Lis testableTriggerRouters.add(triggerRouter); } else { /* - * Add any trigger router that is in another channel, but is - * for a table that is in the current channel + * Add any trigger router that is in another channel, + * but is for a table that is in the current channel */ - String anotherChannelTableName = triggerRouter.getTrigger() - .getFullyQualifiedSourceTableName(); + String anotherChannelTableName = triggerRouter.getTrigger().getFullyQualifiedSourceTableName(); for (TriggerRouter triggerRouter2 : triggerRouters) { - String currentTableName = triggerRouter2 - .getTrigger() - .getFullyQualifiedSourceTableName(); + String currentTableName = triggerRouter2.getTrigger().getFullyQualifiedSourceTableName(); String currentChannelId = triggerRouter2.getTrigger().getChannelId(); - if (anotherChannelTableName - .equals(currentTableName) && currentChannelId.equals(channelId)) { + if (anotherChannelTableName.equals(currentTableName) + && currentChannelId.equals(channelId)) { testableTriggerRouters.add(triggerRouter); } } } - } - + } + for (TriggerRouter triggerRouter : testableTriggerRouters) { boolean isDefaultRouter = "default".equals(triggerRouter.getRouter().getRouterType()); /* - * If the data router is not a default data router or there will - * be incoming data on the channel where sync_on_incoming_batch - * is on, then we can not do 'optimal' routing. When - * sync_on_incoming_batch is on, then we might not be sending - * data to all nodes in a node_group. We can only do 'optimal' - * routing if data is going to go to all nodes in a group. + * If the data router is not a default data router or there + * will be incoming data on the channel where + * sync_on_incoming_batch is on, then we can not do + * 'optimal' routing. When sync_on_incoming_batch is on, + * then we might not be sending data to all nodes in a + * node_group. We can only do 'optimal' routing if data is + * going to go to all nodes in a group. */ - if (triggerRouter.getRouter().getNodeGroupLink().getSourceNodeGroupId() - .equals(nodeGroupId)) { + if (triggerRouter.getRouter().getNodeGroupLink().getSourceNodeGroupId().equals(nodeGroupId)) { if (!isDefaultRouter) { producesCommonBatches = false; break; @@ -494,11 +540,11 @@ protected boolean producesCommonBatches(Channel channel, String nodeGroupId, Lis String outgoingTableName = triggerRouter.getTrigger() .getFullyQualifiedSourceTableName(); for (TriggerRouter triggerRouter2 : testableTriggerRouters) { - String incomingTableName = triggerRouter2.getTrigger().getFullyQualifiedSourceTableName(); + String incomingTableName = triggerRouter2.getTrigger() + .getFullyQualifiedSourceTableName(); String targetNodeGroupId = triggerRouter2.getRouter().getNodeGroupLink() .getTargetNodeGroupId(); - if (incomingTableName - .equals(outgoingTableName) + if (incomingTableName.equals(outgoingTableName) && targetNodeGroupId.equals(nodeGroupId)) { producesCommonBatches = false; break; @@ -509,7 +555,7 @@ protected boolean producesCommonBatches(Channel channel, String nodeGroupId, Lis } } } - + if (!producesCommonBatches.equals(commonBatchesLastKnownState.get(channelId))) { if (producesCommonBatches) { log.info("The '{}' channel is in common batch mode", channelId); @@ -522,30 +568,31 @@ protected boolean producesCommonBatches(Channel channel, String nodeGroupId, Lis } return producesCommonBatches; } - - protected boolean onlyDefaultRoutersAssigned(Channel channel, String nodeGroupId, List triggerRouters) { + + protected boolean onlyDefaultRoutersAssigned(Channel channel, String nodeGroupId, + List triggerRouters) { String channelId = channel.getChannelId(); Boolean onlyDefaultRoutersAssigned = defaultRouterOnlyLastKnownState.get(channelId); long cacheTime = parameterService.getLong(ParameterConstants.CACHE_CHANNEL_DEFAULT_ROUTER_IN_MS); if (onlyDefaultRoutersAssigned == null || System.currentTimeMillis() - defaultRoutersCacheTime > cacheTime) { - onlyDefaultRoutersAssigned = !Constants.CHANNEL_CONFIG.equals(channelId) - && !channel.isFileSyncFlag() - && !channel.isReloadFlag() - && !Constants.CHANNEL_HEARTBEAT.equals(channelId) ? true : false; - if (onlyDefaultRoutersAssigned && triggerRouters != null) { + onlyDefaultRoutersAssigned = !Constants.CHANNEL_CONFIG.equals(channelId) && !channel.isFileSyncFlag() + && !channel.isReloadFlag() && !Constants.CHANNEL_HEARTBEAT.equals(channelId) ? true : false; + if (onlyDefaultRoutersAssigned && triggerRouters != null) { for (TriggerRouter triggerRouter : triggerRouters) { - if (triggerRouter.getTrigger().getChannelId().equals(channel.getChannelId()) && - triggerRouter.getRouter().getNodeGroupLink().getSourceNodeGroupId() - .equals(nodeGroupId) && !"default".equals(triggerRouter.getRouter().getRouterType())) { + if (triggerRouter.getTrigger().getChannelId().equals(channel.getChannelId()) + && triggerRouter.getRouter().getNodeGroupLink().getSourceNodeGroupId().equals(nodeGroupId) + && !"default".equals(triggerRouter.getRouter().getRouterType())) { onlyDefaultRoutersAssigned = false; - } - } + } + } } - + if (!onlyDefaultRoutersAssigned.equals(defaultRouterOnlyLastKnownState.get(channelId))) { if (onlyDefaultRoutersAssigned) { - log.info("The '{}' channel for the '{}' node group has only default routers assigned to it. Change data won't be selected during routing", channelId, nodeGroupId); - } + log.info( + "The '{}' channel for the '{}' node group has only default routers assigned to it. Change data won't be selected during routing", + channelId, nodeGroupId); + } defaultRouterOnlyLastKnownState.put(channelId, onlyDefaultRoutersAssigned); } defaultRoutersCacheTime = System.currentTimeMillis(); @@ -560,11 +607,11 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod int dataCount = -1; try { List triggerRouters = engine.getTriggerRouterService().getTriggerRouters(false); - boolean producesCommonBatches = producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(), - triggerRouters); + boolean producesCommonBatches = producesCommonBatches(nodeChannel.getChannel(), + parameterService.getNodeGroupId(), triggerRouters); boolean onlyDefaultRoutersAssigned = onlyDefaultRoutersAssigned(nodeChannel.getChannel(), parameterService.getNodeGroupId(), triggerRouters); - + context = new ChannelRouterContext(sourceNode.getNodeId(), nodeChannel, symmetricDialect.getPlatform().getSqlTemplate().startSqlTransaction()); context.setProduceCommonBatches(producesCommonBatches); @@ -574,7 +621,8 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod dataCount = selectDataAndRoute(processInfo, context); return dataCount; } catch (DelayRoutingException ex) { - log.info("The routing process for the {} channel is being delayed. {}", nodeChannel.getChannelId(), isNotBlank(ex.getMessage()) ? ex.getMessage() : ""); + log.info("The routing process for the {} channel is being delayed. {}", nodeChannel.getChannelId(), + isNotBlank(ex.getMessage()) ? ex.getMessage() : ""); if (context != null) { context.rollback(); } @@ -587,17 +635,15 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod return 0; } catch (SyntaxParsingException ex) { log.error( - String.format( - "Failed to route and batch data on '%s' channel due to an invalid router expression", - nodeChannel.getChannelId()), ex); + String.format("Failed to route and batch data on '%s' channel due to an invalid router expression", + nodeChannel.getChannelId()), + ex); if (context != null) { context.rollback(); } return 0; } catch (Throwable ex) { - log.error( - String.format("Failed to route and batch data on '%s' channel", - nodeChannel.getChannelId()), ex); + log.error(String.format("Failed to route and batch data on '%s' channel", nodeChannel.getChannelId()), ex); if (context != null) { context.rollback(); } @@ -606,12 +652,12 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod try { if (dataCount > 0) { long insertTs = System.currentTimeMillis(); - engine.getDataService().insertDataEvents(context.getSqlTransaction(), - context.getDataEventList()); + engine.getDataService().insertDataEvents(context.getSqlTransaction(), context.getDataEventList()); context.clearDataEventsList(); completeBatchesAndCommit(context); gapDetector.addDataIds(context.getDataIds()); - gapDetector.setIsAllDataRead(context.getDataIds().size() < context.getChannel().getMaxDataToRoute()); + gapDetector + .setIsAllDataRead(context.getDataIds().size() < context.getChannel().getMaxDataToRoute()); context.incrementStat(System.currentTimeMillis() - insertTs, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS); @@ -620,9 +666,8 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod if (lastDataProcessed != null && lastDataProcessed.getDataId() > 0) { String channelId = nodeChannel.getChannelId(); long queryTs = System.currentTimeMillis(); - long dataLeftToRoute = sqlTemplate.queryForInt( - getSql("selectUnroutedCountForChannelSql"), channelId, - lastDataProcessed.getDataId()); + long dataLeftToRoute = sqlTemplate.queryForInt(getSql("selectUnroutedCountForChannelSql"), + channelId, lastDataProcessed.getDataId()); queryTs = System.currentTimeMillis() - queryTs; if (queryTs > Constants.LONG_OPERATION_THRESHOLD) { log.warn("Unrouted query for channel {} took longer than expected", channelId, queryTs); @@ -648,8 +693,7 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod protected void completeBatchesAndCommit(ChannelRouterContext context) { Set usedRouters = new HashSet(context.getUsedDataRouters()); - List batches = new ArrayList(context.getBatchesByNodes() - .values()); + List batches = new ArrayList(context.getBatchesByNodes().values()); if (!engine.getContextService().is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)) { engine.getContextService().save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, "true"); @@ -657,9 +701,9 @@ protected void completeBatchesAndCommit(ChannelRouterContext context) { context.commit(); if (engine.getParameterService().is(ParameterConstants.ROUTING_LOG_STATS_ON_BATCH_ERROR)) { - engine.getStatisticManager().addRouterStats(context.getStartDataId(), context.getEndDataId(), - context.getDataReadCount(), context.getPeekAheadFillCount(), - context.getDataGaps(), context.getTransactions(), batches); + engine.getStatisticManager().addRouterStats(context.getStartDataId(), context.getEndDataId(), + context.getDataReadCount(), context.getPeekAheadFillCount(), context.getDataGaps(), + context.getTransactions(), batches); } for (OutgoingBatch batch : batches) { @@ -688,52 +732,50 @@ protected Set findAvailableNodes(TriggerRouter triggerRouter, ChannelRoute nodes = new HashSet(); Router router = triggerRouter.getRouter(); NodeGroupLink link = engine.getConfigurationService().getNodeGroupLinkFor( - router.getNodeGroupLink().getSourceNodeGroupId(), - router.getNodeGroupLink().getTargetNodeGroupId(), false); + router.getNodeGroupLink().getSourceNodeGroupId(), router.getNodeGroupLink().getTargetNodeGroupId(), + false); if (link != null) { - nodes.addAll(engine.getNodeService().findEnabledNodesFromNodeGroup( - router.getNodeGroupLink().getTargetNodeGroupId())); + nodes.addAll(engine.getNodeService() + .findEnabledNodesFromNodeGroup(router.getNodeGroupLink().getTargetNodeGroupId())); } else { log.error("The router {} has no node group link configured from {} to {}", - new Object[] { router.getRouterId(), - router.getNodeGroupLink().getSourceNodeGroupId(), + new Object[] { router.getRouterId(), router.getNodeGroupLink().getSourceNodeGroupId(), router.getNodeGroupLink().getTargetNodeGroupId() }); } context.getAvailableNodes().put(triggerRouter, nodes); } - + return engine.getGroupletService().getTargetEnabled(triggerRouter, nodes); } - protected IDataToRouteReader startReading(ChannelRouterContext context) { - IDataToRouteReader reader = new DataGapRouteReader(context, engine); - if (parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS)) { - reader.run(); - } else { - if (readThread == null) { - readThread = Executors.newCachedThreadPool(new ThreadFactory() { - final AtomicInteger threadNumber = new AtomicInteger(1); - final String namePrefix = parameterService.getEngineName() - .toLowerCase() + "-router-reader-"; - - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setName(namePrefix + threadNumber.getAndIncrement()); - if (t.isDaemon()) { - t.setDaemon(false); - } - if (t.getPriority() != Thread.NORM_PRIORITY) { - t.setPriority(Thread.NORM_PRIORITY); - } - return t; - } - }); - } - readThread.execute(reader); - } - - return reader; - } + protected IDataToRouteReader startReading(ChannelRouterContext context) { + IDataToRouteReader reader = new DataGapRouteReader(context, engine); + if (parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS)) { + reader.run(); + } else { + if (readThread == null) { + readThread = Executors.newCachedThreadPool(new ThreadFactory() { + final AtomicInteger threadNumber = new AtomicInteger(1); + final String namePrefix = parameterService.getEngineName().toLowerCase() + "-router-reader-"; + + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setName(namePrefix + threadNumber.getAndIncrement()); + if (t.isDaemon()) { + t.setDaemon(false); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + }); + } + readThread.execute(reader); + } + + return reader; + } /** * Pre-read data and fill up a queue so we can peek ahead to see if we have @@ -746,7 +788,8 @@ public Thread newThread(Runnable r) { * @param context * The current context of the routing process */ - protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext context) throws InterruptedException { + protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext context) + throws InterruptedException { IDataToRouteReader reader = startReading(context); Data data = null; Data nextData = null; @@ -768,8 +811,7 @@ protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext c boolean atTransactionBoundary = false; if (nextData != null) { String nextTxId = nextData.getTransactionId(); - atTransactionBoundary = nextTxId == null - || !nextTxId.equals(data.getTransactionId()); + atTransactionBoundary = nextTxId == null || !nextTxId.equals(data.getTransactionId()); } context.setEncountedTransactionBoundary(atTransactionBoundary); statsDataCount++; @@ -781,8 +823,8 @@ protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext c try { if (maxNumberOfEventsBeforeFlush <= context.getDataEventList().size() || context.isNeedsCommitted()) { - engine.getDataService().insertDataEvents( - context.getSqlTransaction(), context.getDataEventList()); + engine.getDataService().insertDataEvents(context.getSqlTransaction(), + context.getDataEventList()); context.clearDataEventsList(); } if (context.isNeedsCommitted()) { @@ -793,8 +835,8 @@ protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext c ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS); if (statsDataCount > StatisticConstants.FLUSH_SIZE_ROUTER_DATA) { - engine.getStatisticManager().incrementDataRouted( - context.getChannel().getChannelId(), statsDataCount); + engine.getStatisticManager().incrementDataRouted(context.getChannel().getChannelId(), + statsDataCount); statsDataCount = 0; engine.getStatisticManager().incrementDataEventInserted( context.getChannel().getChannelId(), statsDataEventCount); @@ -812,12 +854,11 @@ protected int selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext c } finally { reader.setReading(false); if (statsDataCount > 0) { - engine.getStatisticManager().incrementDataRouted( - context.getChannel().getChannelId(), statsDataCount); + engine.getStatisticManager().incrementDataRouted(context.getChannel().getChannelId(), statsDataCount); } if (statsDataEventCount > 0) { - engine.getStatisticManager().incrementDataEventInserted( - context.getChannel().getChannelId(), statsDataEventCount); + engine.getStatisticManager().incrementDataEventInserted(context.getChannel().getChannelId(), + statsDataEventCount); } } context.incrementStat(totalDataCount, ChannelRouterContext.STAT_DATA_ROUTED_COUNT); @@ -831,25 +872,26 @@ protected int routeData(ProcessInfo processInfo, Data data, ChannelRouterContext List triggerRouters = getTriggerRoutersForData(data); Table table = symmetricDialect.getTable(data.getTriggerHistory(), true); if (table == null) { - table = buildTableFromTriggerHistory(data.getTriggerHistory()); + table = buildTableFromTriggerHistory(data.getTriggerHistory()); } if (triggerRouters != null && triggerRouters.size() > 0) { for (TriggerRouter triggerRouter : triggerRouters) { DataMetaData dataMetaData = new DataMetaData(data, table, triggerRouter.getRouter(), context.getChannel()); Collection nodeIds = null; - if (!context.getChannel().isIgnoreEnabled() - && triggerRouter.isRouted(data.getDataEventType())) { + if (!context.getChannel().isIgnoreEnabled() && triggerRouter.isRouted(data.getDataEventType())) { String targetNodeIds = data.getNodeList(); if (StringUtils.isNotBlank(targetNodeIds)) { List targetNodeIdsList = Arrays.asList(targetNodeIds.split(",")); - nodeIds = CollectionUtils.intersection(targetNodeIdsList, toNodeIds(findAvailableNodes(triggerRouter, context))); + nodeIds = CollectionUtils.intersection(targetNodeIdsList, + toNodeIds(findAvailableNodes(triggerRouter, context))); if (nodeIds.size() == 0) { log.info( "None of the target nodes specified in the data.node_list field ({}) were qualified nodes. {} will not be routed using the {} router", - new Object[] {targetNodeIds, data.getDataId(), triggerRouter.getRouter().getRouterId() }); + new Object[] { targetNodeIds, data.getDataId(), + triggerRouter.getRouter().getRouterId() }); } } else { try { @@ -857,15 +899,13 @@ protected int routeData(ProcessInfo processInfo, Data data, ChannelRouterContext context.addUsedDataRouter(dataRouter); long ts = System.currentTimeMillis(); nodeIds = dataRouter.routeToNodes(context, dataMetaData, - findAvailableNodes(triggerRouter, context), false, false, - triggerRouter); + findAvailableNodes(triggerRouter, context), false, false, triggerRouter); context.incrementStat(System.currentTimeMillis() - ts, ChannelRouterContext.STAT_DATA_ROUTER_MS); } catch (DelayRoutingException ex) { throw ex; } catch (RuntimeException ex) { - StringBuilder failureMessage = new StringBuilder( - "Failed to route data: "); + StringBuilder failureMessage = new StringBuilder("Failed to route data: "); failureMessage.append(data.getDataId()); failureMessage.append(" for table: "); failureMessage.append(data.getTableName()); @@ -894,16 +934,13 @@ protected int routeData(ProcessInfo processInfo, Data data, ChannelRouterContext log.warn( "Could not find trigger routers for trigger history id of {}. There is a good chance that data was captured and the trigger router link was removed before the data could be routed", data.getTriggerHistory().getTriggerHistoryId()); - log.info( - "Data with the id of {} will be assigned to an unrouted batch", - data.getDataId()); - numberOfDataEventsInserted += insertDataEvents(processInfo, context, new DataMetaData(data, table, - null, context.getChannel()), new HashSet(0)); + log.info("Data with the id of {} will be assigned to an unrouted batch", data.getDataId()); + numberOfDataEventsInserted += insertDataEvents(processInfo, context, + new DataMetaData(data, table, null, context.getChannel()), new HashSet(0)); } - context.incrementStat(numberOfDataEventsInserted, - ChannelRouterContext.STAT_DATA_EVENTS_INSERTED); + context.incrementStat(numberOfDataEventsInserted, ChannelRouterContext.STAT_DATA_EVENTS_INSERTED); return numberOfDataEventsInserted; } @@ -923,15 +960,13 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con Map batches = context.getBatchesByNodes(); OutgoingBatch batch = batches.get(nodeId); if (batch == null) { - batch = new OutgoingBatch(nodeId, dataMetaData.getNodeChannel().getChannelId(), - Status.RT); + batch = new OutgoingBatch(nodeId, dataMetaData.getNodeChannel().getChannelId(), Status.RT); batch.setBatchId(batchIdToReuse); batch.setCommonFlag(context.isProduceCommonBatches()); - - log.debug( - "About to insert a new batch for node {} on the '{}' channel. Batches in progress are: {}.", - new Object[] { nodeId, batch.getChannelId(), - context.getBatchesByNodes().values() }); + + log.debug( + "About to insert a new batch for node {} on the '{}' channel. Batches in progress are: {}.", + new Object[] { nodeId, batch.getChannelId(), context.getBatchesByNodes().values() }); engine.getOutgoingBatchService().insertOutgoingBatch(batch); processInfo.incrementBatchCount(); @@ -942,7 +977,7 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con batchIdToReuse = batch.getBatchId(); } } - + if (dataMetaData.getData().getDataEventType() == DataEventType.RELOAD) { long loadId = context.getLastLoadId(); if (loadId < 0) { @@ -956,24 +991,22 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con batch.incrementEventCount(dataMetaData.getData().getDataEventType()); batch.incrementDataEventCount(); - if (!context.isProduceCommonBatches() - || (context.isProduceCommonBatches() && !dataEventAdded)) { + if (!context.isProduceCommonBatches() || (context.isProduceCommonBatches() && !dataEventAdded)) { Router router = dataMetaData.getRouter(); context.addDataEvent(dataMetaData.getData().getDataId(), batch.getBatchId(), - router != null ? router.getRouterId() - : Constants.UNKNOWN_ROUTER_ID); + router != null ? router.getRouterId() : Constants.UNKNOWN_ROUTER_ID); numberOfDataEventsInserted++; dataEventAdded = true; } - Map batchAlgorithms = extensionService.getExtensionPointMap(IBatchAlgorithm.class); - if (batchAlgorithms.get(context.getChannel().getBatchAlgorithm()).isBatchComplete( - batch, dataMetaData, context)) { + Map batchAlgorithms = extensionService + .getExtensionPointMap(IBatchAlgorithm.class); + if (batchAlgorithms.get(context.getChannel().getBatchAlgorithm()).isBatchComplete(batch, dataMetaData, + context)) { context.setNeedsCommitted(true); } } } - context.incrementStat(System.currentTimeMillis() - ts, - ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS); + context.incrementStat(System.currentTimeMillis() - ts, ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS); return numberOfDataEventsInserted; } @@ -983,8 +1016,7 @@ protected IDataRouter getDataRouter(Router router) { if (!StringUtils.isBlank(router.getRouterType())) { dataRouter = routers.get(router.getRouterType()); if (dataRouter == null) { - log.warn( - "Could not find configured router type of {} with the id of {}. Defaulting the router", + log.warn("Could not find configured router type of {} with the id of {}. Defaulting the router", router.getRouterType(), router.getRouterId()); } } @@ -999,20 +1031,20 @@ protected List getTriggerRoutersForData(Data data) { List triggerRouters = null; if (data != null) { if (data.getTriggerHistory() != null) { - triggerRouters = engine.getTriggerRouterService() - .getTriggerRoutersForCurrentNode(false) + triggerRouters = engine.getTriggerRouterService().getTriggerRoutersForCurrentNode(false) .get((data.getTriggerHistory().getTriggerId())); - if (triggerRouters == null && data.getTriggerHistory().getTriggerId() != null && data.getTriggerHistory().getTriggerId().equals(AbstractFileParsingRouter.TRIGGER_ID_FILE_PARSER)) { - TriggerRouter dynamicTriggerRouter = new TriggerRouter(); - dynamicTriggerRouter.setRouter(engine.getTriggerRouterService().getRouterById(data.getExternalData())); - dynamicTriggerRouter.setTrigger(new Trigger()); - triggerRouters = new ArrayList(); - triggerRouters.add(dynamicTriggerRouter); - data.setDataEventType(DataEventType.INSERT); + if (triggerRouters == null && data.getTriggerHistory().getTriggerId() != null && data + .getTriggerHistory().getTriggerId().equals(AbstractFileParsingRouter.TRIGGER_ID_FILE_PARSER)) { + TriggerRouter dynamicTriggerRouter = new TriggerRouter(); + dynamicTriggerRouter + .setRouter(engine.getTriggerRouterService().getRouterById(data.getExternalData())); + dynamicTriggerRouter.setTrigger(new Trigger()); + triggerRouters = new ArrayList(); + triggerRouters.add(dynamicTriggerRouter); + data.setDataEventType(DataEventType.INSERT); } if (triggerRouters == null || triggerRouters.size() == 0) { - triggerRouters = engine.getTriggerRouterService() - .getTriggerRoutersForCurrentNode(true) + triggerRouters = engine.getTriggerRouterService().getTriggerRoutersForCurrentNode(true) .get((data.getTriggerHistory().getTriggerId())); } } else { @@ -1025,11 +1057,10 @@ protected List getTriggerRoutersForData(Data data) { } public long getUnroutedDataCount() { - long maxDataIdAlreadyRouted = sqlTemplate - .queryForLong(getSql("selectLastDataIdRoutedUsingDataGapSql")); + long maxDataIdAlreadyRouted = sqlTemplate.queryForLong(getSql("selectLastDataIdRoutedUsingDataGapSql")); long leftToRoute = engine.getDataService().findMaxDataId() - maxDataIdAlreadyRouted; List gaps = engine.getDataService().findDataGapsByStatus(DataGap.Status.GP); - for (int i = 0; i < gaps.size()-2; i++) { + for (int i = 0; i < gaps.size() - 2; i++) { DataGap gap = gaps.get(i); leftToRoute += (gap.getEndId() - gap.getStartId()); } @@ -1049,11 +1080,12 @@ public Map getRouters() { } protected Table buildTableFromTriggerHistory(TriggerHistory triggerHistory) { - Table table = new Table(triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(), triggerHistory.getSourceTableName()); - String[] columnNames = triggerHistory.getColumnNames().split(","); - for (String columnName : columnNames) { - table.addColumn(new Column(columnName)); - } - return table; + Table table = new Table(triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(), + triggerHistory.getSourceTableName()); + String[] columnNames = triggerHistory.getColumnNames().split(","); + for (String columnName : columnNames) { + table.addColumn(new Column(columnName)); + } + return table; } } diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml index 167d2dcfe3..fd10f13ea1 100644 --- a/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric-core/src/main/resources/symmetric-schema.xml @@ -690,13 +690,18 @@ + + - + + + + - +