From 9d1fb993d65946707af267a6aa136981ed4c90f5 Mon Sep 17 00:00:00 2001 From: erilong Date: Mon, 15 Oct 2007 15:02:39 +0000 Subject: [PATCH] expose Data and DataEvent creation from DataService --- .../org/jumpmind/symmetric/model/Data.java | 20 ++++ .../jumpmind/symmetric/model/DataEvent.java | 43 +++++++++ .../symmetric/service/IDataService.java | 10 +- .../service/impl/BootstrapService.java | 67 ++----------- .../symmetric/service/impl/DataService.java | 95 ++++++++++++++----- .../jmx/SymmetricManagementService.java | 4 +- .../src/main/resources/symmetric-services.xml | 14 +-- 7 files changed, 154 insertions(+), 99 deletions(-) create mode 100644 symmetric/src/main/java/org/jumpmind/symmetric/model/DataEvent.java diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/model/Data.java b/symmetric/src/main/java/org/jumpmind/symmetric/model/Data.java index f75551c887..0ed66d6804 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/model/Data.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/model/Data.java @@ -49,6 +49,8 @@ public class Data { private DataEventType eventType; private String tableName; + + private String channelId; /** * This is populated by the trigger when the event happens. It will be useful for @@ -69,6 +71,16 @@ public Data(long dataId, String pkData, String rowData, this.audit = audit; } + public Data(String channelId, String tableName, DataEventType eventType, String rowData, + String pkData, TriggerHistory audit) { + this.channelId = channelId; + this.tableName = tableName; + this.eventType = eventType; + this.rowData = rowData; + this.pkData = pkData; + this.audit = audit; + } + public long getDataId() { return dataId; } @@ -127,4 +139,12 @@ public void setAudit(TriggerHistory audit) this.audit = audit; } + public String getChannelId() { + return channelId; + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/model/DataEvent.java b/symmetric/src/main/java/org/jumpmind/symmetric/model/DataEvent.java new file mode 100644 index 0000000000..7dd32fef46 --- /dev/null +++ b/symmetric/src/main/java/org/jumpmind/symmetric/model/DataEvent.java @@ -0,0 +1,43 @@ +package org.jumpmind.symmetric.model; + +public class DataEvent { + + private long dataId; + + private String nodeId; + + private Long batchId; + + public DataEvent() { + } + + public DataEvent(long dataId, String nodeId) { + this.dataId = dataId; + this.nodeId = nodeId; + } + + public Long getBatchId() { + return batchId; + } + + public void setBatchId(Long batchId) { + this.batchId = batchId; + } + + public long getDataId() { + return dataId; + } + + public void setDataId(long dataId) { + this.dataId = dataId; + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + +} diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataService.java index 7125a9f263..f13a7849c9 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -1,12 +1,20 @@ package org.jumpmind.symmetric.service; +import org.jumpmind.symmetric.model.Data; +import org.jumpmind.symmetric.model.DataEvent; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.Trigger; public interface IDataService { - public void reloadNode(String nodeId); + public String reloadNode(String nodeId); public void createReloadEvent(final Node targetNode, final Trigger trigger); + public void createHeartbeatEvent(Node node); + + public long createData(final Data data); + + public void createDataEvent(DataEvent dataEvent); + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java index 6200841a15..6eca62e346 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/BootstrapService.java @@ -22,10 +22,6 @@ package org.jumpmind.symmetric.service.impl; import java.net.ConnectException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.Date; import java.util.List; @@ -45,13 +41,11 @@ import org.jumpmind.symmetric.service.IBootstrapService; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataLoaderService; +import org.jumpmind.symmetric.service.IDataService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.transport.ITransportManager; import org.jumpmind.symmetric.util.RandomTimeSlot; -import org.springframework.dao.DataAccessException; -import org.springframework.jdbc.core.ConnectionCallback; -import org.springframework.jdbc.support.JdbcUtils; import org.springframework.transaction.annotation.Transactional; public class BootstrapService extends AbstractService implements IBootstrapService { @@ -71,14 +65,12 @@ public class BootstrapService extends AbstractService implements IBootstrapServi private ITransportManager transportManager; private IDataLoaderService dataLoaderService; + + private IDataService dataService; private RandomTimeSlot randomSleepTimeSlot; private boolean autoConfigureDatabase = true; - - private String insertNodeIntoDataSql; - - private String insertIntoDataEventSql; private String triggerPrefix; @@ -227,50 +219,11 @@ public void heartbeat() { node.setSymmetricVersion(Version.VERSION); node.setSyncURL(runtimeConfiguration.getMyUrl()); nodeService.updateNode(node); - insertPushDataForNode(node); + dataService.createHeartbeatEvent(node); logger.info("Done updating my node information and heartbeat time."); } } - - /** - * Because we can't add a trigger on the _node table, we are artificially generating heartbeat events. - * @param node - */ - private void insertPushDataForNode(Node node) { - String whereClause = " t.node_id = '" + node.getNodeId() + "'"; - Trigger trig = configurationService.getTriggerFor(tablePrefix + "_node", runtimeConfiguration.getNodeGroupId()); - if (trig != null) { - final String data = (String) jdbcTemplate.queryForObject(dbDialect.createCsvDataSql(trig, whereClause), - String.class); - final String pk = (String) jdbcTemplate.queryForObject(dbDialect.createCsvPrimaryKeySql(trig, whereClause), - String.class); - final TriggerHistory hist = configurationService.getLatestHistoryRecordFor(trig.getTriggerId()); - int dataId = (Integer) jdbcTemplate.execute(new ConnectionCallback() { - public Object doInConnection(Connection c) throws SQLException, DataAccessException { - PreparedStatement pstmt = c.prepareStatement(insertNodeIntoDataSql, new int[] { 1 }); - pstmt.setString(1, data); - pstmt.setString(2, pk); - pstmt.setInt(3, hist.getTriggerHistoryId()); - pstmt.execute(); - ResultSet rs = pstmt.getGeneratedKeys(); - rs.next(); - int dataId = rs.getInt(1); - JdbcUtils.closeResultSet(rs); - JdbcUtils.closeStatement(pstmt); - return dataId; - } - }); - - List nodes = nodeService.findNodesToPushTo(); - for (Node node2 : nodes) { - jdbcTemplate.update(insertIntoDataEventSql, new Object[] { dataId, node2.getNodeId() }); - } - } else { - logger - .info("Not generating data and data events for node because a trigger had not been created for that table yet."); - } - } - + private void sleepBeforeRegistrationRetry() { try { long sleepTimeInMs = DateUtils.MILLIS_PER_SECOND * randomSleepTimeSlot.getRandomValueSeededByDomainId(); @@ -393,14 +346,10 @@ public void setTablePrefix(String tablePrefix) { this.tablePrefix = tablePrefix; } - public void setInsertNodeIntoDataSql(String insertNodeIntoDataSql) { - this.insertNodeIntoDataSql = insertNodeIntoDataSql; + public void setDataService(IDataService dataService) { + this.dataService = dataService; } - - public void setInsertIntoDataEventSql(String insertIntoDataEventSql) { - this.insertIntoDataEventSql = insertIntoDataEventSql; - } - + public void setTriggerPrefix(String triggerPrefix) { this.triggerPrefix = triggerPrefix; } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 8dfd1f4313..93738a5b62 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -27,8 +27,12 @@ import java.util.List; import java.util.ListIterator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.db.IDbDialect; +import org.jumpmind.symmetric.model.Data; +import org.jumpmind.symmetric.model.DataEvent; import org.jumpmind.symmetric.model.DataEventType; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.Trigger; @@ -38,18 +42,18 @@ import org.jumpmind.symmetric.service.INodeService; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.ConnectionCallback; -import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.support.JdbcUtils; -public class DataService implements IDataService { - - private JdbcTemplate jdbcTemplate; +public class DataService extends AbstractService implements IDataService { + static final Log logger = LogFactory.getLog(DataService.class); + private IConfigurationService configurationService; private INodeService nodeService; - @SuppressWarnings("unused") + private String tablePrefix; + private IDbDialect dbDialect; private String insertIntoDataSql; @@ -59,33 +63,36 @@ public class DataService implements IDataService { public void createReloadEvent(final Node targetNode, final Trigger trigger) { final TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId()); - int dataId = insertData(new Object[] { Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), - DataEventType.RELOAD.getCode(), null, null, history.getTriggerHistoryId() }); - jdbcTemplate.update(insertIntoDataEventSql, new Object[] { dataId, targetNode.getNodeId() }); + Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.RELOAD, + null, null, history); + long dataId = createData(data); + createDataEvent(new DataEvent(dataId, targetNode.getNodeId())); } public void createPurgeEvent(final Node targetNode, final Trigger trigger) { final TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId()); - //final String sql = dbDialect.createPurgeSqlFor(targetNode, trigger); - final String sql = "delete from " + trigger.getDefaultTargetTableName(); - - int dataId = insertData(new Object[] { Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), - DataEventType.SQL.getCode(), sql, null, history.getTriggerHistoryId() }); - jdbcTemplate.update(insertIntoDataEventSql, new Object[] { dataId, targetNode.getNodeId() }); + final String sql = dbDialect.createPurgeSqlFor(targetNode, trigger); + + Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.SQL, sql, + null, history); + long dataId = createData(data); + createDataEvent(new DataEvent(dataId, targetNode.getNodeId())); } - protected int insertData(final Object[] values) { - return (Integer) jdbcTemplate.execute(new ConnectionCallback() { + public long createData(final Data data) { + return (Long) jdbcTemplate.execute(new ConnectionCallback() { public Object doInConnection(Connection c) throws SQLException, DataAccessException { PreparedStatement ps = c.prepareStatement(insertIntoDataSql, new int[] { 1 }); - int columnNumber = 1; - for (Object value : values) { - ps.setObject(columnNumber++, value); - } + ps.setString(1, data.getChannelId()); + ps.setString(2, data.getTableName()); + ps.setString(3, data.getEventType().getCode()); + ps.setString(4, data.getRowData()); + ps.setString(5, data.getPkData()); + ps.setLong(6, data.getAudit().getTriggerHistoryId()); ps.execute(); ResultSet rs = ps.getGeneratedKeys(); rs.next(); - int dataId = rs.getInt(1); + long dataId = rs.getLong(1); JdbcUtils.closeResultSet(rs); JdbcUtils.closeStatement(ps); return dataId; @@ -93,9 +100,17 @@ public Object doInConnection(Connection c) throws SQLException, DataAccessExcept }); } - public void reloadNode(String nodeId) { + public void createDataEvent(DataEvent dataEvent) { + jdbcTemplate.update(insertIntoDataEventSql, new Object[] { dataEvent.getDataId(), + dataEvent.getNodeId() }); + } + + public String reloadNode(String nodeId) { Node sourceNode = nodeService.findIdentity(); Node targetNode = nodeService.findNode(nodeId); + if (targetNode == null) { + return "Unknown node " + nodeId; + } List triggers = configurationService.getActiveTriggersForReload(sourceNode.getNodeGroupId(), targetNode.getNodeGroupId()); for (ListIterator iterator = triggers.listIterator(triggers.size()); iterator.hasPrevious();) { @@ -105,6 +120,34 @@ public void reloadNode(String nodeId) { for (Trigger trigger : triggers) { createReloadEvent(targetNode, trigger); } + return "Successfully created events to reload node " + nodeId; + } + + /** + * Because we can't add a trigger on the _node table, we are artificially generating heartbeat events. + * @param node + */ + public void createHeartbeatEvent(Node node) { + String whereClause = " t.node_id = '" + node.getNodeId() + "'"; + Trigger trigger = configurationService.getTriggerFor(tablePrefix + "_node", runtimeConfiguration + .getNodeGroupId()); + if (trigger != null) { + String rowData = (String) jdbcTemplate.queryForObject(dbDialect.createCsvDataSql(trigger, + whereClause), String.class); + String pkData = (String) jdbcTemplate.queryForObject(dbDialect.createCsvPrimaryKeySql(trigger, + whereClause), String.class); + TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId()); + Data data = new Data(Constants.CHANNEL_CONFIG, trigger.getSourceTableName(), + DataEventType.UPDATE, rowData, pkData, history); + long dataId = createData(data); + + List nodes = nodeService.findNodesToPushTo(); + for (Node pushNode : nodes) { + createDataEvent(new DataEvent(dataId, pushNode.getNodeId())); + } + } else { + logger.info("Not generating data/data events for node because a trigger is not created for that table yet."); + } } public void setConfigurationService(IConfigurationService configurationService) { @@ -119,10 +162,6 @@ public void setInsertIntoDataSql(String insertIntoDataSql) { this.insertIntoDataSql = insertIntoDataSql; } - public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { - this.jdbcTemplate = jdbcTemplate; - } - public void setNodeService(INodeService nodeService) { this.nodeService = nodeService; } @@ -131,4 +170,8 @@ public void setDbDialect(IDbDialect dbDialect) { this.dbDialect = dbDialect; } + public void setTablePrefix(String tablePrefix) { + this.tablePrefix = tablePrefix; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java index 63201bde00..30a336de0e 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java @@ -95,8 +95,8 @@ public int getNumberOfActiveConnections() { @ManagedOperation(description = "Send an initial load of data to a node.") @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id to reload.") }) - public void reloadNode(String nodeId) { - dataService.reloadNode(nodeId); + public String reloadNode(String nodeId) { + return dataService.reloadNode(nodeId); } public void setRuntimeConfiguration(IRuntimeConfig runtimeConfiguration) { diff --git a/symmetric/src/main/resources/symmetric-services.xml b/symmetric/src/main/resources/symmetric-services.xml index 42149bfbcd..efb7fd378a 100644 --- a/symmetric/src/main/resources/symmetric-services.xml +++ b/symmetric/src/main/resources/symmetric-services.xml @@ -23,18 +23,8 @@ + - - - insert into ${sync.table.prefix}_data - (data_id, channel_id,table_name,event_type,row_data,pk_data,trigger_hist_id,create_time) - values(null,'config','${sync.table.prefix}_node','U',?,?,?,current_timestamp) - - - - insert into ${sync.table.prefix}_data_event (data_id,node_id) values(?,?) - - + + insert into ${sync.table.prefix}_data