Skip to content

Commit

Permalink
Created OutgoingBatches object as a container for batches and channel…
Browse files Browse the repository at this point in the history
…s (channels are currently unused).
  • Loading branch information
mhanes committed Sep 29, 2009
1 parent 8ecc718 commit a3e9ce5
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 88 deletions.
@@ -0,0 +1,29 @@
package org.jumpmind.symmetric.model;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class OutgoingBatches {

List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>();
Set<Channel> channels = new TreeSet<Channel>();

public List<OutgoingBatch> getBatches() {
return batches;
}

public void setBatches(List<OutgoingBatch> batches) {
this.batches = batches;
}

public Set<Channel> getChannels() {
return channels;
}

public void setChannels(Set<Channel> channels) {
this.channels = channels;
}

}
Expand Up @@ -21,9 +21,8 @@

package org.jumpmind.symmetric.service;

import java.util.List;

import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.springframework.jdbc.core.JdbcTemplate;

public interface IOutgoingBatchService {
Expand All @@ -32,11 +31,11 @@ public interface IOutgoingBatchService {

public OutgoingBatch findOutgoingBatch(long batchId);

public List<OutgoingBatch> getOutgoingBatches(String nodeId);
public OutgoingBatches getOutgoingBatches(String nodeId);

public List<OutgoingBatch> getOutgoingBatchRange(String startBatchId, String endBatchId);
public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatchId);

public List<OutgoingBatch> getOutgoingBatchErrors(int maxRows);
public OutgoingBatches getOutgoingBatchErrors(int maxRows);

public boolean isInitialLoadComplete(String nodeId);

Expand Down
Expand Up @@ -52,6 +52,7 @@
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.route.SimpleRouterContext;
Expand Down Expand Up @@ -295,8 +296,8 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE
routingService.routeData();
}

List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatches(node.getNodeId());
if (batches != null && batches.size() > 0) {
OutgoingBatches batches = outgoingBatchService.getOutgoingBatches(node.getNodeId());
if (batches != null && batches.getBatches() != null && batches.getBatches().size() > 0) {

ChannelMap suspendIgnoreChannels = targetTransport.getSuspendIgnoreChannelLists(this.configurationService);

Expand All @@ -315,14 +316,14 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE

// Search for suspended or ignores, removing both but keeping track
// of ignores for further updates.
for (OutgoingBatch batch : batches) {
for (OutgoingBatch batch : batches.getBatches()) {
if (ignoredChannels.contains(batch.getChannelId())) {
ignoredBatches.add(batch);
} else if (suspendedChannels.contains(batch.getChannelId())) {
suspendBatches.add(batch);
}
}
batches.removeAll(ignoredBatches);
batches.getBatches().removeAll(ignoredBatches);

FileOutgoingTransport fileTransport = null;

Expand All @@ -335,7 +336,7 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE
ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor,
fileTransport != null ? fileTransport : targetTransport);

databaseExtract(node, batches, handler);
databaseExtract(node, batches.getBatches(), handler);

networkTransfer(fileTransport, targetTransport);

Expand Down Expand Up @@ -441,12 +442,12 @@ private boolean areNumeric(String... data) {
public boolean extractBatchRange(final IExtractListener handler, String startBatchId, String endBatchId)
throws IOException {
if (areNumeric(startBatchId, endBatchId)) {
List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatchRange(startBatchId, endBatchId);
OutgoingBatches batches = outgoingBatchService.getOutgoingBatchRange(startBatchId, endBatchId);

if (batches != null && batches.size() > 0) {
if (batches != null && batches.getBatches() != null && batches.getBatches().size() > 0) {
try {
handler.init();
for (final OutgoingBatch batch : batches) {
for (final OutgoingBatch batch : batches.getBatches()) {
handler.startBatch(batch);
selectEventDataToExtract(handler, batch);
handler.endBatch(batch);
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.jumpmind.symmetric.model.NodeGroupChannelWindow;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.OutgoingBatch.Status;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.INodeService;
Expand All @@ -61,14 +62,14 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa

@Transactional
public void markAllAsSentForNode(String nodeId) {
List<OutgoingBatch> batches = null;
OutgoingBatches batches = null;
do {
batches = getOutgoingBatches(nodeId);
for (OutgoingBatch outgoingBatch : batches) {
for (OutgoingBatch outgoingBatch : batches.getBatches()) {
outgoingBatch.setStatus(Status.OK);
updateOutgoingBatch(outgoingBatch);
}
} while (batches.size() > 0);
} while (batches.getBatches().size() > 0);
}

public void updateOutgoingBatch(OutgoingBatch outgoingBatch) {
Expand Down Expand Up @@ -130,7 +131,7 @@ public OutgoingBatch findOutgoingBatch(long batchId) {
* order.
*/
@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatches(String targetNodeId) {
public OutgoingBatches getOutgoingBatches(String targetNodeId) {
List<OutgoingBatch> list = (List<OutgoingBatch>) jdbcTemplate.query(getSql("selectOutgoingBatchSql"),
new Object[] { targetNodeId, OutgoingBatch.Status.NE.toString(), OutgoingBatch.Status.SE.toString(),
OutgoingBatch.Status.ER.toString() }, new OutgoingBatchMapper());
Expand All @@ -156,7 +157,9 @@ public int compare(NodeChannel b1, NodeChannel b2) {
}
});

return filterOutgoingBatchesForChannels(targetNodeId, list, channels);
OutgoingBatches batches = new OutgoingBatches();
batches.setBatches(filterOutgoingBatchesForChannels(targetNodeId, list, channels));
return batches;
}

/**
Expand Down Expand Up @@ -208,15 +211,19 @@ public boolean inTimeWindow(List<NodeGroupChannelWindow> windows, String timezon
}

@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatchRange(String startBatchId, String endBatchId) {
return (List<OutgoingBatch>) jdbcTemplate.query(getSql("selectOutgoingBatchRangeSql"), new Object[] {
startBatchId, endBatchId }, new OutgoingBatchMapper());
public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatchId) {
OutgoingBatches batches = new OutgoingBatches();
batches.setBatches(jdbcTemplate.query(getSql("selectOutgoingBatchRangeSql"), new Object[] { startBatchId,
endBatchId }, new OutgoingBatchMapper()));
return batches;
}

@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatchErrors(int maxRows) {
return (List<OutgoingBatch>) jdbcTemplate.query(new MaxRowsStatementCreator(
getSql("selectOutgoingBatchErrorsSql"), maxRows), new OutgoingBatchMapper());
public OutgoingBatches getOutgoingBatchErrors(int maxRows) {
OutgoingBatches batches = new OutgoingBatches();
batches.setBatches(jdbcTemplate.query(new MaxRowsStatementCreator(getSql("selectOutgoingBatchErrorsSql"),
maxRows), new OutgoingBatchMapper()));
return batches;
}

@SuppressWarnings("unchecked")
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.apache.commons.lang.time.FastDateFormat;
import org.jumpmind.symmetric.model.IncomingBatch;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.service.IIncomingBatchService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void write(CharSequence feedURL, Writer outputWriter) throws IOException
+ "/batch/" + batch.getBatchId()));
}

for (OutgoingBatch batch : findOutgoingBatchErrors()) {
for (OutgoingBatch batch : findOutgoingBatchErrors().getBatches()) {
String title = "Outgoing Batch " + batch.getNodeBatchId();
StringBuilder value = new StringBuilder("Node ");
value.append(batch.getNodeId());
Expand Down Expand Up @@ -157,7 +158,7 @@ private List<IncomingBatch> findIncomingBatchErrors() {
return getIncomingBatchService().findIncomingBatchErrors(MAX_ERRORS);
}

private List<OutgoingBatch> findOutgoingBatchErrors() {
private OutgoingBatches findOutgoingBatchErrors() {

return getOutgoingBatchService().getOutgoingBatchErrors(MAX_ERRORS);
}
Expand Down
Expand Up @@ -25,14 +25,13 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;

import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEventType;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
Expand Down Expand Up @@ -74,8 +73,7 @@ public void testDisabledChannel() {
nodeChannel.setEnabled(false);
getConfigurationService().saveChannel(nodeChannel.getChannel(), true);

cleanSlate("sym_data_event", "sym_data",
"sym_outgoing_batch");
cleanSlate("sym_data_event", "sym_data", "sym_outgoing_batch");
int size = 50; // magic number
int count = 3; // must be <= size
assertTrue(count <= size);
Expand All @@ -85,9 +83,10 @@ public void testDisabledChannel() {
TestConstants.TEST_CLIENT_EXTERNAL_ID);
}

List<OutgoingBatch> list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
OutgoingBatches list = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
assertNotNull(list);
assertEquals(list.size(), 0);
assertNotNull(list.getBatches());
assertEquals(list.getBatches().size(), 0);

nodeChannel.setEnabled(true);
getConfigurationService().saveChannel(nodeChannel, true);
Expand Down

0 comments on commit a3e9ce5

Please sign in to comment.