Skip to content

Commit

Permalink
Move transaction_id and channel_id to data_event for performance reas…
Browse files Browse the repository at this point in the history
…ons.
  • Loading branch information
chenson42 committed Feb 6, 2008
1 parent d4575c1 commit c53b656
Show file tree
Hide file tree
Showing 25 changed files with 195 additions and 142 deletions.
3 changes: 3 additions & 0 deletions symmetric/pom.xml
Expand Up @@ -169,6 +169,9 @@
</property>
</systemProperties>
<groups>continuous,integration,mysql</groups>
<additionalClasspathElements>
<value>${user.home}\.symmetricds\lib\ojdbc14.jar</value>
</additionalClasspathElements>
</configuration>
<executions>
<execution>
Expand Down
Expand Up @@ -146,10 +146,9 @@ protected String formatAsCsv(Object[] data) {
}

protected Data createData(Object[] oldRow, Object[] newRow) {
Data data = new Data(trigger.getChannelId(), StringUtils.isBlank(trigger.getTargetTableName()) ? tableName
Data data = new Data(StringUtils.isBlank(trigger.getTargetTableName()) ? tableName
: trigger.getTargetTableName(), triggerType, formatRowData(oldRow, newRow), formatPkRowData(oldRow,
newRow), triggerHistory);
data.setTransactionId(getTransactionId(oldRow, newRow));
newRow), triggerHistory);
return data;
}

Expand Down
Expand Up @@ -56,20 +56,18 @@ public static void insertData(String schemaName, String prefixName, String table
+ schemaName
+ prefixName
+ "_data "
+ "(table_name, channel_id, event_type, trigger_hist_id, transaction_id, pk_data, row_data, create_time) "
+ "values (?, ?, ?, ?, ?, ?, ?, current_timestamp)";
+ "(table_name, event_type, trigger_hist_id, pk_data, row_data, create_time) "
+ "values (?, ?, ?, ?, ?, current_timestamp)";
PreparedStatement ps = conn.prepareStatement(sql);
ps.setString(1, tableName);
ps.setString(2, channelName);
ps.setString(3, dmlType);
ps.setLong(4, triggerHistId);
ps.setString(5, transactionId);
ps.setString(6, pkData);
ps.setString(7, rowData);
ps.setString(2, dmlType);
ps.setLong(3, triggerHistId);
ps.setString(4, pkData);
ps.setString(5, rowData);
ps.executeUpdate();
ps.close();
sql = "insert into " + schemaName + prefixName + "_data_event (node_id, data_id) "
+ "select node_id, IDENTITY_VAL_LOCAL() from " + prefixName
sql = "insert into " + schemaName + prefixName + "_data_event (node_id, data_id, channel_id, transaction_id) "
+ "select node_id, IDENTITY_VAL_LOCAL(),'"+channelName+"','"+transactionId+"' from " + prefixName
+ "_node c where (c.node_group_id = ? and c.sync_enabled = 1) " + nodeSelectWhere;
ps = conn.prepareStatement(sql);
ps.setString(1, targetGroupId);
Expand Down
Expand Up @@ -61,7 +61,7 @@ public void fire(int type, String triggerName, String tableName, Object[] oldRow
Data data = createData(oldRow, newRow);
List<Node> nodes = findTargetNodes(oldRow, newRow);
if (nodes != null) {
dataService.insertDataEvent(data, nodes);
dataService.insertDataEvent(data, trigger.getChannelId(), getTransactionId(oldRow, newRow), nodes);
}
}
} catch (RuntimeException ex) {
Expand Down
23 changes: 1 addition & 22 deletions symmetric/src/main/java/org/jumpmind/symmetric/model/Data.java
Expand Up @@ -50,10 +50,6 @@ public class Data {

private String tableName;

private String channelId;

private String transactionId;

/**
* This is populated by the trigger when the event happens. It will be useful for
* research.
Expand All @@ -73,9 +69,8 @@ public Data(long dataId, String pkData, String rowData,
this.audit = audit;
}

public Data(String channelId, String tableName, DataEventType eventType, String rowData,
public Data(String tableName, DataEventType eventType, String rowData,
String pkData, TriggerHistory audit) {
this.channelId = channelId;
this.tableName = tableName;
this.eventType = eventType;
this.rowData = rowData;
Expand Down Expand Up @@ -141,20 +136,4 @@ public void setAudit(TriggerHistory audit)
this.audit = audit;
}

public String getChannelId() {
return channelId;
}

public void setChannelId(String channelId) {
this.channelId = channelId;
}

public String getTransactionId() {
return transactionId;
}

public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}

}
Expand Up @@ -9,10 +9,27 @@ public class DataEvent {
private Long batchId;

private boolean batched;

private String channelId;

private String transactionId;

public DataEvent() {
}

public DataEvent(long dataId, String nodeId, String channelId) {
this.dataId = dataId;
this.nodeId = nodeId;
this.channelId = channelId;
}

public DataEvent(long dataId, String nodeId, String channelId, String transactionId) {
this.dataId = dataId;
this.nodeId = nodeId;
this.channelId = channelId;
this.transactionId = transactionId;
}

public DataEvent(long dataId, String nodeId) {
this.dataId = dataId;
this.nodeId = nodeId;
Expand Down Expand Up @@ -50,4 +67,20 @@ public void setBatched(boolean batched) {
this.batched = batched;
}

public String getChannelId() {
return channelId;
}

public void setChannelId(String channelId) {
this.channelId = channelId;
}

public String getTransactionId() {
return transactionId;
}

public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}

}
Expand Up @@ -15,8 +15,10 @@ public interface IDataService {
@Transactional
public String reloadNode(String nodeId);

@Transactional
public void insertReloadEvent(Node targetNode);

@Transactional
public void insertReloadEvent(final Node targetNode, final Trigger trigger);

public void insertHeartbeatEvent(Node node);
Expand All @@ -25,9 +27,11 @@ public interface IDataService {

public void insertDataEvent(DataEvent dataEvent);

public void insertDataEvent(Data data, List<Node> nodes);
public void insertDataEvent(Data data, String channelId, List<Node> nodes);

public void insertDataEvent(Data data, String channelId, String transactionId, List<Node> nodes);

public void insertDataEvent(Data data, String nodeId);
public void insertDataEvent(Data data, String channelId, String nodeId);

public void insertPurgeEvent(Node targetNode, Trigger trigger);

Expand Down
Expand Up @@ -20,7 +20,12 @@

package org.jumpmind.symmetric.service;

import org.springframework.transaction.annotation.Transactional;

public interface IPurgeService
{
public void purge();

@Transactional
public void purgeAllIncomingEventForNode(String nodeId);
}
Expand Up @@ -47,6 +47,7 @@
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IPurgeService;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.PreparedStatementCallback;

Expand All @@ -59,6 +60,8 @@ public class DataService extends AbstractService implements IDataService {
private IConfigurationService configurationService;

private INodeService nodeService;

private IPurgeService purgeService;

private String tablePrefix;

Expand All @@ -77,9 +80,9 @@ public class DataService extends AbstractService implements IDataService {
public void insertReloadEvent(final Node targetNode, final Trigger trigger) {
final TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());

Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.RELOAD, null, null,
Data data = new Data(trigger.getSourceTableName(), DataEventType.RELOAD, null, null,
history);
insertDataEvent(data, targetNode.getNodeId());
insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
}

public void insertPurgeEvent(final Node targetNode, final Trigger trigger) {
Expand All @@ -89,59 +92,61 @@ public void insertPurgeEvent(final Node targetNode, final Trigger trigger) {

public void insertSqlEvent(final Node targetNode, final Trigger trigger, String sql) {
TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.SQL,
Data data = new Data(trigger.getSourceTableName(), DataEventType.SQL,
CsvUtil.escapeCsvData(sql), null, history);
insertDataEvent(data, targetNode.getNodeId());
insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
}

public void insertCreateEvent(final Node targetNode, final Trigger trigger, String xml) {
TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.CREATE,
Data data = new Data(trigger.getSourceTableName(), DataEventType.CREATE,
CsvUtil.escapeCsvData(xml), null, history);
insertDataEvent(data, targetNode.getNodeId());
insertDataEvent(data, Constants.CHANNEL_RELOAD, targetNode.getNodeId());
}

public long insertData(final Data data) {
return dbDialect.insertWithGeneratedKey(insertIntoDataSql, "sym_data_data_id_seq",
new PreparedStatementCallback() {
public Object doInPreparedStatement(PreparedStatement ps) throws SQLException,
DataAccessException {
ps.setString(1, data.getChannelId());
ps.setString(2, data.getTableName());
ps.setString(3, data.getEventType().getCode());
ps.setString(4, data.getRowData());
ps.setString(5, data.getPkData());
ps.setLong(6, data.getAudit().getTriggerHistoryId());
ps.setString(7, data.getTransactionId());
ps.setString(1, data.getTableName());
ps.setString(2, data.getEventType().getCode());
ps.setString(3, data.getRowData());
ps.setString(4, data.getPkData());
ps.setLong(5, data.getAudit().getTriggerHistoryId());
return null;
}
});
}

public void insertDataEvent(DataEvent dataEvent) {
jdbcTemplate.update(insertIntoDataEventSql, new Object[] { dataEvent.getDataId(),
dataEvent.getNodeId(), dataEvent.getBatchId(), dataEvent.isBatched() ? 1 : 0 }, new int[] {
Types.INTEGER, Types.VARCHAR, Types.INTEGER, Types.INTEGER });
dataEvent.getNodeId(), dataEvent.getChannelId(), dataEvent.getTransactionId(), dataEvent.getBatchId(), dataEvent.isBatched() ? 1 : 0 }, new int[] {
Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.INTEGER });
}

public void insertDataEvent(Data data, List<Node> nodes) {
public void insertDataEvent(Data data, String channelId, List<Node> nodes) {
insertDataEvent(data, channelId, null, nodes);
}

public void insertDataEvent(Data data, String channelId, String transactionId, List<Node> nodes) {
long dataId = insertData(data);
for (Node node : nodes) {
insertDataEvent(new DataEvent(dataId, node.getNodeId()));
insertDataEvent(new DataEvent(dataId, node.getNodeId(), channelId, transactionId));
}
}
}

public void insertDataEvent(Data data, String nodeId) {
public void insertDataEvent(Data data, String channelId, String nodeId) {
long dataId = insertData(data);
insertDataEvent(new DataEvent(dataId, nodeId));
insertDataEvent(new DataEvent(dataId, nodeId, channelId));
}

public String reloadNode(String nodeId) {
Node targetNode = nodeService.findNode(nodeId);
if (targetNode == null) {
return "Unknown node " + nodeId;
}
nodeService.setInitialLoadEnabled(nodeId, true);
nodeService.setInitialLoadEnabled(nodeId, true);
return "Successfully opened initial load for node " + nodeId;
}

Expand Down Expand Up @@ -181,13 +186,15 @@ public void insertReloadEvent(Node targetNode) {
}
nodeService.setInitialLoadEnabled(targetNode.getNodeId(), false);
insertNodeSecurityUpdate(targetNode);

// remove all incoming events from the node are starting a reload for.
purgeService.purgeAllIncomingEventForNode(targetNode.getNodeId());
}

private void insertNodeSecurityUpdate(Node node) {
Data data = createData(tablePrefix + "_node_security", " t.node_id = '" + node.getNodeId() + "'");
if (data != null) {
data.setChannelId(Constants.CHANNEL_RELOAD);
insertDataEvent(data, node.getNodeId());
insertDataEvent(data, Constants.CHANNEL_RELOAD, node.getNodeId());
}
}

Expand All @@ -198,8 +205,7 @@ private void insertNodeSecurityUpdate(Node node) {
public void insertHeartbeatEvent(Node node) {
Data data = createData(tablePrefix + "_node", " t.node_id = '" + node.getNodeId() + "'");
if (data != null && data.getAudit() != null) {
data.setChannelId(Constants.CHANNEL_CONFIG);
insertDataEvent(data, nodeService.findNodesToPushTo());
insertDataEvent(data, Constants.CHANNEL_CONFIG, nodeService.findNodesToPushTo());
} else {
logger
.info("Not generating data/data events for node because a trigger is not created for that table yet.");
Expand All @@ -223,7 +229,7 @@ public Data createData(String tableName, String whereClause) {
String.class);
}
TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.UPDATE, rowData,
data = new Data(trigger.getSourceTableName(), DataEventType.UPDATE, rowData,
pkData, history);
}
return data;
Expand Down Expand Up @@ -306,4 +312,8 @@ public void setCreateFirstForReload(boolean createFirstForReload) {
this.createFirstForReload = createFirstForReload;
}

public void setPurgeService(IPurgeService purgeService) {
this.purgeService = purgeService;
}

}
Expand Up @@ -82,7 +82,6 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa
* associated history row.
*/
public void buildOutgoingBatches(final String nodeId) {
// TODO should channels be cached?
final List<NodeChannel> channels = configurationService.getChannelsFor(true);

jdbcTemplate.execute(new ConnectionCallback() {
Expand Down
Expand Up @@ -53,6 +53,8 @@ public class PurgeService extends AbstractService implements IPurgeService {

private String[] otherPurgeSql;

private String[] deleteIncomingBatchesByNodeIdSql;

private int retentionInMinutes = 7200;

private String selectOutgoingBatchIdsToPurgeSql;
Expand Down Expand Up @@ -100,6 +102,14 @@ public void purge() {
}
}

public void purgeAllIncomingEventForNode(String nodeId) {
if (deleteIncomingBatchesByNodeIdSql != null)
for (String sql : deleteIncomingBatchesByNodeIdSql) {
int count = jdbcTemplate.update(sql, new Object[] { nodeId });
logger.info("Purged " + count + " rows for node " + nodeId + " after running: " + cleanSql(sql));
}
}

private void purgeDataRows() {
int dataIdCount = 0;
int totalCount = 0;
Expand Down Expand Up @@ -256,4 +266,8 @@ public void setMaxNumOfDataIdsToPurgeInTx(int maxNumOfDataIdsToPurgeInTx) {
this.maxNumOfDataIdsToPurgeInTx = maxNumOfDataIdsToPurgeInTx;
}

public void setDeleteIncomingBatchesByNodeIdSql(String[] deleteIncomingBatchesByNodeIdSql) {
this.deleteIncomingBatchesByNodeIdSql = deleteIncomingBatchesByNodeIdSql;
}

}

0 comments on commit c53b656

Please sign in to comment.