diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index 42ea1e5653..69cafd813e 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -31,4 +31,5 @@ public interface IOutgoingBatchService { public List getOutgoingBatches(String clientId); public void markOutgoingBatchSent(OutgoingBatch batch); public void setBatchStatus(String batchId, Status status); + public boolean isInitialLoadComplete(String nodeId); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index fdeb96cfbe..7fb17b2825 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -40,8 +40,7 @@ import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.support.JdbcUtils; -public class OutgoingBatchService extends AbstractService implements - IOutgoingBatchService { +public class OutgoingBatchService extends AbstractService implements IOutgoingBatchService { final static Log logger = LogFactory.getLog(OutgoingBatchService.class); @@ -57,6 +56,8 @@ public class OutgoingBatchService extends AbstractService implements private String changeBatchStatusSql; + private String initialLoadStatusSql; + private IOutgoingBatchHistoryService historyService; /** @@ -70,27 +71,22 @@ public class OutgoingBatchService extends AbstractService implements */ public void buildOutgoingBatches(final String nodeId) { // TODO should channels be cached? - final List channels = configurationService.getChannelsFor( - nodeId, true); + final List channels = configurationService.getChannelsFor(nodeId, true); jdbcTemplate.execute(new ConnectionCallback() { - public Object doInConnection(Connection conn) throws SQLException, - DataAccessException { + public Object doInConnection(Connection conn) throws SQLException, DataAccessException { - PreparedStatement update = conn - .prepareStatement(updateBatchedEventsSql); + PreparedStatement update = conn.prepareStatement(updateBatchedEventsSql); update.setQueryTimeout(jdbcTemplate.getQueryTimeout()); for (NodeChannel channel : channels) { if (channel.isSuspended()) { - logger.warn(channel.getId() + " channel for " + nodeId - + " is currently suspended."); + logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended."); } else if (channel.isEnabled()) { // determine which transactions will be part of this batch on this channel - PreparedStatement select = conn - .prepareStatement(selectEventsToBatchSql); + PreparedStatement select = conn.prepareStatement(selectEventsToBatchSql); select.setQueryTimeout(jdbcTemplate.getQueryTimeout()); @@ -119,9 +115,7 @@ public Object doInConnection(Connection conn) throws SQLException, do { String trxId = results.getString(1); - if (stopOnNextTxIdChange - && (lastTrxId == null || !lastTrxId - .equals(trxId))) { + if (stopOnNextTxIdChange && (lastTrxId == null || !lastTrxId.equals(trxId))) { break; } @@ -147,8 +141,7 @@ public Object doInConnection(Connection conn) throws SQLException, lastTrxId = trxId; } while (results.next()); - historyService.created(new Integer(newBatch - .getBatchId()), count); + historyService.created(new Integer(newBatch.getBatchId()), count); } JdbcUtils.closeResultSet(results); @@ -165,19 +158,16 @@ public Object doInConnection(Connection conn) throws SQLException, public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) { jdbcTemplate.execute(new ConnectionCallback() { - public Object doInConnection(Connection conn) throws SQLException, - DataAccessException { + public Object doInConnection(Connection conn) throws SQLException, DataAccessException { insertOutgoingBatch(conn, outgoingBatch); return null; } }); } - private void insertOutgoingBatch(Connection conn, - OutgoingBatch outgoingBatch) throws SQLException { + private void insertOutgoingBatch(Connection conn, OutgoingBatch outgoingBatch) throws SQLException { // TODO: move generated key retrieval to DbDialect - PreparedStatement insert = conn.prepareStatement(createBatchSql, - new int[] { 1 }); + PreparedStatement insert = conn.prepareStatement(createBatchSql, new int[] { 1 }); insert.setQueryTimeout(jdbcTemplate.getQueryTimeout()); insert.setString(1, outgoingBatch.getNodeId()); insert.setString(2, outgoingBatch.getChannelId()); @@ -196,10 +186,9 @@ private void insertOutgoingBatch(Connection conn, @SuppressWarnings("unchecked") public List getOutgoingBatches(String clientId) { - return (List) jdbcTemplate.query(selectOutgoingBatchSql, - new Object[] { clientId }, new RowMapper() { - public Object mapRow(ResultSet rs, int index) - throws SQLException { + return (List) jdbcTemplate.query(selectOutgoingBatchSql, new Object[] { clientId }, + new RowMapper() { + public Object mapRow(ResultSet rs, int index) throws SQLException { OutgoingBatch batch = new OutgoingBatch(); batch.setBatchId(rs.getString(1)); batch.setNodeId(rs.getString(2)); @@ -216,8 +205,7 @@ public void markOutgoingBatchSent(OutgoingBatch batch) { } public void setBatchStatus(String batchId, Status status) { - jdbcTemplate.update(changeBatchStatusSql, new Object[] { status.name(), - batchId }); + jdbcTemplate.update(changeBatchStatusSql, new Object[] { status.name(), batchId }); if (status == Status.SE) { historyService.sent(new Integer(batchId)); @@ -229,8 +217,21 @@ public void setBatchStatus(String batchId, Status status) { } - public void setConfigurationService( - IConfigurationService configurationService) { + public boolean isInitialLoadComplete(String nodeId) { + String status = (String) jdbcTemplate.queryForObject(initialLoadStatusSql, new Object[] { nodeId }, + String.class); + if (status == null) { + throw new RuntimeException("The initial load has not been started for " + nodeId); + } else if (Status.ER.equals(status)) { + throw new RuntimeException("The initial load errored out for " + nodeId); + } else if (Status.OK.equals(status)) { + return true; + } else { + return false; + } + } + + public void setConfigurationService(IConfigurationService configurationService) { this.configurationService = configurationService; } @@ -258,4 +259,8 @@ public void setHistoryService(IOutgoingBatchHistoryService historyService) { this.historyService = historyService; } + public void setInitialLoadStatusSql(String initialLoadStatusSql) { + this.initialLoadStatusSql = initialLoadStatusSql; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java index 7070810862..0227e2e131 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java @@ -33,6 +33,7 @@ import org.jumpmind.symmetric.service.IBootstrapService; import org.jumpmind.symmetric.service.IDataService; import org.jumpmind.symmetric.service.INodeService; +import org.jumpmind.symmetric.service.IOutgoingBatchService; import org.jumpmind.symmetric.service.IPurgeService; import org.jumpmind.symmetric.service.IRegistrationService; import org.springframework.jmx.export.annotation.ManagedAttribute; @@ -53,6 +54,8 @@ public class SymmetricManagementService { private INodeService nodeService; private IDataService dataService; + + private IOutgoingBatchService outgoingBatchService; private IRegistrationService registrationService; @@ -107,6 +110,12 @@ public boolean isExternalIdRegistered(String externalId) { return nodeService.isExternalIdRegistered(externalId); } + @ManagedOperation(description = "Check to see if the initial load for a node id is complete. This method will throw an exception if the load error'd out or was never started.") + @ManagedOperationParameters( { @ManagedOperationParameter(name = "nodeId", description = "The node id") }) + public boolean isInitialLoadComplete(String nodeId) { + return outgoingBatchService.isInitialLoadComplete(nodeId); + } + @ManagedOperation(description = "Enable or disable a channel for a specific external id") @ManagedOperationParameters( { @ManagedOperationParameter(name = "ignore", description = "Set to true to enable and false to disable"), @@ -166,4 +175,8 @@ public void setNodeService(INodeService nodeService) { public void setRegistrationService(IRegistrationService registrationService) { this.registrationService = registrationService; } + + public void setOutgoingBatchService(IOutgoingBatchService outgoingBatchService) { + this.outgoingBatchService = outgoingBatchService; + } } diff --git a/symmetric/src/main/resources/symmetric-jmx.xml b/symmetric/src/main/resources/symmetric-jmx.xml index c2969941fb..03c75c52ee 100644 --- a/symmetric/src/main/resources/symmetric-jmx.xml +++ b/symmetric/src/main/resources/symmetric-jmx.xml @@ -39,6 +39,7 @@ + diff --git a/symmetric/src/main/resources/symmetric-services.xml b/symmetric/src/main/resources/symmetric-services.xml index 9202c16ea0..68972f5b6d 100644 --- a/symmetric/src/main/resources/symmetric-services.xml +++ b/symmetric/src/main/resources/symmetric-services.xml @@ -174,6 +174,13 @@ + + + + select status from ${sync.table.prefix}_outgoing_batch where batch_type='IL' and node_id=? + + + select data.transaction_id, data.data_id from ${sync.table.prefix}_data data,