Skip to content

Commit

Permalink
add JMX method to extract a batch to Symmetric Data Format
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 23, 2007
1 parent 4214c98 commit 6c7bf96
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 40 deletions.
Expand Up @@ -28,22 +28,23 @@

public interface IDataExtractorService {

public void extractClientIdentityFor(Node client,
IOutgoingTransport transport);
public void extractClientIdentityFor(Node client, IOutgoingTransport transport);

public OutgoingBatch extractInitialLoadFor(Node client, Trigger config,
IOutgoingTransport transport);
public OutgoingBatch extractInitialLoadFor(Node node, Trigger config, IOutgoingTransport transport);

public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger,
final IOutgoingTransport transport);

/**
* @return true if work was done or false if there was no work to do.
*/
public boolean extract(Node client, IOutgoingTransport transport)
throws Exception;
public boolean extract(Node node, IOutgoingTransport transport) throws Exception;

public boolean extract(Node node, final IExtractListener handler) throws Exception;

public boolean extract(Node client, final IExtractListener handler)
public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId)
throws Exception;

public boolean extractBatchRange(IExtractListener handler, String startBatchId, String endBatchId)
throws Exception;
}
@@ -1,7 +1,8 @@
/*
* SymmetricDS is an open source database synchronization solution.
*
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>,
* Eric Long <erilong@users.sourceforge.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
Expand All @@ -27,9 +28,16 @@

public interface IOutgoingBatchService {
public void insertOutgoingBatch(final OutgoingBatch outgoingBatch);

public void buildOutgoingBatches(String nodeId);

public List<OutgoingBatch> getOutgoingBatches(String nodeId);

public List<OutgoingBatch> getOutgoingBatchRange(String startBatchId, String endBatchId);

public void markOutgoingBatchSent(OutgoingBatch batch);

public void setBatchStatus(String batchId, Status status);

public boolean isInitialLoadComplete(String nodeId);
}
Expand Up @@ -158,42 +158,14 @@ public boolean extract(Node node, final IExtractListener handler)

outgoingBatchService.buildOutgoingBatches(node.getNodeId());

List<OutgoingBatch> batches = outgoingBatchService
.getOutgoingBatches(node.getNodeId());
List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatches(node.getNodeId());

if (batches != null && batches.size() > 0) {
try {
handler.init();
for (final OutgoingBatch batch : batches) {
handler.startBatch(batch);
jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection conn)
throws SQLException, DataAccessException {
PreparedStatement statement = conn
.prepareStatement(
selectEventDataToExtractSql,
java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY);
statement.setFetchSize(dbDialect
.getStreamingResultsFetchSize());
statement.setString(1, batch.getNodeId());
statement.setString(2, batch.getBatchId());
ResultSet results = statement.executeQuery();
while (results.next()) {
try {
handler.dataExtracted(next(results));
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
JdbcUtils.closeResultSet(results);
JdbcUtils.closeStatement(statement);
return null;
}
});

selectEventDataToExtract(handler, batch);
handler.endBatch(batch);

// At this point, we've already sent the data to the client, so if
Expand All @@ -207,9 +179,61 @@ public Object doInConnection(Connection conn)
handler.done();
}
return true;
} else {
return false;
}
return false;
}

public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId,
String endBatchId) throws Exception {

ExtractStreamHandler handler = new ExtractStreamHandler(transport);
return extractBatchRange(handler, startBatchId, endBatchId);
}

public boolean extractBatchRange(final IExtractListener handler, String startBatchId,
String endBatchId) throws Exception {

List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatchRange(startBatchId, endBatchId);

if (batches != null && batches.size() > 0) {
try {
handler.init();
for (final OutgoingBatch batch : batches) {
handler.startBatch(batch);
selectEventDataToExtract(handler, batch);
handler.endBatch(batch);
}
} finally {
handler.done();
}
return true;
}
return false;
}

private void selectEventDataToExtract(final IExtractListener handler, final OutgoingBatch batch) {
jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
PreparedStatement ps = conn.prepareStatement(selectEventDataToExtractSql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(dbDialect.getStreamingResultsFetchSize());
ps.setString(1, batch.getNodeId());
ps.setString(2, batch.getBatchId());
ResultSet rs = ps.executeQuery();
while (rs.next()) {
try {
handler.dataExtracted(next(rs));
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
JdbcUtils.closeResultSet(rs);
JdbcUtils.closeStatement(ps);
return null;
}
});
}

private Data next(ResultSet results) throws SQLException {
Expand Down
Expand Up @@ -55,6 +55,8 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa

private String selectOutgoingBatchSql;

private String selectOutgoingBatchRangeSql;

private String changeBatchStatusSql;

private String initialLoadStatusSql;
Expand Down Expand Up @@ -209,6 +211,22 @@ public Object mapRow(ResultSet rs, int index) throws SQLException {
});
}

@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatchRange(String startBatchId, String endBatchId) {
return (List<OutgoingBatch>) outgoingBatchQueryTemplate.query(selectOutgoingBatchRangeSql,
new Object[] { startBatchId, endBatchId }, new RowMapper() {
public Object mapRow(ResultSet rs, int index) throws SQLException {
OutgoingBatch batch = new OutgoingBatch();
batch.setBatchId(rs.getString(1));
batch.setNodeId(rs.getString(2));
batch.setChannelId(rs.getString(3));
batch.setStatus(rs.getString(4));
batch.setBatchType(rs.getString(5));
return batch;
}
});
}

public void markOutgoingBatchSent(OutgoingBatch batch) {
setBatchStatus(batch.getBatchId(), Status.SE);
}
Expand Down Expand Up @@ -283,4 +301,8 @@ public void setOutgoingBatchQueryTemplate(JdbcTemplate outgoingBatchQueryTemplat
this.outgoingBatchQueryTemplate = outgoingBatchQueryTemplate;
}

public void setSelectOutgoingBatchRangeSql(String selectOutgoingBatchRangeSql) {
this.selectOutgoingBatchRangeSql = selectOutgoingBatchRangeSql;
}

}
Expand Up @@ -21,6 +21,8 @@

package org.jumpmind.symmetric.service.jmx;

import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.util.Properties;

import javax.sql.DataSource;
Expand All @@ -29,11 +31,14 @@
import org.jumpmind.symmetric.config.IRuntimeConfig;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IBootstrapService;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.service.IPurgeService;
import org.jumpmind.symmetric.service.IRegistrationService;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedOperationParameter;
Expand All @@ -56,6 +61,8 @@ public class SymmetricManagementService {
private IOutgoingBatchService outgoingBatchService;

private IRegistrationService registrationService;

private IDataExtractorService dataExtractorService;

private Properties properties;

Expand Down Expand Up @@ -159,6 +166,31 @@ public String reloadNode(String nodeId) {
return dataService.reloadNode(nodeId);
}

@ManagedOperation(description = "Show a batch in Symmetric Data Format.")
@ManagedOperationParameters( { @ManagedOperationParameter(name = "batchId", description = "The batch ID to display") })
public String showBatch(String batchId) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
IOutgoingTransport transport = new InternalOutgoingTransport(out);
dataExtractorService.extractBatchRange(transport, batchId, batchId);
transport.close();
out.close();
return out.toString();
}

@ManagedOperation(description = "Write a range of batches to a file in Symmetric Data Format.")
@ManagedOperationParameters( {
@ManagedOperationParameter(name = "startBatchId", description = "Starting batch ID of range"),
@ManagedOperationParameter(name = "endBatchId", description = "Ending batch ID of range"),
@ManagedOperationParameter(name = "fileName", description = "File name to write batches") })
public void writeBatchRangeToFile(String startBatchId, String endBatchId, String fileName)
throws Exception {
FileOutputStream out = new FileOutputStream(fileName);
IOutgoingTransport transport = new InternalOutgoingTransport(out);
dataExtractorService.extractBatchRange(transport, startBatchId, endBatchId);
transport.close();
out.close();
}

public void setRuntimeConfiguration(IRuntimeConfig runtimeConfiguration) {
this.runtimeConfiguration = runtimeConfiguration;
}
Expand Down Expand Up @@ -194,4 +226,9 @@ public void setRegistrationService(IRegistrationService registrationService) {
public void setOutgoingBatchService(IOutgoingBatchService outgoingBatchService) {
this.outgoingBatchService = outgoingBatchService;
}

public void setDataExtractorService(IDataExtractorService dataExtractorService) {
this.dataExtractorService = dataExtractorService;
}

}
1 change: 1 addition & 0 deletions symmetric/src/main/resources/symmetric-jmx.xml
Expand Up @@ -41,6 +41,7 @@
<property name="dataSource" ref="dataSource" />
<property name="outgoingBatchService" ref="outgoingBatchService" />
<property name="nodeService" ref="nodeService" />
<property name="dataExtractorService" ref="dataExtractorService" />
</bean>

</beans>
8 changes: 8 additions & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -224,6 +224,14 @@
from ${sync.table.prefix}_outgoing_batch b where b.node_id = ? and b.status = 'ER' order by 6, 1
</value>
</property>
<property name="selectOutgoingBatchRangeSql">
<value>
select b.batch_id, b.node_id, b.channel_id, b.status, b.batch_type
from ${sync.table.prefix}_outgoing_batch b
where b.batch_id between ? and ?
order by b.batch_id
</value>
</property>
<property name="changeBatchStatusSql">
<value>update ${sync.table.prefix}_outgoing_batch set status=? where batch_id=?</value>
</property>
Expand Down

0 comments on commit 6c7bf96

Please sign in to comment.