Skip to content

Commit

Permalink
expose Data and DataEvent creation from DataService
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 15, 2007
1 parent 54d5211 commit 9d1fb99
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 99 deletions.
20 changes: 20 additions & 0 deletions symmetric/src/main/java/org/jumpmind/symmetric/model/Data.java
Expand Up @@ -49,6 +49,8 @@ public class Data {
private DataEventType eventType;

private String tableName;

private String channelId;

/**
* This is populated by the trigger when the event happens. It will be useful for
Expand All @@ -69,6 +71,16 @@ public Data(long dataId, String pkData, String rowData,
this.audit = audit;
}

public Data(String channelId, String tableName, DataEventType eventType, String rowData,
String pkData, TriggerHistory audit) {
this.channelId = channelId;
this.tableName = tableName;
this.eventType = eventType;
this.rowData = rowData;
this.pkData = pkData;
this.audit = audit;
}

public long getDataId() {
return dataId;
}
Expand Down Expand Up @@ -127,4 +139,12 @@ public void setAudit(TriggerHistory audit)
this.audit = audit;
}

public String getChannelId() {
return channelId;
}

public void setChannelId(String channelId) {
this.channelId = channelId;
}

}
@@ -0,0 +1,43 @@
package org.jumpmind.symmetric.model;

public class DataEvent {

private long dataId;

private String nodeId;

private Long batchId;

public DataEvent() {
}

public DataEvent(long dataId, String nodeId) {
this.dataId = dataId;
this.nodeId = nodeId;
}

public Long getBatchId() {
return batchId;
}

public void setBatchId(Long batchId) {
this.batchId = batchId;
}

public long getDataId() {
return dataId;
}

public void setDataId(long dataId) {
this.dataId = dataId;
}

public String getNodeId() {
return nodeId;
}

public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

}
@@ -1,12 +1,20 @@
package org.jumpmind.symmetric.service;

import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEvent;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Trigger;

public interface IDataService {

public void reloadNode(String nodeId);
public String reloadNode(String nodeId);

public void createReloadEvent(final Node targetNode, final Trigger trigger);

public void createHeartbeatEvent(Node node);

public long createData(final Data data);

public void createDataEvent(DataEvent dataEvent);

}
Expand Up @@ -22,10 +22,6 @@
package org.jumpmind.symmetric.service.impl;

import java.net.ConnectException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;

Expand All @@ -45,13 +41,11 @@
import org.jumpmind.symmetric.service.IBootstrapService;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IDataLoaderService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.transport.ITransportManager;
import org.jumpmind.symmetric.util.RandomTimeSlot;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.transaction.annotation.Transactional;

public class BootstrapService extends AbstractService implements IBootstrapService {
Expand All @@ -71,14 +65,12 @@ public class BootstrapService extends AbstractService implements IBootstrapServi
private ITransportManager transportManager;

private IDataLoaderService dataLoaderService;

private IDataService dataService;

private RandomTimeSlot randomSleepTimeSlot;

private boolean autoConfigureDatabase = true;

private String insertNodeIntoDataSql;

private String insertIntoDataEventSql;

private String triggerPrefix;

Expand Down Expand Up @@ -227,50 +219,11 @@ public void heartbeat() {
node.setSymmetricVersion(Version.VERSION);
node.setSyncURL(runtimeConfiguration.getMyUrl());
nodeService.updateNode(node);
insertPushDataForNode(node);
dataService.createHeartbeatEvent(node);
logger.info("Done updating my node information and heartbeat time.");
}
}

/**
* Because we can't add a trigger on the _node table, we are artificially generating heartbeat events.
* @param node
*/
private void insertPushDataForNode(Node node) {
String whereClause = " t.node_id = '" + node.getNodeId() + "'";
Trigger trig = configurationService.getTriggerFor(tablePrefix + "_node", runtimeConfiguration.getNodeGroupId());
if (trig != null) {
final String data = (String) jdbcTemplate.queryForObject(dbDialect.createCsvDataSql(trig, whereClause),
String.class);
final String pk = (String) jdbcTemplate.queryForObject(dbDialect.createCsvPrimaryKeySql(trig, whereClause),
String.class);
final TriggerHistory hist = configurationService.getLatestHistoryRecordFor(trig.getTriggerId());
int dataId = (Integer) jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection c) throws SQLException, DataAccessException {
PreparedStatement pstmt = c.prepareStatement(insertNodeIntoDataSql, new int[] { 1 });
pstmt.setString(1, data);
pstmt.setString(2, pk);
pstmt.setInt(3, hist.getTriggerHistoryId());
pstmt.execute();
ResultSet rs = pstmt.getGeneratedKeys();
rs.next();
int dataId = rs.getInt(1);
JdbcUtils.closeResultSet(rs);
JdbcUtils.closeStatement(pstmt);
return dataId;
}
});

List<Node> nodes = nodeService.findNodesToPushTo();
for (Node node2 : nodes) {
jdbcTemplate.update(insertIntoDataEventSql, new Object[] { dataId, node2.getNodeId() });
}
} else {
logger
.info("Not generating data and data events for node because a trigger had not been created for that table yet.");
}
}


private void sleepBeforeRegistrationRetry() {
try {
long sleepTimeInMs = DateUtils.MILLIS_PER_SECOND * randomSleepTimeSlot.getRandomValueSeededByDomainId();
Expand Down Expand Up @@ -393,14 +346,10 @@ public void setTablePrefix(String tablePrefix) {
this.tablePrefix = tablePrefix;
}

public void setInsertNodeIntoDataSql(String insertNodeIntoDataSql) {
this.insertNodeIntoDataSql = insertNodeIntoDataSql;
public void setDataService(IDataService dataService) {
this.dataService = dataService;
}

public void setInsertIntoDataEventSql(String insertIntoDataEventSql) {
this.insertIntoDataEventSql = insertIntoDataEventSql;
}


public void setTriggerPrefix(String triggerPrefix) {
this.triggerPrefix = triggerPrefix;
}
Expand Down
Expand Up @@ -27,8 +27,12 @@
import java.util.List;
import java.util.ListIterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEvent;
import org.jumpmind.symmetric.model.DataEventType;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Trigger;
Expand All @@ -38,18 +42,18 @@
import org.jumpmind.symmetric.service.INodeService;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.JdbcUtils;

public class DataService implements IDataService {

private JdbcTemplate jdbcTemplate;
public class DataService extends AbstractService implements IDataService {

static final Log logger = LogFactory.getLog(DataService.class);

private IConfigurationService configurationService;

private INodeService nodeService;

@SuppressWarnings("unused")
private String tablePrefix;

private IDbDialect dbDialect;

private String insertIntoDataSql;
Expand All @@ -59,43 +63,54 @@ public class DataService implements IDataService {
public void createReloadEvent(final Node targetNode, final Trigger trigger) {
final TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());

int dataId = insertData(new Object[] { Constants.CHANNEL_RELOAD, trigger.getSourceTableName(),
DataEventType.RELOAD.getCode(), null, null, history.getTriggerHistoryId() });
jdbcTemplate.update(insertIntoDataEventSql, new Object[] { dataId, targetNode.getNodeId() });
Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.RELOAD,
null, null, history);
long dataId = createData(data);
createDataEvent(new DataEvent(dataId, targetNode.getNodeId()));
}

public void createPurgeEvent(final Node targetNode, final Trigger trigger) {
final TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
//final String sql = dbDialect.createPurgeSqlFor(targetNode, trigger);
final String sql = "delete from " + trigger.getDefaultTargetTableName();

int dataId = insertData(new Object[] { Constants.CHANNEL_RELOAD, trigger.getSourceTableName(),
DataEventType.SQL.getCode(), sql, null, history.getTriggerHistoryId() });
jdbcTemplate.update(insertIntoDataEventSql, new Object[] { dataId, targetNode.getNodeId() });
final String sql = dbDialect.createPurgeSqlFor(targetNode, trigger);

Data data = new Data(Constants.CHANNEL_RELOAD, trigger.getSourceTableName(), DataEventType.SQL, sql,
null, history);
long dataId = createData(data);
createDataEvent(new DataEvent(dataId, targetNode.getNodeId()));
}

protected int insertData(final Object[] values) {
return (Integer) jdbcTemplate.execute(new ConnectionCallback() {
public long createData(final Data data) {
return (Long) jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection c) throws SQLException, DataAccessException {
PreparedStatement ps = c.prepareStatement(insertIntoDataSql, new int[] { 1 });
int columnNumber = 1;
for (Object value : values) {
ps.setObject(columnNumber++, value);
}
ps.setString(1, data.getChannelId());
ps.setString(2, data.getTableName());
ps.setString(3, data.getEventType().getCode());
ps.setString(4, data.getRowData());
ps.setString(5, data.getPkData());
ps.setLong(6, data.getAudit().getTriggerHistoryId());
ps.execute();
ResultSet rs = ps.getGeneratedKeys();
rs.next();
int dataId = rs.getInt(1);
long dataId = rs.getLong(1);
JdbcUtils.closeResultSet(rs);
JdbcUtils.closeStatement(ps);
return dataId;
}
});
}

public void reloadNode(String nodeId) {
public void createDataEvent(DataEvent dataEvent) {
jdbcTemplate.update(insertIntoDataEventSql, new Object[] { dataEvent.getDataId(),
dataEvent.getNodeId() });
}

public String reloadNode(String nodeId) {
Node sourceNode = nodeService.findIdentity();
Node targetNode = nodeService.findNode(nodeId);
if (targetNode == null) {
return "Unknown node " + nodeId;
}
List<Trigger> triggers = configurationService.getActiveTriggersForReload(sourceNode.getNodeGroupId(),
targetNode.getNodeGroupId());
for (ListIterator<Trigger> iterator = triggers.listIterator(triggers.size()); iterator.hasPrevious();) {
Expand All @@ -105,6 +120,34 @@ public void reloadNode(String nodeId) {
for (Trigger trigger : triggers) {
createReloadEvent(targetNode, trigger);
}
return "Successfully created events to reload node " + nodeId;
}

/**
* Because we can't add a trigger on the _node table, we are artificially generating heartbeat events.
* @param node
*/
public void createHeartbeatEvent(Node node) {
String whereClause = " t.node_id = '" + node.getNodeId() + "'";
Trigger trigger = configurationService.getTriggerFor(tablePrefix + "_node", runtimeConfiguration
.getNodeGroupId());
if (trigger != null) {
String rowData = (String) jdbcTemplate.queryForObject(dbDialect.createCsvDataSql(trigger,
whereClause), String.class);
String pkData = (String) jdbcTemplate.queryForObject(dbDialect.createCsvPrimaryKeySql(trigger,
whereClause), String.class);
TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
Data data = new Data(Constants.CHANNEL_CONFIG, trigger.getSourceTableName(),
DataEventType.UPDATE, rowData, pkData, history);
long dataId = createData(data);

List<Node> nodes = nodeService.findNodesToPushTo();
for (Node pushNode : nodes) {
createDataEvent(new DataEvent(dataId, pushNode.getNodeId()));
}
} else {
logger.info("Not generating data/data events for node because a trigger is not created for that table yet.");
}
}

public void setConfigurationService(IConfigurationService configurationService) {
Expand All @@ -119,10 +162,6 @@ public void setInsertIntoDataSql(String insertIntoDataSql) {
this.insertIntoDataSql = insertIntoDataSql;
}

public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}

public void setNodeService(INodeService nodeService) {
this.nodeService = nodeService;
}
Expand All @@ -131,4 +170,8 @@ public void setDbDialect(IDbDialect dbDialect) {
this.dbDialect = dbDialect;
}

public void setTablePrefix(String tablePrefix) {
this.tablePrefix = tablePrefix;
}

}
Expand Up @@ -95,8 +95,8 @@ public int getNumberOfActiveConnections() {

@ManagedOperation(description = "Send an initial load of data to a node.")
@ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id to reload.") })
public void reloadNode(String nodeId) {
dataService.reloadNode(nodeId);
public String reloadNode(String nodeId) {
return dataService.reloadNode(nodeId);
}

public void setRuntimeConfiguration(IRuntimeConfig runtimeConfiguration) {
Expand Down

0 comments on commit 9d1fb99

Please sign in to comment.