From 6c7bf9651b16798021df713b6f40686c3c19f8f9 Mon Sep 17 00:00:00 2001 From: erilong Date: Tue, 23 Oct 2007 17:49:25 +0000 Subject: [PATCH] add JMX method to extract a batch to Symmetric Data Format --- .../service/IDataExtractorService.java | 15 ++-- .../service/IOutgoingBatchService.java | 10 ++- .../service/impl/DataExtractorService.java | 88 ++++++++++++------- .../service/impl/OutgoingBatchService.java | 22 +++++ .../jmx/SymmetricManagementService.java | 37 ++++++++ .../src/main/resources/symmetric-jmx.xml | 1 + .../src/main/resources/symmetric-services.xml | 8 ++ 7 files changed, 141 insertions(+), 40 deletions(-) diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java index e8dbddb08b..1e69e9acd8 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java @@ -28,11 +28,9 @@ public interface IDataExtractorService { - public void extractClientIdentityFor(Node client, - IOutgoingTransport transport); + public void extractClientIdentityFor(Node client, IOutgoingTransport transport); - public OutgoingBatch extractInitialLoadFor(Node client, Trigger config, - IOutgoingTransport transport); + public OutgoingBatch extractInitialLoadFor(Node node, Trigger config, IOutgoingTransport transport); public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, final IOutgoingTransport transport); @@ -40,10 +38,13 @@ public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, /** * @return true if work was done or false if there was no work to do. */ - public boolean extract(Node client, IOutgoingTransport transport) - throws Exception; + public boolean extract(Node node, IOutgoingTransport transport) throws Exception; + + public boolean extract(Node node, final IExtractListener handler) throws Exception; - public boolean extract(Node client, final IExtractListener handler) + public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId) throws Exception; + public boolean extractBatchRange(IExtractListener handler, String startBatchId, String endBatchId) + throws Exception; } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index 494e3c910f..c5c354a547 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -1,7 +1,8 @@ /* * SymmetricDS is an open source database synchronization solution. * - * Copyright (C) Chris Henson + * Copyright (C) Chris Henson , + * Eric Long * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -27,9 +28,16 @@ public interface IOutgoingBatchService { public void insertOutgoingBatch(final OutgoingBatch outgoingBatch); + public void buildOutgoingBatches(String nodeId); + public List getOutgoingBatches(String nodeId); + + public List getOutgoingBatchRange(String startBatchId, String endBatchId); + public void markOutgoingBatchSent(OutgoingBatch batch); + public void setBatchStatus(String batchId, Status status); + public boolean isInitialLoadComplete(String nodeId); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 64a690f506..e8d84a5b39 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -158,42 +158,14 @@ public boolean extract(Node node, final IExtractListener handler) outgoingBatchService.buildOutgoingBatches(node.getNodeId()); - List batches = outgoingBatchService - .getOutgoingBatches(node.getNodeId()); + List batches = outgoingBatchService.getOutgoingBatches(node.getNodeId()); if (batches != null && batches.size() > 0) { try { handler.init(); for (final OutgoingBatch batch : batches) { handler.startBatch(batch); - jdbcTemplate.execute(new ConnectionCallback() { - public Object doInConnection(Connection conn) - throws SQLException, DataAccessException { - PreparedStatement statement = conn - .prepareStatement( - selectEventDataToExtractSql, - java.sql.ResultSet.TYPE_FORWARD_ONLY, - java.sql.ResultSet.CONCUR_READ_ONLY); - statement.setFetchSize(dbDialect - .getStreamingResultsFetchSize()); - statement.setString(1, batch.getNodeId()); - statement.setString(2, batch.getBatchId()); - ResultSet results = statement.executeQuery(); - while (results.next()) { - try { - handler.dataExtracted(next(results)); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - JdbcUtils.closeResultSet(results); - JdbcUtils.closeStatement(statement); - return null; - } - }); - + selectEventDataToExtract(handler, batch); handler.endBatch(batch); // At this point, we've already sent the data to the client, so if @@ -207,9 +179,61 @@ public Object doInConnection(Connection conn) handler.done(); } return true; - } else { - return false; } + return false; + } + + public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, + String endBatchId) throws Exception { + + ExtractStreamHandler handler = new ExtractStreamHandler(transport); + return extractBatchRange(handler, startBatchId, endBatchId); + } + + public boolean extractBatchRange(final IExtractListener handler, String startBatchId, + String endBatchId) throws Exception { + + List batches = outgoingBatchService.getOutgoingBatchRange(startBatchId, endBatchId); + + if (batches != null && batches.size() > 0) { + try { + handler.init(); + for (final OutgoingBatch batch : batches) { + handler.startBatch(batch); + selectEventDataToExtract(handler, batch); + handler.endBatch(batch); + } + } finally { + handler.done(); + } + return true; + } + return false; + } + + private void selectEventDataToExtract(final IExtractListener handler, final OutgoingBatch batch) { + jdbcTemplate.execute(new ConnectionCallback() { + public Object doInConnection(Connection conn) throws SQLException, DataAccessException { + PreparedStatement ps = conn.prepareStatement(selectEventDataToExtractSql, + ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + ps.setFetchSize(dbDialect.getStreamingResultsFetchSize()); + ps.setString(1, batch.getNodeId()); + ps.setString(2, batch.getBatchId()); + ResultSet rs = ps.executeQuery(); + while (rs.next()) { + try { + handler.dataExtracted(next(rs)); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + JdbcUtils.closeResultSet(rs); + JdbcUtils.closeStatement(ps); + return null; + } + }); } private Data next(ResultSet results) throws SQLException { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 54ec945439..4b0de6c85d 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -55,6 +55,8 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa private String selectOutgoingBatchSql; + private String selectOutgoingBatchRangeSql; + private String changeBatchStatusSql; private String initialLoadStatusSql; @@ -209,6 +211,22 @@ public Object mapRow(ResultSet rs, int index) throws SQLException { }); } + @SuppressWarnings("unchecked") + public List getOutgoingBatchRange(String startBatchId, String endBatchId) { + return (List) outgoingBatchQueryTemplate.query(selectOutgoingBatchRangeSql, + new Object[] { startBatchId, endBatchId }, new RowMapper() { + public Object mapRow(ResultSet rs, int index) throws SQLException { + OutgoingBatch batch = new OutgoingBatch(); + batch.setBatchId(rs.getString(1)); + batch.setNodeId(rs.getString(2)); + batch.setChannelId(rs.getString(3)); + batch.setStatus(rs.getString(4)); + batch.setBatchType(rs.getString(5)); + return batch; + } + }); + } + public void markOutgoingBatchSent(OutgoingBatch batch) { setBatchStatus(batch.getBatchId(), Status.SE); } @@ -283,4 +301,8 @@ public void setOutgoingBatchQueryTemplate(JdbcTemplate outgoingBatchQueryTemplat this.outgoingBatchQueryTemplate = outgoingBatchQueryTemplate; } + public void setSelectOutgoingBatchRangeSql(String selectOutgoingBatchRangeSql) { + this.selectOutgoingBatchRangeSql = selectOutgoingBatchRangeSql; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java index 86c6594653..0051a22a9b 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/jmx/SymmetricManagementService.java @@ -21,6 +21,8 @@ package org.jumpmind.symmetric.service.jmx; +import java.io.ByteArrayOutputStream; +import java.io.FileOutputStream; import java.util.Properties; import javax.sql.DataSource; @@ -29,11 +31,14 @@ import org.jumpmind.symmetric.config.IRuntimeConfig; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.service.IBootstrapService; +import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.IDataService; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IOutgoingBatchService; import org.jumpmind.symmetric.service.IPurgeService; import org.jumpmind.symmetric.service.IRegistrationService; +import org.jumpmind.symmetric.transport.IOutgoingTransport; +import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport; import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.jmx.export.annotation.ManagedOperationParameter; @@ -56,6 +61,8 @@ public class SymmetricManagementService { private IOutgoingBatchService outgoingBatchService; private IRegistrationService registrationService; + + private IDataExtractorService dataExtractorService; private Properties properties; @@ -159,6 +166,31 @@ public String reloadNode(String nodeId) { return dataService.reloadNode(nodeId); } + @ManagedOperation(description = "Show a batch in Symmetric Data Format.") + @ManagedOperationParameters( { @ManagedOperationParameter(name = "batchId", description = "The batch ID to display") }) + public String showBatch(String batchId) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + IOutgoingTransport transport = new InternalOutgoingTransport(out); + dataExtractorService.extractBatchRange(transport, batchId, batchId); + transport.close(); + out.close(); + return out.toString(); + } + + @ManagedOperation(description = "Write a range of batches to a file in Symmetric Data Format.") + @ManagedOperationParameters( { + @ManagedOperationParameter(name = "startBatchId", description = "Starting batch ID of range"), + @ManagedOperationParameter(name = "endBatchId", description = "Ending batch ID of range"), + @ManagedOperationParameter(name = "fileName", description = "File name to write batches") }) + public void writeBatchRangeToFile(String startBatchId, String endBatchId, String fileName) + throws Exception { + FileOutputStream out = new FileOutputStream(fileName); + IOutgoingTransport transport = new InternalOutgoingTransport(out); + dataExtractorService.extractBatchRange(transport, startBatchId, endBatchId); + transport.close(); + out.close(); + } + public void setRuntimeConfiguration(IRuntimeConfig runtimeConfiguration) { this.runtimeConfiguration = runtimeConfiguration; } @@ -194,4 +226,9 @@ public void setRegistrationService(IRegistrationService registrationService) { public void setOutgoingBatchService(IOutgoingBatchService outgoingBatchService) { this.outgoingBatchService = outgoingBatchService; } + + public void setDataExtractorService(IDataExtractorService dataExtractorService) { + this.dataExtractorService = dataExtractorService; + } + } diff --git a/symmetric/src/main/resources/symmetric-jmx.xml b/symmetric/src/main/resources/symmetric-jmx.xml index 72eec998e6..ab0be1634f 100644 --- a/symmetric/src/main/resources/symmetric-jmx.xml +++ b/symmetric/src/main/resources/symmetric-jmx.xml @@ -41,6 +41,7 @@ + \ No newline at end of file diff --git a/symmetric/src/main/resources/symmetric-services.xml b/symmetric/src/main/resources/symmetric-services.xml index 21a52a3722..bde688bc99 100644 --- a/symmetric/src/main/resources/symmetric-services.xml +++ b/symmetric/src/main/resources/symmetric-services.xml @@ -224,6 +224,14 @@ from ${sync.table.prefix}_outgoing_batch b where b.node_id = ? and b.status = 'ER' order by 6, 1 + + + select b.batch_id, b.node_id, b.channel_id, b.status, b.batch_type + from ${sync.table.prefix}_outgoing_batch b + where b.batch_id between ? and ? + order by b.batch_id + + update ${sync.table.prefix}_outgoing_batch set status=? where batch_id=?