Skip to content

Commit

Permalink
0004097: use current transaction, single data_event, cache sorting
Browse files Browse the repository at this point in the history
tables, query for node list option, log stats
  • Loading branch information
erilong committed Oct 2, 2019
1 parent 5479cdc commit 1848bbd
Showing 1 changed file with 105 additions and 62 deletions.
Expand Up @@ -38,9 +38,9 @@
import org.jumpmind.db.model.Database;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.UniqueKeyException;
import org.jumpmind.db.sql.mapper.StringMapper;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
Expand All @@ -64,6 +64,8 @@ public class ConvertToReloadRouter extends AbstractDataRouter implements IDataRo

private final static String ROUTERS = "c2rRouters";

private final static String SORTED_TABLES = "c2rSortedTables";

private final static String INSERT_DATA_SQL = "insert into sym_data " +
"(data_id, table_name, event_type, row_data, trigger_hist_id, channel_id, node_list, create_time) values (null, ?, ?, ?, ?, ?, ?, ?)";

Expand All @@ -75,7 +77,9 @@ public class ConvertToReloadRouter extends AbstractDataRouter implements IDataRo

protected ISymmetricEngine engine;

protected static boolean firstTime = true;
protected boolean firstTime = true;

protected long routeMs, sortMs, insertTempMs, queryNodesMs, insertBatchMs;

public ConvertToReloadRouter(ISymmetricEngine engine) {
this.engine = engine;
Expand All @@ -89,6 +93,7 @@ public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMe
return toNodeIds(nodes, null);
}

long ts = System.currentTimeMillis();
@SuppressWarnings("unchecked")
Map<String, RouterInfo> routers = (Map<String, RouterInfo>) context.get(ROUTERS);
if (routers == null) {
Expand All @@ -111,6 +116,7 @@ public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMe
return toNodeIds(nodes, null);
}

routeMs += (System.currentTimeMillis() - ts);
return null;
}

Expand All @@ -131,57 +137,31 @@ protected Object[] getPkObjects(DataEventType eventType, DataMetaData dataMetaDa
public void completeBatch(SimpleRouterContext context, OutgoingBatch batch) {
log.debug("Completing batch {}", batch.getBatchId());
if (batch.getNodeId().equals(Constants.UNROUTED_NODE_ID)) {
ISqlTemplate sqlTemplate = engine.getSqlTemplate();
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();

@SuppressWarnings("unchecked")
Map<String, RouterInfo> routers = (Map<String, RouterInfo>) context.get(ROUTERS);
List<TableInfo> tableInfos = new ArrayList<TableInfo>();
for (RouterInfo routerInfo : routers.values()) {
tableInfos.addAll(routerInfo.getTableInfos());
}
tableInfos = sortTableInfos(tableInfos);

queueEvents((ChannelRouterContext) context, transaction, batch, tableInfos);
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
if (transaction != null) {
transaction.close();
}
@SuppressWarnings("unchecked")
Map<String, RouterInfo> routers = (Map<String, RouterInfo>) context.get(ROUTERS);
List<TableInfo> tableInfos = new ArrayList<TableInfo>();
for (RouterInfo routerInfo : routers.values()) {
tableInfos.addAll(routerInfo.getTableInfos());
}
transaction.commit();
tableInfos = sortTableInfos(context, tableInfos);

ChannelRouterContext channelContext = ((ChannelRouterContext) context);
queueEvents(channelContext, channelContext.getSqlTransaction(), batch, tableInfos);
}
}

protected List<TableInfo> sortTableInfos(Collection<TableInfo> tableInfos) {
List<TriggerHistory> histories = null;
if (firstTime) {
histories = engine.getTriggerRouterService().getActiveTriggerHistories();
firstTime = false;
} else {
histories = engine.getTriggerRouterService().getActiveTriggerHistoriesFromCache();
}

List<Table> allTables = new ArrayList<Table>(histories.size());
for (TriggerHistory history : histories) {
Table table = engine.getDatabasePlatform().getTableFromCache(history.getSourceCatalogName(),
history.getSourceSchemaName(), history.getSourceTableName(), false);
if (table != null) {
allTables.add(table);
}
@Override
public void contextCommitted(SimpleRouterContext context) {
if (routeMs + sortMs + insertTempMs + queryNodesMs + insertBatchMs > 60000) {
log.info("{} router millis are route={}, sort={}, insert.temp={}, query.nodes={}, insert.batches={}",
ROUTER_ID, routeMs, sortMs, insertTempMs, queryNodesMs, insertBatchMs);
}
List<Table> sortedTables = Database.sortByForeignKeys(allTables);
routeMs = sortMs = insertTempMs = queryNodesMs = insertBatchMs = 0;
}

protected List<TableInfo> sortTableInfos(SimpleRouterContext context, Collection<TableInfo> tableInfos) {
long ts = System.currentTimeMillis();
List<Table> sortedTables = getAllSortedTables(context);

Map<Table, TableInfo> tableInfosByTable = new HashMap<Table, TableInfo>();
for (TableInfo tableInfo : tableInfos) {
Expand All @@ -196,21 +176,50 @@ protected List<TableInfo> sortTableInfos(Collection<TableInfo> tableInfos) {
}
}

sortMs += (System.currentTimeMillis() - ts);
return sortedTableInfos;
}

protected List<Table> getAllSortedTables(SimpleRouterContext context) {
@SuppressWarnings("unchecked")
List<Table> sortedTables = (List<Table>) context.get(SORTED_TABLES);

if (sortedTables == null) {
List<TriggerHistory> histories = null;
if (firstTime) {
histories = engine.getTriggerRouterService().getActiveTriggerHistories();
firstTime = false;
} else {
histories = engine.getTriggerRouterService().getActiveTriggerHistoriesFromCache();
}

List<Table> allTables = new ArrayList<Table>(histories.size());
for (TriggerHistory history : histories) {
Table table = engine.getDatabasePlatform().getTableFromCache(history.getSourceCatalogName(),
history.getSourceSchemaName(), history.getSourceTableName(), false);
if (table != null) {
allTables.add(table);
}
}
sortedTables = Database.sortByForeignKeys(allTables);
}
return sortedTables;
}

protected void queueEvents(ChannelRouterContext context, ISqlTransaction transaction, OutgoingBatch origBatch, List<TableInfo> tableInfos) {

final long loadId = engine.getSequenceService().nextVal(transaction, Constants.SEQUENCE_OUTGOING_BATCH_LOAD_ID);
final int typeForId = engine.getSymmetricDialect().getSqlTypeForIds();
boolean isPostgres = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.POSTGRESQL);
long ts = System.currentTimeMillis();

for (TableInfo tableInfo : tableInfos) {
RouterInfo routerInfo = tableInfo.getRouterInfo();
String placeHolders = StringUtils.repeat("?", ", ", tableInfo.getPkColumnNames().length + 1);
String tempSql = "insert into " + routerInfo.getTempTableName() + "(" + tableInfo.getPkColumnNamesAsString() + ", "
+ " load_id) values (" + placeHolders + ")";

if (engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.POSTGRESQL)) {
if (isPostgres) {
tempSql += " on conflict do nothing";
}
transaction.prepare(tempSql);
Expand All @@ -225,21 +234,38 @@ protected void queueEvents(ChannelRouterContext context, ISqlTransaction transac
}
}
}
insertTempMs += (System.currentTimeMillis() - ts);

Map<String, OutgoingBatch> batchByNode = new HashMap<String, OutgoingBatch>();
for (TableInfo tableInfo : tableInfos) {
RouterInfo routerInfo = tableInfo.getRouterInfo();
String reloadSql = getTempTableSql(routerInfo, tableInfo, loadId);
for (Node node : routerInfo.getNodes()) {
OutgoingBatch batch = batchByNode.get(node.getNodeId());

List<String> nodes = null;
if (StringUtils.isNotBlank(routerInfo.getNodeQuery())) {
ts = System.currentTimeMillis();
transaction.flush();
nodes = transaction.query(routerInfo.getNodeQuery(), new StringMapper(), new Object[] { loadId }, new int[] { typeForId });
queryNodesMs += (System.currentTimeMillis() - ts);
}
if (nodes == null || nodes.size() == 0) {
nodes = routerInfo.getNodeIds();
}

ts = System.currentTimeMillis();
long dataId = insertData(transaction, tableInfo, DataEventType.RELOAD.getCode(), reloadSql);
String tableName = tableInfo.getTableName().toLowerCase();
for (String nodeId : nodes) {
OutgoingBatch batch = batchByNode.get(nodeId);
if (batch == null) {
batch = newBatch(transaction, node.getNodeId(), loadId, tableInfo, origBatch.getSummary());
batchByNode.put(node.getNodeId(), batch);
batch = newBatch(transaction, nodeId, loadId, tableInfo, origBatch.getSummary());
batchByNode.put(nodeId, batch);
}
batch.incrementTableCount(tableInfo.getTableName().toLowerCase());
long dataId = insertDataForBatch(transaction, tableInfo, batch.getBatchId(), DataEventType.RELOAD.getCode(), reloadSql);
batch.incrementTableCount(tableName);
insertDataEvent(transaction, tableInfo, batch.getBatchId(), dataId);
context.getDataIds().add(dataId);
}
insertBatchMs += (System.currentTimeMillis() - ts);
}
origBatch.setLoadId(loadId);
}
Expand Down Expand Up @@ -270,18 +296,20 @@ protected OutgoingBatch newBatch(ISqlTransaction transaction, String nodeId, lon
return batch;
}

protected long insertDataForBatch(ISqlTransaction transaction, TableInfo tableInfo, long batchId, String eventType, String sql) {
protected long insertData(ISqlTransaction transaction, TableInfo tableInfo, String eventType, String sql) {
Timestamp now = new Timestamp(System.currentTimeMillis());
long dataId = transaction.insertWithGeneratedKey(INSERT_DATA_SQL,
engine.getSymmetricDialect().getSequenceKeyName(SequenceIdentifier.DATA),
engine.getSymmetricDialect().getSequenceName(SequenceIdentifier.DATA),
new Object[] { tableInfo.getTableName(), eventType, sql, tableInfo.getTriggerHistory().getTriggerHistoryId(),
tableInfo.getChannelId(), null, now },
INSERT_DATA_TYPES);
return dataId;
}

protected void insertDataEvent(ISqlTransaction transaction, TableInfo tableInfo, long batchId, long dataId) {
transaction.prepareAndExecute(INSERT_DATA_EVENT_SQL, new Object[] { dataId, batchId, tableInfo.getRouterInfo().getRouter().getRouterId() },
new int[] { Types.NUMERIC, Types.NUMERIC, Types.VARCHAR });
return dataId;
}

public void setSymmetricEngine(ISymmetricEngine engine) {
Expand All @@ -292,23 +320,34 @@ class RouterInfo {

private Router router;

private Set<Node> nodes;
private List<String> nodeIds = new ArrayList<String>();

private Map<Integer, TableInfo> tableInfos = new HashMap<Integer, TableInfo>();

private String tempTableName;

private String nodeQuery;

public RouterInfo(Router router, Set<Node> nodes) {
this.router = router;
this.nodes = nodes;
this.router = router;
for (Node node : nodes) {
this.nodeIds.add(node.getNodeId());
}

String expression = router.getRouterExpression();

Pattern pattern = Pattern.compile(".*\\s*temptable=(\\S*)\\s*.*", Pattern.CASE_INSENSITIVE);
if (expression != null) {
expression = expression.replaceAll("\n", " ").replaceAll("\r", " ");
Matcher matcher = pattern.matcher(expression);
if (matcher.matches()) {
tempTableName = matcher.group(1);
}
pattern = Pattern.compile(".*\\s*nodequery=\"(.*)\"", Pattern.CASE_INSENSITIVE);
matcher = pattern.matcher(expression);
if (matcher.matches()) {
nodeQuery = matcher.group(1);
}
}
if (StringUtils.isBlank(tempTableName)) {
throw new NotImplementedException("Missing temptable={name} for router expression.");
Expand All @@ -319,8 +358,8 @@ public Router getRouter() {
return router;
}

public Set<Node> getNodes() {
return nodes;
public List<String> getNodeIds() {
return nodeIds;
}

public TableInfo getTableInfo(DataMetaData dataMetaData, TriggerRouter triggerRouter) {
Expand All @@ -343,6 +382,10 @@ protected TableInfo newTableInfo(DataMetaData dataMetaData, TriggerRouter trigge
public String getTempTableName() {
return tempTableName;
}

public String getNodeQuery() {
return nodeQuery;
}
}

class TableInfo {
Expand Down

0 comments on commit 1848bbd

Please sign in to comment.