0001259: Add JMX method to extract batches to a file for a node and time and channel ranges
chenson42 committed Jun 6, 2013
1 parent 249a595 commit 1c43440
Showing 6 changed files with 89 additions and 5 deletions.
@@ -20,6 +20,7 @@
*/
package org.jumpmind.symmetric.service.jmx;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
@@ -39,7 +40,9 @@
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.transport.ConcurrentConnectionManager.NodeConnectionStatistics;
import org.jumpmind.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jmx.export.annotation.ManagedAttribute;
@@ -274,6 +277,35 @@ public boolean setSyncEnabledForNode(String nodeId, boolean syncEnabled) {
return false;
}
}

@ManagedOperation(description = "Extract multiple batches to a file for a time range")
@ManagedOperationParameters({
@ManagedOperationParameter(name = "fileName", description = "The file to write the batch output to"),
@ManagedOperationParameter(name = "nodeId", description = "The target node id whose batches need extracted"),
@ManagedOperationParameter(name = "startTime", description = "The start time range to extract. The format is yyyy-MM-dd hh:mm"),
@ManagedOperationParameter(name = "endTime", description = "The start time range to extract. The format is yyyy-MM-dd hh:mm"),
@ManagedOperationParameter(name = "channelIdList", description = "A comma separated list of channels to extract") })
public boolean extractBatcheRange(String fileName, String nodeId, String startTime,
String endTime, String channelIdList) {
File file = new File(fileName);
file.getParentFile().mkdirs();
Date startBatchTime = FormatUtils.parseDate(startTime, FormatUtils.TIMESTAMP_PATTERNS);
Date endBatchTime = FormatUtils.parseDate(endTime, FormatUtils.TIMESTAMP_PATTERNS);
String[] channelIds = channelIdList.split(",");
IDataExtractorService dataExtractorService = engine.getDataExtractorService();
BufferedWriter writer = null;
try {
writer = new BufferedWriter(new FileWriter(file));
dataExtractorService.extractBatchRange(writer, nodeId, startBatchTime, endBatchTime,
channelIds);
return true;
} catch (Exception ex) {
log.error("Failed to write batch range to file", ex);
return false;
} finally {
IOUtils.closeQuietly(writer);
}
}

@ManagedOperation(description = "Enable or disable a channel for a specific external id")
@ManagedOperationParameters({
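For reference, the new operation can be driven from any JMX client. A minimal sketch, assuming a remote JMX connector is enabled on the engine; the service URL, the MBean ObjectName, and the output path below are assumptions, and the operation name matches the method as committed (extractBatcheRange):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ExtractBatchRangeJmxClient {
    public static void main(String[] args) throws Exception {
        // Assumed connector URL; adjust host/port to the JMX agent of the running engine.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Hypothetical ObjectName; look up the actual name of the node management bean.
            ObjectName nodeMBean = new ObjectName("org.jumpmind.symmetric:name=Node");
            String[] signature = new String[] { "java.lang.String", "java.lang.String",
                    "java.lang.String", "java.lang.String", "java.lang.String" };
            // fileName, nodeId, startTime, endTime, channelIdList
            Object ok = mbs.invoke(nodeMBean, "extractBatcheRange", new Object[] {
                    "/tmp/batches.out", "001", "2013-06-01 00:00", "2013-06-06 23:59", "default" },
                    signature);
            System.out.println("extract succeeded: " + ok);
        } finally {
            connector.close();
        }
    }
}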
@@ -22,6 +22,7 @@

import java.io.OutputStream;
import java.io.Writer;
import java.util.Date;
import java.util.List;

import org.jumpmind.symmetric.io.data.IDataWriter;
@@ -45,7 +46,10 @@ public interface IDataExtractorService {
*/
public List<OutgoingBatch> extract(ProcessInfo processInfo, Node node, IOutgoingTransport transport);

public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId, long endBatchId);

public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTime,
Date endBatchTime, String... channelIds);

public OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targetNode,
IDataWriter dataWriter, OutgoingBatch currentBatch,
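For embedded use, the new time-range signature can be called directly on the engine's extractor service (the JMX method above does exactly this). A minimal sketch, assuming an already-started ISymmetricEngine instance and a hypothetical output path:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.Writer;
import java.util.Date;

import org.apache.commons.io.IOUtils;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.service.IDataExtractorService;

public class BatchTimeRangeExtractExample {

    // Extracts the last 24 hours of batches for the given node on the "default" channel.
    public static boolean extractLastDay(ISymmetricEngine engine, String nodeId) throws Exception {
        IDataExtractorService extractorService = engine.getDataExtractorService();
        Date end = new Date();
        Date start = new Date(end.getTime() - 24L * 60 * 60 * 1000);
        Writer writer = new BufferedWriter(new FileWriter("/tmp/batches.out"));
        try {
            return extractorService.extractBatchRange(writer, nodeId, start, end, "default");
        } finally {
            IOUtils.closeQuietly(writer);
        }
    }
}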
@@ -21,6 +21,7 @@

package org.jumpmind.symmetric.service;

import java.util.Date;
import java.util.List;

import org.jumpmind.db.sql.ISqlTransaction;
@@ -42,7 +43,9 @@ public interface IOutgoingBatchService {

public OutgoingBatches getOutgoingBatches(String nodeId, boolean includeDisabledChannels);

public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatchId);
public OutgoingBatches getOutgoingBatchRange(long startBatchId, long endBatchId);

public OutgoingBatches getOutgoingBatchRange(String nodeId, Date startDate, Date endDate, String... channels);

public OutgoingBatches getOutgoingBatchErrors(int maxRows);

@@ -25,6 +25,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -667,6 +668,35 @@ public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId
}
return foundBatch;
}

public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTime,
Date endBatchTime, String... channelIds) {
boolean foundBatch = false;
Node sourceNode = nodeService.findIdentity();
OutgoingBatches batches = outgoingBatchService.getOutgoingBatchRange(nodeId, startBatchTime, endBatchTime, channelIds);
List<OutgoingBatch> list = batches.getBatches();
for (OutgoingBatch outgoingBatch : list) {
Node targetNode = nodeService.findNode(nodeId);
if (targetNode == null && Constants.UNROUTED_NODE_ID.equals(nodeId)) {
targetNode = new Node();
targetNode.setNodeId("-1");
}
if (targetNode != null) {
IDataReader dataReader = new ExtractDataReader(symmetricDialect.getPlatform(),
new SelectFromSymDataSource(outgoingBatch, sourceNode, targetNode));
DataContext ctx = new DataContext();
ctx.put(Constants.DATA_CONTEXT_TARGET_NODE, targetNode);
ctx.put(Constants.DATA_CONTEXT_SOURCE_NODE, nodeService.findIdentity());
new DataProcessor(dataReader, createTransformDataWriter(
nodeService.findIdentity(), targetNode,
new ProtocolDataWriter(nodeService.findIdentityNodeId(), writer,
targetNode.requires13Compatiblity()))).process(ctx);
foundBatch = true;
}
}
return foundBatch;
}


protected TransformWriter createTransformDataWriter(Node identity, Node targetNode,
IDataWriter extractWriter) {
@@ -371,13 +371,25 @@ public boolean inTimeWindow(List<NodeGroupChannelWindow> windows, String targetN
}

}

public OutgoingBatches getOutgoingBatchRange(String nodeId, Date startDate, Date endDate, String... channels) {
OutgoingBatches batches = new OutgoingBatches();
List<OutgoingBatch> batchList = new ArrayList<OutgoingBatch>();
for (String channel : channels) {
batchList.addAll(sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchTimeRangeSql"),
new OutgoingBatchMapper(true, false), nodeId, channel, startDate, endDate));
}
batches.setBatches(batchList);
return batches;
}

public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatchId) {
public OutgoingBatches getOutgoingBatchRange(long startBatchId, long endBatchId) {
OutgoingBatches batches = new OutgoingBatches();
batches.setBatches(sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchRangeSql"),
new OutgoingBatchMapper(true, false), Long.parseLong(startBatchId),
Long.parseLong(endBatchId)));
new OutgoingBatchMapper(true, false), startBatchId,
endBatchId));
return batches;
}

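The new query issues one select per channel and concatenates the results, which keeps the SQL simple at the cost of one round trip per channel. Callers can use it on its own to preview which batches fall in a window before extracting. A minimal sketch, assuming the outgoing batch service is reachable through an engine accessor such as getOutgoingBatchService() (an assumption here):

import java.util.Date;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.service.IOutgoingBatchService;

public class BatchWindowPreviewExample {

    // Counts the outgoing batches created for a node in the given window, per the new query.
    public static int countBatchesInWindow(ISymmetricEngine engine, String nodeId,
            Date start, Date end, String... channels) {
        IOutgoingBatchService outgoingBatchService = engine.getOutgoingBatchService();
        OutgoingBatches batches = outgoingBatchService.getOutgoingBatchRange(nodeId, start, end, channels);
        return batches.getBatches().size();
    }
}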
@@ -60,6 +60,9 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, Map<String, String

putSql("selectOutgoingBatchRangeSql" ,"" +
"where batch_id between ? and ? order by batch_id " );

putSql("selectOutgoingBatchTimeRangeSql" ,"" +
"where node_id=? and channel_id=? and create_time >= ? and create_time <= ? " );

putSql("selectOutgoingBatchPrefixSql" ,"" +
"select node_id, channel_id, status, " +
