Skip to content

Commit

Permalink
Added an api to check to see if a node is done w/ it's intiial load.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Oct 15, 2007
1 parent 1daa4c2 commit 5f3ff19
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 31 deletions.
Expand Up @@ -31,4 +31,5 @@ public interface IOutgoingBatchService {
public List<OutgoingBatch> getOutgoingBatches(String clientId);
public void markOutgoingBatchSent(OutgoingBatch batch);
public void setBatchStatus(String batchId, Status status);
public boolean isInitialLoadComplete(String nodeId);
}
Expand Up @@ -40,8 +40,7 @@
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.support.JdbcUtils;

public class OutgoingBatchService extends AbstractService implements
IOutgoingBatchService {
public class OutgoingBatchService extends AbstractService implements IOutgoingBatchService {

final static Log logger = LogFactory.getLog(OutgoingBatchService.class);

Expand All @@ -57,6 +56,8 @@ public class OutgoingBatchService extends AbstractService implements

private String changeBatchStatusSql;

private String initialLoadStatusSql;

private IOutgoingBatchHistoryService historyService;

/**
Expand All @@ -70,27 +71,22 @@ public class OutgoingBatchService extends AbstractService implements
*/
public void buildOutgoingBatches(final String nodeId) {
// TODO should channels be cached?
final List<NodeChannel> channels = configurationService.getChannelsFor(
nodeId, true);
final List<NodeChannel> channels = configurationService.getChannelsFor(nodeId, true);

jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection conn) throws SQLException,
DataAccessException {
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {

PreparedStatement update = conn
.prepareStatement(updateBatchedEventsSql);
PreparedStatement update = conn.prepareStatement(updateBatchedEventsSql);

update.setQueryTimeout(jdbcTemplate.getQueryTimeout());

for (NodeChannel channel : channels) {

if (channel.isSuspended()) {
logger.warn(channel.getId() + " channel for " + nodeId
+ " is currently suspended.");
logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended.");
} else if (channel.isEnabled()) {
// determine which transactions will be part of this batch on this channel
PreparedStatement select = conn
.prepareStatement(selectEventsToBatchSql);
PreparedStatement select = conn.prepareStatement(selectEventsToBatchSql);

select.setQueryTimeout(jdbcTemplate.getQueryTimeout());

Expand Down Expand Up @@ -119,9 +115,7 @@ public Object doInConnection(Connection conn) throws SQLException,
do {
String trxId = results.getString(1);

if (stopOnNextTxIdChange
&& (lastTrxId == null || !lastTrxId
.equals(trxId))) {
if (stopOnNextTxIdChange && (lastTrxId == null || !lastTrxId.equals(trxId))) {
break;
}

Expand All @@ -147,8 +141,7 @@ public Object doInConnection(Connection conn) throws SQLException,

lastTrxId = trxId;
} while (results.next());
historyService.created(new Integer(newBatch
.getBatchId()), count);
historyService.created(new Integer(newBatch.getBatchId()), count);
}

JdbcUtils.closeResultSet(results);
Expand All @@ -165,19 +158,16 @@ public Object doInConnection(Connection conn) throws SQLException,

public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) {
jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection conn) throws SQLException,
DataAccessException {
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
insertOutgoingBatch(conn, outgoingBatch);
return null;
}
});
}

private void insertOutgoingBatch(Connection conn,
OutgoingBatch outgoingBatch) throws SQLException {
private void insertOutgoingBatch(Connection conn, OutgoingBatch outgoingBatch) throws SQLException {
// TODO: move generated key retrieval to DbDialect
PreparedStatement insert = conn.prepareStatement(createBatchSql,
new int[] { 1 });
PreparedStatement insert = conn.prepareStatement(createBatchSql, new int[] { 1 });
insert.setQueryTimeout(jdbcTemplate.getQueryTimeout());
insert.setString(1, outgoingBatch.getNodeId());
insert.setString(2, outgoingBatch.getChannelId());
Expand All @@ -196,10 +186,9 @@ private void insertOutgoingBatch(Connection conn,

@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatches(String clientId) {
return (List<OutgoingBatch>) jdbcTemplate.query(selectOutgoingBatchSql,
new Object[] { clientId }, new RowMapper() {
public Object mapRow(ResultSet rs, int index)
throws SQLException {
return (List<OutgoingBatch>) jdbcTemplate.query(selectOutgoingBatchSql, new Object[] { clientId },
new RowMapper() {
public Object mapRow(ResultSet rs, int index) throws SQLException {
OutgoingBatch batch = new OutgoingBatch();
batch.setBatchId(rs.getString(1));
batch.setNodeId(rs.getString(2));
Expand All @@ -216,8 +205,7 @@ public void markOutgoingBatchSent(OutgoingBatch batch) {
}

public void setBatchStatus(String batchId, Status status) {
jdbcTemplate.update(changeBatchStatusSql, new Object[] { status.name(),
batchId });
jdbcTemplate.update(changeBatchStatusSql, new Object[] { status.name(), batchId });

if (status == Status.SE) {
historyService.sent(new Integer(batchId));
Expand All @@ -229,8 +217,21 @@ public void setBatchStatus(String batchId, Status status) {

}

public void setConfigurationService(
IConfigurationService configurationService) {
public boolean isInitialLoadComplete(String nodeId) {
String status = (String) jdbcTemplate.queryForObject(initialLoadStatusSql, new Object[] { nodeId },
String.class);
if (status == null) {
throw new RuntimeException("The initial load has not been started for " + nodeId);
} else if (Status.ER.equals(status)) {
throw new RuntimeException("The initial load errored out for " + nodeId);
} else if (Status.OK.equals(status)) {
return true;
} else {
return false;
}
}

public void setConfigurationService(IConfigurationService configurationService) {
this.configurationService = configurationService;
}

Expand Down Expand Up @@ -258,4 +259,8 @@ public void setHistoryService(IOutgoingBatchHistoryService historyService) {
this.historyService = historyService;
}

public void setInitialLoadStatusSql(String initialLoadStatusSql) {
this.initialLoadStatusSql = initialLoadStatusSql;
}

}
Expand Up @@ -33,6 +33,7 @@
import org.jumpmind.symmetric.service.IBootstrapService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IPurgeService;
import org.jumpmind.symmetric.service.IRegistrationService;
import org.springframework.jmx.export.annotation.ManagedAttribute;
Expand All @@ -53,6 +54,8 @@ public class SymmetricManagementService {
private INodeService nodeService;

private IDataService dataService;

private IOutgoingBatchService outgoingBatchService;

private IRegistrationService registrationService;

Expand Down Expand Up @@ -107,6 +110,12 @@ public boolean isExternalIdRegistered(String externalId) {
return nodeService.isExternalIdRegistered(externalId);
}

@ManagedOperation(description = "Check to see if the initial load for a node id is complete. This method will throw an exception if the load error'd out or was never started.")
@ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id") })
public boolean isInitialLoadComplete(String nodeId) {
return outgoingBatchService.isInitialLoadComplete(nodeId);
}

@ManagedOperation(description = "Enable or disable a channel for a specific external id")
@ManagedOperationParameters( {
@ManagedOperationParameter(name = "ignore", description = "Set to true to enable and false to disable"),
Expand Down Expand Up @@ -166,4 +175,8 @@ public void setNodeService(INodeService nodeService) {
public void setRegistrationService(IRegistrationService registrationService) {
this.registrationService = registrationService;
}

public void setOutgoingBatchService(IOutgoingBatchService outgoingBatchService) {
this.outgoingBatchService = outgoingBatchService;
}
}
1 change: 1 addition & 0 deletions symmetric/src/main/resources/symmetric-jmx.xml
Expand Up @@ -39,6 +39,7 @@
<property name="registrationService" ref="registrationService" />
<property name="properties" ref="properties" />
<property name="dataSource" ref="dataSource" />
<property name="outgoingBatchService" ref="" outgoingBatchService "" />
<property name="nodeService" ref="nodeService" />
</bean>

Expand Down
7 changes: 7 additions & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -174,6 +174,13 @@
<property name="runtimeConfiguration" ref="runtimeConfiguration" />
<property name="configurationService" ref="configurationService" />
<property name="historyService" ref="outgoingBatchHistoryService" />

<property name="initialLoadStatusSql">
<value>
select status from ${sync.table.prefix}_outgoing_batch where batch_type='IL' and node_id=?
</value>
</property>

<property name="selectEventsToBatchSql">
<value>
select data.transaction_id, data.data_id from ${sync.table.prefix}_data data,
Expand Down

0 comments on commit 5f3ff19

Please sign in to comment.