From 1848bbd244bf2340600fb77751a1f088588fc0ec Mon Sep 17 00:00:00 2001 From: Eric Long Date: Wed, 2 Oct 2019 14:44:05 -0400 Subject: [PATCH] 0004097: use current transaction, single data_event, cache sorting tables, query for node list option, log stats --- .../route/ConvertToReloadRouter.java | 167 +++++++++++------- 1 file changed, 105 insertions(+), 62 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConvertToReloadRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConvertToReloadRouter.java index 9b77649186..3b723e5e70 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConvertToReloadRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConvertToReloadRouter.java @@ -38,9 +38,9 @@ import org.jumpmind.db.model.Database; import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.DatabaseNamesConstants; -import org.jumpmind.db.sql.ISqlTemplate; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.UniqueKeyException; +import org.jumpmind.db.sql.mapper.StringMapper; import org.jumpmind.extension.IBuiltInExtensionPoint; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.Constants; @@ -64,6 +64,8 @@ public class ConvertToReloadRouter extends AbstractDataRouter implements IDataRo private final static String ROUTERS = "c2rRouters"; + private final static String SORTED_TABLES = "c2rSortedTables"; + private final static String INSERT_DATA_SQL = "insert into sym_data " + "(data_id, table_name, event_type, row_data, trigger_hist_id, channel_id, node_list, create_time) values (null, ?, ?, ?, ?, ?, ?, ?)"; @@ -75,7 +77,9 @@ public class ConvertToReloadRouter extends AbstractDataRouter implements IDataRo protected ISymmetricEngine engine; - protected static boolean firstTime = true; + protected boolean firstTime = true; + + protected long routeMs, sortMs, insertTempMs, queryNodesMs, insertBatchMs; public ConvertToReloadRouter(ISymmetricEngine engine) { this.engine = engine; @@ -89,6 +93,7 @@ public Set routeToNodes(SimpleRouterContext context, DataMetaData dataMe return toNodeIds(nodes, null); } + long ts = System.currentTimeMillis(); @SuppressWarnings("unchecked") Map routers = (Map) context.get(ROUTERS); if (routers == null) { @@ -111,6 +116,7 @@ public Set routeToNodes(SimpleRouterContext context, DataMetaData dataMe return toNodeIds(nodes, null); } + routeMs += (System.currentTimeMillis() - ts); return null; } @@ -131,57 +137,31 @@ protected Object[] getPkObjects(DataEventType eventType, DataMetaData dataMetaDa public void completeBatch(SimpleRouterContext context, OutgoingBatch batch) { log.debug("Completing batch {}", batch.getBatchId()); if (batch.getNodeId().equals(Constants.UNROUTED_NODE_ID)) { - ISqlTemplate sqlTemplate = engine.getSqlTemplate(); - ISqlTransaction transaction = null; - try { - transaction = sqlTemplate.startSqlTransaction(); - - @SuppressWarnings("unchecked") - Map routers = (Map) context.get(ROUTERS); - List tableInfos = new ArrayList(); - for (RouterInfo routerInfo : routers.values()) { - tableInfos.addAll(routerInfo.getTableInfos()); - } - tableInfos = sortTableInfos(tableInfos); - - queueEvents((ChannelRouterContext) context, transaction, batch, tableInfos); - } catch (Error ex) { - if (transaction != null) { - transaction.rollback(); - } - throw ex; - } catch (RuntimeException ex) { - if (transaction != null) { - transaction.rollback(); - } - throw ex; - } finally { - if (transaction != null) { - transaction.close(); - } + @SuppressWarnings("unchecked") + Map routers = (Map) context.get(ROUTERS); + List tableInfos = new ArrayList(); + for (RouterInfo routerInfo : routers.values()) { + tableInfos.addAll(routerInfo.getTableInfos()); } - transaction.commit(); + tableInfos = sortTableInfos(context, tableInfos); + + ChannelRouterContext channelContext = ((ChannelRouterContext) context); + queueEvents(channelContext, channelContext.getSqlTransaction(), batch, tableInfos); } } - protected List sortTableInfos(Collection tableInfos) { - List histories = null; - if (firstTime) { - histories = engine.getTriggerRouterService().getActiveTriggerHistories(); - firstTime = false; - } else { - histories = engine.getTriggerRouterService().getActiveTriggerHistoriesFromCache(); - } - - List allTables = new ArrayList
(histories.size()); - for (TriggerHistory history : histories) { - Table table = engine.getDatabasePlatform().getTableFromCache(history.getSourceCatalogName(), - history.getSourceSchemaName(), history.getSourceTableName(), false); - if (table != null) { - allTables.add(table); - } + @Override + public void contextCommitted(SimpleRouterContext context) { + if (routeMs + sortMs + insertTempMs + queryNodesMs + insertBatchMs > 60000) { + log.info("{} router millis are route={}, sort={}, insert.temp={}, query.nodes={}, insert.batches={}", + ROUTER_ID, routeMs, sortMs, insertTempMs, queryNodesMs, insertBatchMs); } - List
sortedTables = Database.sortByForeignKeys(allTables); + routeMs = sortMs = insertTempMs = queryNodesMs = insertBatchMs = 0; + } + + protected List sortTableInfos(SimpleRouterContext context, Collection tableInfos) { + long ts = System.currentTimeMillis(); + List
sortedTables = getAllSortedTables(context); Map tableInfosByTable = new HashMap(); for (TableInfo tableInfo : tableInfos) { @@ -196,13 +176,42 @@ protected List sortTableInfos(Collection tableInfos) { } } + sortMs += (System.currentTimeMillis() - ts); return sortedTableInfos; } + + protected List
getAllSortedTables(SimpleRouterContext context) { + @SuppressWarnings("unchecked") + List
sortedTables = (List
) context.get(SORTED_TABLES); + + if (sortedTables == null) { + List histories = null; + if (firstTime) { + histories = engine.getTriggerRouterService().getActiveTriggerHistories(); + firstTime = false; + } else { + histories = engine.getTriggerRouterService().getActiveTriggerHistoriesFromCache(); + } + + List
allTables = new ArrayList
(histories.size()); + for (TriggerHistory history : histories) { + Table table = engine.getDatabasePlatform().getTableFromCache(history.getSourceCatalogName(), + history.getSourceSchemaName(), history.getSourceTableName(), false); + if (table != null) { + allTables.add(table); + } + } + sortedTables = Database.sortByForeignKeys(allTables); + } + return sortedTables; + } protected void queueEvents(ChannelRouterContext context, ISqlTransaction transaction, OutgoingBatch origBatch, List tableInfos) { final long loadId = engine.getSequenceService().nextVal(transaction, Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID); final int typeForId = engine.getSymmetricDialect().getSqlTypeForIds(); + boolean isPostgres = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.POSTGRESQL); + long ts = System.currentTimeMillis(); for (TableInfo tableInfo : tableInfos) { RouterInfo routerInfo = tableInfo.getRouterInfo(); @@ -210,7 +219,7 @@ protected void queueEvents(ChannelRouterContext context, ISqlTransaction transac String tempSql = "insert into " + routerInfo.getTempTableName() + "(" + tableInfo.getPkColumnNamesAsString() + ", " + " load_id) values (" + placeHolders + ")"; - if (engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.POSTGRESQL)) { + if (isPostgres) { tempSql += " on conflict do nothing"; } transaction.prepare(tempSql); @@ -225,21 +234,38 @@ protected void queueEvents(ChannelRouterContext context, ISqlTransaction transac } } } + insertTempMs += (System.currentTimeMillis() - ts); Map batchByNode = new HashMap(); for (TableInfo tableInfo : tableInfos) { RouterInfo routerInfo = tableInfo.getRouterInfo(); String reloadSql = getTempTableSql(routerInfo, tableInfo, loadId); - for (Node node : routerInfo.getNodes()) { - OutgoingBatch batch = batchByNode.get(node.getNodeId()); + + List nodes = null; + if (StringUtils.isNotBlank(routerInfo.getNodeQuery())) { + ts = System.currentTimeMillis(); + transaction.flush(); + nodes = transaction.query(routerInfo.getNodeQuery(), new StringMapper(), new Object[] { loadId }, new int[] { typeForId }); + queryNodesMs += (System.currentTimeMillis() - ts); + } + if (nodes == null || nodes.size() == 0) { + nodes = routerInfo.getNodeIds(); + } + + ts = System.currentTimeMillis(); + long dataId = insertData(transaction, tableInfo, DataEventType.RELOAD.getCode(), reloadSql); + String tableName = tableInfo.getTableName().toLowerCase(); + for (String nodeId : nodes) { + OutgoingBatch batch = batchByNode.get(nodeId); if (batch == null) { - batch = newBatch(transaction, node.getNodeId(), loadId, tableInfo, origBatch.getSummary()); - batchByNode.put(node.getNodeId(), batch); + batch = newBatch(transaction, nodeId, loadId, tableInfo, origBatch.getSummary()); + batchByNode.put(nodeId, batch); } - batch.incrementTableCount(tableInfo.getTableName().toLowerCase()); - long dataId = insertDataForBatch(transaction, tableInfo, batch.getBatchId(), DataEventType.RELOAD.getCode(), reloadSql); + batch.incrementTableCount(tableName); + insertDataEvent(transaction, tableInfo, batch.getBatchId(), dataId); context.getDataIds().add(dataId); } + insertBatchMs += (System.currentTimeMillis() - ts); } origBatch.setLoadId(loadId); } @@ -270,7 +296,7 @@ protected OutgoingBatch newBatch(ISqlTransaction transaction, String nodeId, lon return batch; } - protected long insertDataForBatch(ISqlTransaction transaction, TableInfo tableInfo, long batchId, String eventType, String sql) { + protected long insertData(ISqlTransaction transaction, TableInfo tableInfo, String eventType, String sql) { Timestamp now = new Timestamp(System.currentTimeMillis()); long dataId = transaction.insertWithGeneratedKey(INSERT_DATA_SQL, engine.getSymmetricDialect().getSequenceKeyName(SequenceIdentifier.DATA), @@ -278,10 +304,12 @@ protected long insertDataForBatch(ISqlTransaction transaction, TableInfo tableIn new Object[] { tableInfo.getTableName(), eventType, sql, tableInfo.getTriggerHistory().getTriggerHistoryId(), tableInfo.getChannelId(), null, now }, INSERT_DATA_TYPES); + return dataId; + } + protected void insertDataEvent(ISqlTransaction transaction, TableInfo tableInfo, long batchId, long dataId) { transaction.prepareAndExecute(INSERT_DATA_EVENT_SQL, new Object[] { dataId, batchId, tableInfo.getRouterInfo().getRouter().getRouterId() }, new int[] { Types.NUMERIC, Types.NUMERIC, Types.VARCHAR }); - return dataId; } public void setSymmetricEngine(ISymmetricEngine engine) { @@ -292,23 +320,34 @@ class RouterInfo { private Router router; - private Set nodes; + private List nodeIds = new ArrayList(); private Map tableInfos = new HashMap(); private String tempTableName; + + private String nodeQuery; public RouterInfo(Router router, Set nodes) { - this.router = router; - this.nodes = nodes; + this.router = router; + for (Node node : nodes) { + this.nodeIds.add(node.getNodeId()); + } + String expression = router.getRouterExpression(); Pattern pattern = Pattern.compile(".*\\s*temptable=(\\S*)\\s*.*", Pattern.CASE_INSENSITIVE); if (expression != null) { + expression = expression.replaceAll("\n", " ").replaceAll("\r", " "); Matcher matcher = pattern.matcher(expression); if (matcher.matches()) { tempTableName = matcher.group(1); } + pattern = Pattern.compile(".*\\s*nodequery=\"(.*)\"", Pattern.CASE_INSENSITIVE); + matcher = pattern.matcher(expression); + if (matcher.matches()) { + nodeQuery = matcher.group(1); + } } if (StringUtils.isBlank(tempTableName)) { throw new NotImplementedException("Missing temptable={name} for router expression."); @@ -319,8 +358,8 @@ public Router getRouter() { return router; } - public Set getNodes() { - return nodes; + public List getNodeIds() { + return nodeIds; } public TableInfo getTableInfo(DataMetaData dataMetaData, TriggerRouter triggerRouter) { @@ -343,6 +382,10 @@ protected TableInfo newTableInfo(DataMetaData dataMetaData, TriggerRouter trigge public String getTempTableName() { return tempTableName; } + + public String getNodeQuery() { + return nodeQuery; + } } class TableInfo {