Skip to content

Commit

Permalink
OutgoingBatches now has the filtering logic. Also, sym_channel now sp…
Browse files Browse the repository at this point in the history
…ecifies an extract interval and sym_node_channel_control has updates for the last time it ran.
  • Loading branch information
mhanes committed Sep 29, 2009
1 parent 285fcf4 commit 6f4327f
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 76 deletions.
Expand Up @@ -42,6 +42,8 @@ public class Channel {

private String batchAlgorithm = "default";

private long extractPeriodMillis = 0;

public Channel() {
}

Expand All @@ -50,11 +52,13 @@ public Channel(String id, int processingOrder) {
this.processingOrder = processingOrder;
}

public Channel(String id, int processingOrder, int maxBatchSize, int maxBatchToSend, boolean enabled) {
public Channel(String id, int processingOrder, int maxBatchSize, int maxBatchToSend, boolean enabled,
long extractPeriodMillis) {
this(id, processingOrder);
this.maxBatchSize = maxBatchSize;
this.maxBatchToSend = maxBatchToSend;
this.enabled = enabled;
this.extractPeriodMillis = extractPeriodMillis;
}

public String getId() {
Expand Down Expand Up @@ -121,4 +125,13 @@ public void setBatchAlgorithm(String batchAlgorithm) {
public String getBatchAlgorithm() {
return batchAlgorithm;
}

public long getExtractPeriodMillis() {
return extractPeriodMillis;
}

public void setExtractPeriodMillis(long extractPeriodMillis) {
this.extractPeriodMillis = extractPeriodMillis;
}

}
Expand Up @@ -127,4 +127,12 @@ public NodeChannelControl getNodeChannelControl() {
return nodeChannelControl;
}

public long getExtractPeriodMillis() {
return channel.getExtractPeriodMillis();
}

public void setExtractPeriodMillis(long extractPeriodMillis) {
channel.setExtractPeriodMillis(extractPeriodMillis);
}

}
@@ -1,14 +1,44 @@
package org.jumpmind.symmetric.model;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class OutgoingBatches {

List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>();
Set<Channel> channels = new TreeSet<Channel>();
Set<NodeChannel> activeChannels = new HashSet<NodeChannel>();
Set<String> activeChannelIds = new HashSet<String>();

public OutgoingBatches(List<OutgoingBatch> batches) {
this.batches = batches;
}

public OutgoingBatches() {

}

public Set<NodeChannel> getActiveChannels() {
return activeChannels;
}

public void addActiveChannel(NodeChannel nodeChannel) {
activeChannels.add(nodeChannel);
activeChannelIds.add(nodeChannel.getId());
}

public Set<String> getActiveChannelId() {
return activeChannelIds;
}

public void setActiveChannels(Set<NodeChannel> activeChannels) {
this.activeChannels = activeChannels;
activeChannelIds = new HashSet<String>();
for (NodeChannel nodeChannel : activeChannels) {
activeChannelIds.add(nodeChannel.getId());
}
}

public List<OutgoingBatch> getBatches() {
return batches;
Expand All @@ -18,12 +48,120 @@ public void setBatches(List<OutgoingBatch> batches) {
this.batches = batches;
}

public Set<Channel> getChannels() {
return channels;
/**
* Removes all batches associated with the provided channel from this
* object.
*
* @param channel
* - channel for which corresponding batches are removed
* @return A list of the batches removed
*/
public List<OutgoingBatch> filterBatchesForChannel(Channel channel) {
List<OutgoingBatch> filtered = getBatchesForChannel(channel);
batches.removeAll(filtered);
return filtered;
}

public List<OutgoingBatch> filterBatchesForChannel(String channelId) {
List<OutgoingBatch> filtered = getBatchesForChannel(channelId);
batches.removeAll(filtered);
return filtered;
}

public List<OutgoingBatch> filterBatchesForChannels(Set<String> channels) {
List<OutgoingBatch> filtered = getBatchesForChannels(channels);
batches.removeAll(filtered);
return filtered;
}

public List<OutgoingBatch> getBatchesForChannel(Channel channel) {
List<OutgoingBatch> batchList = new ArrayList<OutgoingBatch>();
if (channel != null) {
batchList = getBatchesForChannel(channel.getId());
}
return batchList;
}

public List<OutgoingBatch> getBatchesForChannel(String channelId) {
List<OutgoingBatch> batchList = new ArrayList<OutgoingBatch>();
if (channelId != null) {
for (OutgoingBatch batch : batches) {
if (channelId.equals(batch.getChannelId())) {
batchList.add(batch);
}
}
}
return batchList;
}

public void setChannels(Set<Channel> channels) {
this.channels = channels;
public List<OutgoingBatch> getBatchesForChannels(Set<String> channelIds) {
List<OutgoingBatch> batchList = new ArrayList<OutgoingBatch>();
if (channelIds != null) {
for (OutgoingBatch batch : batches) {
if (channelIds.contains(batch.getChannelId())) {
batchList.add(batch);
}
}
}
return batchList;
}

public List<OutgoingBatch> getBatchesForChannelWindows(Node targetNode, NodeChannel channel,
List<NodeGroupChannelWindow> windows) {
List<OutgoingBatch> keeping = new ArrayList<OutgoingBatch>();

if (batches != null && batches.size() > 0) {

if (channel.isEnabled() && inTimeWindow(windows, targetNode.getTimezoneOffset())) {
int max = channel.getMaxBatchToSend();
int count = 0;
for (OutgoingBatch outgoingBatch : batches) {
if (channel.getId().equals(outgoingBatch.getChannelId()) && count < max) {
keeping.add(outgoingBatch);
count++;
}
}
}
}
return keeping;
}

/**
* If {@link NodeGroupChannelWindow}s are defined for this channel, then
* check to see if the time (according to the offset passed in) is within on
* of the configured windows.
*/
public boolean inTimeWindow(List<NodeGroupChannelWindow> windows, String timezoneOffset) {
if (windows != null && windows.size() > 0) {
for (NodeGroupChannelWindow window : windows) {
if (window.inTimeWindow(timezoneOffset)) {
return true;
}
}
return false;
} else {
return true;
}

}

/**
* Removes all batches that are not associated with an 'activeChannel'.
*
* @return List of batches that were filtered
*/

public List<OutgoingBatch> filterBatchesForInactiveChannels() {
List<OutgoingBatch> filtered = new ArrayList<OutgoingBatch>();

for (OutgoingBatch batch : batches) {
if (!activeChannelIds.contains(batch.getChannelId())) {
filtered.add(batch);
}
}

batches.removeAll(filtered);
return filtered;
}

}
Expand Up @@ -21,6 +21,8 @@

package org.jumpmind.symmetric.service;

import java.util.List;

import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatches;
import org.springframework.jdbc.core.JdbcTemplate;
Expand All @@ -29,8 +31,8 @@ public interface IOutgoingBatchService {

public void markAllAsSentForNode(String nodeId);

public OutgoingBatch findOutgoingBatch(long batchId);
public OutgoingBatch findOutgoingBatch(long batchId);

public OutgoingBatches getOutgoingBatches(String nodeId);

public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatchId);
Expand All @@ -43,6 +45,8 @@ public interface IOutgoingBatchService {

public void updateOutgoingBatch(OutgoingBatch batch);

public void updateOutgoingBatches(List<OutgoingBatch> batches);

public void updateOutgoingBatch(JdbcTemplate jdbcTemplate, OutgoingBatch batch);

public void insertOutgoingBatch(OutgoingBatch outgoingBatch);
Expand Down
Expand Up @@ -149,6 +149,8 @@ public Object mapRow(java.sql.ResultSet rs, int arg1) throws SQLException {
nodeChannel.setMaxBatchToSend(rs.getInt(8));
nodeChannel.setBatchAlgorithm(rs.getString(9));
nodeChannel.setLastExtractedTime(rs.getTimestamp(10));
nodeChannel.setExtractPeriodMillis(rs.getLong(11));

return nodeChannel;
};
}));
Expand Down
Expand Up @@ -30,6 +30,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -299,31 +300,21 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE
OutgoingBatches batches = outgoingBatchService.getOutgoingBatches(node.getNodeId());
if (batches != null && batches.getBatches() != null && batches.getBatches().size() > 0) {

ChannelMap suspendIgnoreChannels = targetTransport.getSuspendIgnoreChannelLists(this.configurationService);

Set<String> suspendedChannels = suspendIgnoreChannels.getSuspendChannels();
Set<String> ignoredChannels = suspendIgnoreChannels.getIgnoreChannels();
ChannelMap suspendIgnoreChannels = targetTransport.getSuspendIgnoreChannelLists(configurationService);

// We now have either our local suspend/ignore list, or the combined
// remote send/ignore list and our local list (along with a
// reservation, if we go this far...)

// Now, we need to skip the suspended channels and ignore the
// ignored ones by setting the status to ignored and updating them.

List<OutgoingBatch> suspendBatches = new ArrayList<OutgoingBatch>();
List<OutgoingBatch> ignoredBatches = new ArrayList<OutgoingBatch>();

// Search for suspended or ignores, removing both but keeping track
// of ignores for further updates.
for (OutgoingBatch batch : batches.getBatches()) {
if (ignoredChannels.contains(batch.getChannelId())) {
ignoredBatches.add(batch);
} else if (suspendedChannels.contains(batch.getChannelId())) {
suspendBatches.add(batch);
}
}
batches.getBatches().removeAll(ignoredBatches);
// ignored ones by ultimately setting the status to ignored and
// updating them.

List<OutgoingBatch> suspendBatches = batches.filterBatchesForChannels(suspendIgnoreChannels
.getSuspendChannels());

List<OutgoingBatch> ignoredBatches = batches.filterBatchesForChannels(suspendIgnoreChannels
.getIgnoreChannels());

FileOutgoingTransport fileTransport = null;

Expand All @@ -344,8 +335,19 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE
// will be skipped in the future.
for (OutgoingBatch batch : ignoredBatches) {
batch.setStatus(OutgoingBatch.Status.IG);
outgoingBatchService.updateOutgoingBatch(batch);

}
outgoingBatchService.updateOutgoingBatches(ignoredBatches);

// Next, we update the node channel controls to the current
// timestamp
Calendar now = Calendar.getInstance();

for (NodeChannel nodeChannel : batches.getActiveChannels()) {
nodeChannel.setLastExtractedTime(now.getTime());
configurationService.saveNodeChannelControl(nodeChannel, false);
}

} finally {
if (fileTransport != null) {
fileTransport.close();
Expand Down

0 comments on commit 6f4327f

Please sign in to comment.