Skip to content

Commit

Permalink
Added a feature for a client. Allow for the definition of time window…
Browse files Browse the repository at this point in the history
…s where synchronization is limited by channel and node_group_id
  • Loading branch information
chenson42 committed Aug 14, 2009
1 parent 7073c09 commit bafd614
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 35 deletions.
Expand Up @@ -20,6 +20,8 @@

package org.jumpmind.symmetric.model;

import java.util.List;

public class NodeChannel extends Channel {

private static final long serialVersionUID = -2493052366767513160L;
Expand All @@ -30,13 +32,15 @@ public class NodeChannel extends Channel {

private boolean suspended = false;

private List<NodeGroupChannelWindow> nodeGroupChannelWindows;

public NodeChannel() {
}

public NodeChannel(String channelId) {
this.setId(channelId);
}

public boolean isIgnored() {
return ignored;
}
Expand All @@ -60,4 +64,31 @@ public String getNodeId() {
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

public void setNodeGroupChannelWindows(List<NodeGroupChannelWindow> nodeGroupChannelWindows) {
this.nodeGroupChannelWindows = nodeGroupChannelWindows;
}

public List<NodeGroupChannelWindow> getNodeGroupChannelWindows() {
return nodeGroupChannelWindows;
}

/**
* If {@link NodeGroupChannelWindow}s are defined for this channel, then check to
* see if the time (according to the offset passed in) is within on of the configured windows.
*/
public boolean inTimeWindow(String timezoneOffset) {
if (nodeGroupChannelWindows != null && nodeGroupChannelWindows.size() > 0) {
for (NodeGroupChannelWindow window : nodeGroupChannelWindows) {
if (window.inTimeWindow(timezoneOffset)) {
return true;
}
}
return false;
} else {
return true;
}

}

}
@@ -0,0 +1,96 @@
/*
* SymmetricDS is an open source database synchronization solution.
*
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*/
package org.jumpmind.symmetric.model;

import java.sql.Time;
import java.util.Date;

import org.apache.commons.lang.time.FastDateFormat;
import org.jumpmind.symmetric.util.AppUtils;

public class NodeGroupChannelWindow {

private String nodeGroupId;
private String channelId;
private Time startTime;
private Time endTime;
private boolean enabled;
final FastDateFormat HHmmss = FastDateFormat.getInstance("HH:mm:ss");

public String getNodeGroupId() {
return nodeGroupId;
}

public void setNodeGroupId(String nodeGroupId) {
this.nodeGroupId = nodeGroupId;
}

public String getChannelId() {
return channelId;
}

public void setChannelId(String channelId) {
this.channelId = channelId;
}

public Time getStartTime() {
return startTime;
}

public void setStartTime(Time startTime) {
this.startTime = startTime;
}

public Time getEndTime() {
return endTime;
}

public void setEndTime(Time endTime) {
this.endTime = endTime;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public boolean isEnabled() {
return enabled;
}

public boolean inTimeWindow(Date date) {
Time time = Time.valueOf(HHmmss.format(date));
return inTimeWindow(time);
}

public boolean inTimeWindow(Time time) {
if (enabled) {
return (startTime.before(time) && endTime.after(time))
|| (startTime.before(time) && endTime.before(startTime))
|| (endTime.after(time) && startTime.after(endTime));
} else {
return true;
}
}

public boolean inTimeWindow(String timezoneOffset) {
return inTimeWindow(AppUtils.getLocalDateForOffset(timezoneOffset));
}

}
Expand Up @@ -29,6 +29,7 @@
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.DataEventAction;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeGroupChannelWindow;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.springframework.jdbc.core.RowMapper;
Expand Down Expand Up @@ -84,22 +85,30 @@ public NodeChannel getChannel(String channelId) {
@SuppressWarnings("unchecked")
public List<NodeChannel> getChannels() {
if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null) {
channelCache = jdbcTemplate.query(getSql("selectChannelsSql"), new Object[] {}, new RowMapper() {
public Object mapRow(java.sql.ResultSet rs, int arg1) throws java.sql.SQLException {
NodeChannel channel = new NodeChannel();
channel.setId(rs.getString(1));
channel.setNodeId(rs.getString(2));
channel.setIgnored(isSet(rs.getObject(3)));
channel.setSuspended(isSet(rs.getObject(4)));
channel.setProcessingOrder(rs.getInt(5));
channel.setMaxBatchSize(rs.getInt(6));
channel.setEnabled(rs.getBoolean(7));
channel.setMaxBatchToSend(rs.getInt(8));
channel.setBatchAlgorithm(rs.getString(9));
return channel;
};
});
channelCacheTime = System.currentTimeMillis();
synchronized (this) {
if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null) {
channelCache = jdbcTemplate.query(getSql("selectChannelsSql"), new Object[] {}, new RowMapper() {
public Object mapRow(java.sql.ResultSet rs, int arg1) throws java.sql.SQLException {
NodeChannel channel = new NodeChannel();
channel.setId(rs.getString(1));
channel.setNodeId(rs.getString(2));
channel.setIgnored(isSet(rs.getObject(3)));
channel.setSuspended(isSet(rs.getObject(4)));
channel.setProcessingOrder(rs.getInt(5));
channel.setMaxBatchSize(rs.getInt(6));
channel.setEnabled(rs.getBoolean(7));
channel.setMaxBatchToSend(rs.getInt(8));
channel.setBatchAlgorithm(rs.getString(9));
return channel;
};
});

for (NodeChannel channel : channelCache) {
getNodeGroupChannelWindows(parameterService.getNodeGroupId(), channel.getId());
}
channelCacheTime = System.currentTimeMillis();
}
}
}
return channelCache;
}
Expand Down Expand Up @@ -148,6 +157,24 @@ public void autoConfigDatabase(boolean force) {
}
}

@SuppressWarnings("unchecked")
protected List<NodeGroupChannelWindow> getNodeGroupChannelWindows(String nodeGroupId, String channelId) {
return (List<NodeGroupChannelWindow>) jdbcTemplate.query(getSql("selectNodeGroupChannelWindowSql"),
new Object[] { nodeGroupId, channelId }, new NodeGroupChannelWindowMapper());
}

class NodeGroupChannelWindowMapper implements RowMapper {
public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
NodeGroupChannelWindow window = new NodeGroupChannelWindow();
window.setNodeGroupId(rs.getString(1));
window.setChannelId(rs.getString(2));
window.setStartTime(rs.getTime(3));
window.setEndTime(rs.getTime(4));
window.setEnabled(rs.getBoolean(5));
return window;
}
}

class NodeGroupLinkMapper implements RowMapper {
public Object mapRow(ResultSet rs, int num) throws SQLException {
NodeGroupLink node_groupTarget = new NodeGroupLink();
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.db.SequenceIdentifier;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.OutgoingBatch;
Expand Down Expand Up @@ -124,17 +125,17 @@ public OutgoingBatch findOutgoingBatch(long batchId) {
* as the batches will have already been created by {@link #buildOutgoingBatches(String)} in channel priority order.
*/
@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatches(String nodeId) {
public List<OutgoingBatch> getOutgoingBatches(String targetNodeId) {
List<OutgoingBatch> list = (List<OutgoingBatch>) jdbcTemplate.query(getSql("selectOutgoingBatchSql"),
new Object[] { nodeId, OutgoingBatch.Status.NE.toString(), OutgoingBatch.Status.SE.toString(),
new Object[] { targetNodeId, OutgoingBatch.Status.NE.toString(), OutgoingBatch.Status.SE.toString(),
OutgoingBatch.Status.ER.toString() }, new OutgoingBatchMapper());
final HashSet<String> errorChannels = new HashSet<String>();
for (OutgoingBatch batch : list) {
if (batch.getStatus().equals(OutgoingBatch.Status.ER)) {
errorChannels.add(batch.getChannelId());
}
}

List<NodeChannel> channels = configurationService.getChannels();
Collections.sort(channels, new Comparator<NodeChannel>() {
public int compare(NodeChannel b1, NodeChannel b2) {
Expand All @@ -150,18 +151,19 @@ public int compare(NodeChannel b1, NodeChannel b2) {
}
});

return filterMaxNumberOfOutgoingBatchesByChannel(list, channels);
return filterOutgoingBatchesForChannels(targetNodeId, list, channels);
}

/**
* Filter out the maximum number of batches to send.
*/
private List<OutgoingBatch> filterMaxNumberOfOutgoingBatchesByChannel(List<OutgoingBatch> batches,
protected List<OutgoingBatch> filterOutgoingBatchesForChannels(String targetNodeId, List<OutgoingBatch> batches,
List<NodeChannel> channels) {
if (batches != null && batches.size() > 0) {
Node node = nodeService.findNode(targetNodeId);
List<OutgoingBatch> filtered = new ArrayList<OutgoingBatch>(batches.size());
for (NodeChannel channel : channels) {
if (channel.isEnabled()) {
if (channel.isEnabled() && channel.inTimeWindow(node.getTimezoneOffset())) {
int max = channel.getMaxBatchToSend();
int count = 0;
for (OutgoingBatch outgoingBatch : batches) {
Expand Down
19 changes: 19 additions & 0 deletions symmetric/src/main/java/org/jumpmind/symmetric/util/AppUtils.java
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.TimeZone;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat;
Expand Down Expand Up @@ -94,6 +95,9 @@ public static <T> T find(String name, SymmetricEngine engine) {
return (T) engine.getApplicationContext().getBean(name);
}

/**
* @see #find(String, SymmetricEngine)
*/
@SuppressWarnings("unchecked")
public static <T> T find(String name, SymmetricWebServer server) {
return (T) server.getEngine().getApplicationContext().getBean(name);
Expand All @@ -106,6 +110,21 @@ public static File createTempFile(String token) throws IOException {
return File.createTempFile("sym." + token + ".", ".tmp");
}

/**
* @param timezoneOffset see description for {@link #getTimezoneOffset()}
* @return a date object that represents the local date and time at the passed in offset
*/
public static Date getLocalDateForOffset(String timezoneOffset) {
long currentTime = System.currentTimeMillis();
int myOffset = TimeZone.getDefault().getOffset(currentTime);
int theirOffset = TimeZone.getTimeZone("GMT"+timezoneOffset).getOffset(currentTime);
return new Date(currentTime - myOffset + theirOffset);
}

/**
* Useful method to sleep that catches and ignores the {@link InterruptedException}
* @param ms milliseconds to sleep
*/
public static void sleep(long ms) {
try {
Thread.sleep(ms);
Expand Down
11 changes: 5 additions & 6 deletions symmetric/src/main/resources/ddl-config.xml
Expand Up @@ -115,19 +115,18 @@
<table name="node_channel_ctl">
<column name="node_id" type="VARCHAR" size="50" required="true" primaryKey="true" />
<column name="channel_id" type="VARCHAR" size="50" required="true" primaryKey="true" />
<column name="last_sync_time" type="TIMESTAMP" />
<column name="last_extract_time" type="TIMESTAMP" />
<column name="suspend_enabled" type="BOOLEANINT" size="1" default="0" />
<column name="ignore_enabled" type="BOOLEANINT" size="1" default="0" />
</table>

<table name="node_group_channel_window">
<column name="node_group_id" type="VARCHAR" size="50" required="true" primaryKey="true" />
<column name="channel_id" type="VARCHAR" size="50" required="true" primaryKey="true" />
<column name="start_hour" type="INTEGER" required="true" />
<column name="start_min" type="INTEGER" required="true" />
<column name="end_hour" type="INTEGER" required="true" />
<column name="end_min" type="INTEGER" required="true" />
</table>
<column name="start_time" type="TIME" required="true" primaryKey="true" />
<column name="end_time" type="TIME" required="true" primaryKey="true" />
<column name="enabled" type="BOOLEANINT" size="1" required="true" default="0" />
</table>

<table name="node_security">
<column name="node_id" type="VARCHAR" size="50" required="true" primaryKey="true" />
Expand Down
Expand Up @@ -50,6 +50,12 @@
<entry key="deleteChannelSql">
<value>delete from $[sym.sync.table.prefix]_channel where channel_id=?</value>
</entry>
<entry key="selectNodeGroupChannelWindowSql">
<value>
select node_group_id, channel_id, start_time, end_time, enabled
from $[sym.sync.table.prefix]_node_group_channel_window where node_group_id=? and channel_id=?
</value>
</entry>
</util:map>

</beans>
Expand Up @@ -17,10 +17,13 @@
</entry>
<entry key="$[sym.sync.table.prefix]_parameter">
<value>1=1</value>
</entry>
</entry>
<entry key="$[sym.sync.table.prefix]_channel">
<value>1=1</value>
</entry>
<entry key="$[sym.sync.table.prefix]_node_group_channel_window">
<value>1=1</value>
</entry>
<entry key="$[sym.sync.table.prefix]_trigger">
<value>1=1</value>
</entry>
Expand Down
4 changes: 0 additions & 4 deletions symmetric/src/main/resources/symmetric-default.properties
Expand Up @@ -169,10 +169,6 @@ http.concurrent.reservation.timeout.ms=20000
# cert should be accepted (see javax.net.ssl.HostnameVerifier)
https.verified.server.names=

# @deprecated - this is now on the channel
# This is the maximum number of outgoing batches that will processed in one pull or push request.
# outgoing.batches.max.to.process=60

# Outgoing batches are assigned to data events using a multiple row update by default.
# If you experience timeouts during batching, you can set this to false, which will
# update data events one at a time.
Expand Down

0 comments on commit bafd614

Please sign in to comment.