Skip to content

Commit

Permalink
Modified Configuration Service's channel cache.
Browse files Browse the repository at this point in the history
  • Loading branch information
mhanes committed Sep 8, 2009
1 parent b99e718 commit 81f4c2a
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 70 deletions.
Expand Up @@ -23,7 +23,9 @@

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.Channel;
Expand All @@ -32,13 +34,16 @@
import org.jumpmind.symmetric.model.NodeGroupChannelWindow;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.INodeService;
import org.springframework.jdbc.core.RowMapper;

public class ConfigurationService extends AbstractService implements IConfigurationService {

private static final long MAX_CHANNEL_CACHE_TIME = 60000;

private static List<NodeChannel> channelCache;
private INodeService nodeService;

private static Map<String, List<NodeChannel>> channelCache;

private static long channelCacheTime;

Expand Down Expand Up @@ -82,66 +87,48 @@ public NodeChannel getChannel(String channelId) {
return null;
}

@SuppressWarnings("unchecked")
public List<NodeChannel> getChannels() {
if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null) {
synchronized (this) {
if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null) {
channelCache = jdbcTemplate.query(getSql("selectChannelsSql"), new Object[] {}, new RowMapper() {
public Object mapRow(java.sql.ResultSet rs, int arg1) throws java.sql.SQLException {
NodeChannel channel = new NodeChannel();
channel.setId(rs.getString(1));
channel.setNodeId(rs.getString(2));
channel.setIgnored(isSet(rs.getObject(3)));
channel.setSuspended(isSet(rs.getObject(4)));
channel.setProcessingOrder(rs.getInt(5));
channel.setMaxBatchSize(rs.getInt(6));
channel.setEnabled(rs.getBoolean(7));
channel.setMaxBatchToSend(rs.getInt(8));
channel.setBatchAlgorithm(rs.getString(9));
return channel;
};
});

for (NodeChannel channel : channelCache) {
getNodeGroupChannelWindows(parameterService.getNodeGroupId(), channel.getId());
}
channelCacheTime = System.currentTimeMillis();
}
}
List<NodeChannel> nodeChannels = getChannels(nodeService.findIdentityNodeId());
for (NodeChannel channel : nodeChannels) {
getNodeGroupChannelWindows(parameterService.getNodeGroupId(), channel.getId());
}
return channelCache;
return nodeChannels;
}

@SuppressWarnings("unchecked")
public List<NodeChannel> getChannels(String nodeId) {
if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null) {
public List<NodeChannel> getChannels(final String nodeId) {

if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null
|| channelCache.get(nodeId) == null) {
synchronized (this) {
if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null) {
channelCache = jdbcTemplate.query(getSql("selectNodeChannelsSql"), new Object[] { nodeId }, new RowMapper() {
public Object mapRow(java.sql.ResultSet rs, int arg1) throws java.sql.SQLException {
NodeChannel channel = new NodeChannel();
channel.setId(rs.getString(1));
channel.setNodeId(rs.getString(2));
channel.setIgnored(isSet(rs.getObject(3)));
channel.setSuspended(isSet(rs.getObject(4)));
channel.setProcessingOrder(rs.getInt(5));
channel.setMaxBatchSize(rs.getInt(6));
channel.setEnabled(rs.getBoolean(7));
channel.setMaxBatchToSend(rs.getInt(8));
channel.setBatchAlgorithm(rs.getString(9));
return channel;
};
});

for (NodeChannel channel : channelCache) {
getNodeGroupChannelWindows(parameterService.getNodeGroupId(), channel.getId());
if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null
|| channelCache.get(nodeId) == null) {
if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null) {
channelCache = new HashMap<String, List<NodeChannel>>();
channelCacheTime = System.currentTimeMillis();
}
channelCacheTime = System.currentTimeMillis();
channelCache.put(nodeId, jdbcTemplate.query(getSql("selectChannelsSql"), new Object[] { nodeId },
new RowMapper() {
public Object mapRow(java.sql.ResultSet rs, int arg1) throws java.sql.SQLException {
NodeChannel channel = new NodeChannel();
channel.setId(rs.getString(1));
// note that 2 is intentionally missing
// here.
channel.setNodeId(nodeId);
channel.setIgnored(isSet(rs.getObject(3)));
channel.setSuspended(isSet(rs.getObject(4)));
channel.setProcessingOrder(rs.getInt(5));
channel.setMaxBatchSize(rs.getInt(6));
channel.setEnabled(rs.getBoolean(7));
channel.setMaxBatchToSend(rs.getInt(8));
channel.setBatchAlgorithm(rs.getString(9));
return channel;
};
}));
}
}
}
return channelCache;
return channelCache.get(nodeId);
}

public void reloadChannels() {
Expand Down Expand Up @@ -219,4 +206,8 @@ public void setDefaultChannels(List<Channel> defaultChannels) {
this.defaultChannels = defaultChannels;
}

public void setNodeService(INodeService nodeService) {
this.nodeService = nodeService;
}

}
Expand Up @@ -74,11 +74,11 @@ public void doFilter(final ServletRequest req, final ServletResponse resp, final
if (!concurrentConnectionManager.reserveConnection(nodeId, poolId, ReservationType.SOFT)) {
sendError(resp, HttpServletResponse.SC_SERVICE_UNAVAILABLE);
} else {
// buildSuspendIgnoreResponseHeaders(nodeId, resp);
buildSuspendIgnoreResponseHeaders(nodeId, resp);
}
} else if (concurrentConnectionManager.reserveConnection(nodeId, poolId, ReservationType.HARD)) {
try {
//buildSuspendIgnoreResponseHeaders(nodeId, resp);
buildSuspendIgnoreResponseHeaders(nodeId, resp);
chain.doFilter(req, resp);
} finally {
concurrentConnectionManager.releaseConnection(nodeId, poolId);
Expand All @@ -98,22 +98,22 @@ private void buildSuspendIgnoreResponseHeaders(final String nodeId, final Servle

for (NodeChannel nc : ncs) {
if (nc.isSuspended()) {
suspendChannelsBuffer.append(',').append(nc.getId());
suspendChannelsBuffer.append(nc.getId()).append(',');
}
if (nc.isIgnored()) {
ignoreChannelsBuffer.append(',').append(nc.getId());
ignoreChannelsBuffer.append(nc.getId()).append(',');
}
}

String suspendChannels = StringUtils.trimToNull(suspendChannelsBuffer.toString());
String ignoreChannels = StringUtils.trimToNull(ignoreChannelsBuffer.toString());

if (suspendChannels != null) {
httpResponse.setHeader(WebConstants.SUSPENDED_CHANNELS, suspendChannels.substring(1));
httpResponse.setHeader(WebConstants.SUSPENDED_CHANNELS, StringUtils.strip(suspendChannels, ","));
}

if (ignoreChannels != null) {
httpResponse.setHeader(WebConstants.IGNORED_CHANNELS, ignoreChannels.substring(1));
httpResponse.setHeader(WebConstants.IGNORED_CHANNELS, StringUtils.strip(ignoreChannels, ","));
}

}
Expand Down
Expand Up @@ -27,22 +27,14 @@
</value>
</entry>
<entry key="selectChannelsSql">
<value>
select c.channel_id, n.node_id, nc.ignore_enabled, nc.suspend_enabled, c.processing_order,
c.max_batch_size, c.enabled, c.max_batch_to_send, c.batch_algorithm from $[sym.sync.table.prefix]_channel c inner
join $[sym.sync.table.prefix]_node_identity n on 1 = 1 left outer join
$[sym.sync.table.prefix]_node_channel_ctl nc on c.channel_id = nc.channel_id and n.node_id =
nc.node_id order by c.processing_order asc, c.channel_id
</value>
</entry>
<entry key="selectNodeChannelsSql">
<value>
select c.channel_id, nc.node_id, nc.ignore_enabled, nc.suspend_enabled, c.processing_order,
c.max_batch_size, c.enabled, c.max_batch_to_send, c.batch_algorithm from $[sym.sync.table.prefix]_channel c inner join
$[sym.sync.table.prefix]_node_channel_ctl nc on c.channel_id = nc.channel_id and nc.node_id= ?
order by c.processing_order asc, c.channel_id
c.max_batch_size, c.enabled, c.max_batch_to_send, c.batch_algorithm from $[sym.sync.table.prefix]_channel c left outer join
$[sym.sync.table.prefix]_node_channel_ctl nc on c.channel_id = nc.channel_id and nc.node_id = ?
order by c.processing_order asc, c.channel_id
</value>
</entry>

<entry key="insertChannelSql">
<value>
insert into $[sym.sync.table.prefix]_channel (channel_id, processing_order, max_batch_size,
Expand Down
2 changes: 2 additions & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -55,6 +55,8 @@

<bean id="configurationService" class="org.jumpmind.symmetric.service.impl.ConfigurationService" scope="singleton" parent="abstractService">
<property name="sql" ref="configurationServiceSql" />
<property name="nodeService" ref="nodeService" />

<property name="defaultChannels">
<list>
<bean class="org.jumpmind.symmetric.model.Channel">
Expand Down

0 comments on commit 81f4c2a

Please sign in to comment.