Skip to content

Commit

Permalink
SYMMETRICDS-240 - Made method multi-thread enabled. Changed the API t…
Browse files Browse the repository at this point in the history
…o be explicit in the request to select extract period. There could be clients who don't want the extract database hit if the channels are already cached.
  • Loading branch information
chenson42 committed Mar 25, 2010
1 parent 6c29140 commit f7d6027
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 122 deletions.
Expand Up @@ -55,13 +55,13 @@ public interface IConfigurationService {

public NodeGroupLinkAction getDataEventActionsByGroupId(String sourceGroupId, String targetGroupId);

public List<NodeChannel> getNodeChannels();
public List<NodeChannel> getNodeChannels(boolean refreshExtractMillis);

public List<NodeChannel> getNodeChannels(String nodeId);
public List<NodeChannel> getNodeChannels(String nodeId, boolean refreshExtractMillis);

public NodeChannel getNodeChannel(String channelId);
public NodeChannel getNodeChannel(String channelId, boolean refreshExtractMillis);

public NodeChannel getNodeChannel(String channelId, String nodeId);
public NodeChannel getNodeChannel(String channelId, String nodeId, boolean refreshExtractMillis);

public void reloadChannels();

Expand Down
Expand Up @@ -134,12 +134,12 @@ public void deleteChannel(Channel channel) {
jdbcTemplate.update(getSql("deleteChannelSql"), new Object[] { channel.getChannelId() });
}

public NodeChannel getNodeChannel(String channelId) {
return getNodeChannel(channelId, nodeService.findIdentityNodeId());
public NodeChannel getNodeChannel(String channelId, boolean refreshExtractMillis) {
return getNodeChannel(channelId, nodeService.findIdentityNodeId(), refreshExtractMillis);
}

public NodeChannel getNodeChannel(String channelId, String nodeId) {
List<NodeChannel> channels = getNodeChannels(nodeId);
public NodeChannel getNodeChannel(String channelId, String nodeId, boolean refreshExtractMillis) {
List<NodeChannel> channels = getNodeChannels(nodeId, refreshExtractMillis);
for (NodeChannel nodeChannel : channels) {
if (nodeChannel.getChannelId().equals(channelId)) {
return nodeChannel;
Expand All @@ -148,17 +148,18 @@ public NodeChannel getNodeChannel(String channelId, String nodeId) {
return null;
}

public List<NodeChannel> getNodeChannels() {
return getNodeChannels(nodeService.findIdentityNodeId());
public List<NodeChannel> getNodeChannels(boolean refreshExtractMillis) {
return getNodeChannels(nodeService.findIdentityNodeId(), refreshExtractMillis);
}

@SuppressWarnings("unchecked")
public List<NodeChannel> getNodeChannels(final String nodeId) {
public List<NodeChannel> getNodeChannels(final String nodeId, boolean refreshExtractMillis) {
boolean loaded = false;
long channelCacheTimeoutInMs = parameterService
.getLong(ParameterConstants.CACHE_TIMEOUT_CHANNEL_IN_MS);
List<NodeChannel> nodeChannels = nodeChannelCache != null ? nodeChannelCache.get(nodeId) : null;
if (System.currentTimeMillis() - nodeChannelCacheTime >= channelCacheTimeoutInMs
|| nodeChannelCache == null || nodeChannelCache.get(nodeId) == null) {
|| nodeChannels == null) {
synchronized (this) {
if (System.currentTimeMillis() - nodeChannelCacheTime >= channelCacheTimeoutInMs
|| nodeChannelCache == null || nodeChannelCache.get(nodeId) == null) {
Expand All @@ -167,64 +168,64 @@ public List<NodeChannel> getNodeChannels(final String nodeId) {
nodeChannelCache = new HashMap<String, List<NodeChannel>>();
nodeChannelCacheTime = System.currentTimeMillis();
}
nodeChannelCache.put(nodeId, jdbcTemplate.query(getSql("selectChannelsSql"),
nodeChannels = jdbcTemplate.query(getSql("selectChannelsSql"),
new Object[] { nodeId }, new RowMapper() {
public Object mapRow(java.sql.ResultSet rs, int arg1)
throws SQLException {
NodeChannel nodeChannel = new NodeChannel();
nodeChannel.setChannelId(rs.getString(1));
// note that 2 is intentionally missing
// here.
nodeChannel.setNodeId(nodeId);
nodeChannel.setIgnoreEnabled(isSet(rs.getObject(3)));
nodeChannel.setSuspendEnabled(isSet(rs.getObject(4)));
nodeChannel.setProcessingOrder(rs.getInt(5));
nodeChannel.setMaxBatchSize(rs.getInt(6));
nodeChannel.setEnabled(rs.getBoolean(7));
nodeChannel.setMaxBatchToSend(rs.getInt(8));
nodeChannel.setMaxDataToRoute(rs.getInt(9));
nodeChannel.setUseOldDataToRoute(rs.getBoolean(10));
nodeChannel.setUseRowDataToRoute(rs.getBoolean(11));
nodeChannel.setBatchAlgorithm(rs.getString(12));
nodeChannel.setLastExtractedTime(rs.getTimestamp(13));
nodeChannel.setExtractPeriodMillis(rs.getLong(14));

return nodeChannel;
};
}));
public Object mapRow(java.sql.ResultSet rs, int arg1)
throws SQLException {
NodeChannel nodeChannel = new NodeChannel();
nodeChannel.setChannelId(rs.getString(1));
// note that 2 is intentionally missing
// here.
nodeChannel.setNodeId(nodeId);
nodeChannel.setIgnoreEnabled(isSet(rs.getObject(3)));
nodeChannel.setSuspendEnabled(isSet(rs.getObject(4)));
nodeChannel.setProcessingOrder(rs.getInt(5));
nodeChannel.setMaxBatchSize(rs.getInt(6));
nodeChannel.setEnabled(rs.getBoolean(7));
nodeChannel.setMaxBatchToSend(rs.getInt(8));
nodeChannel.setMaxDataToRoute(rs.getInt(9));
nodeChannel.setUseOldDataToRoute(rs.getBoolean(10));
nodeChannel.setUseRowDataToRoute(rs.getBoolean(11));
nodeChannel.setBatchAlgorithm(rs.getString(12));
nodeChannel.setLastExtractedTime(rs.getTimestamp(13));
nodeChannel.setExtractPeriodMillis(rs.getLong(14));

return nodeChannel;
};
});
nodeChannelCache.put(nodeId, nodeChannels);
loaded = true;
}
}
}

if (!loaded) {

// need to read last extracted time from database regardless of
// whether we used the cache or not.
// locate the nodes in the cache, and update it.
if (!loaded && refreshExtractMillis) {
// need to read last extracted time from database regardless of
// whether we used the cache or not.
// locate the nodes in the cache, and update it.
final Map<String, NodeChannel> nodeChannelsMap = new HashMap<String, NodeChannel>();

List<NodeChannel> nodeChannels = nodeChannelCache.get(nodeId);
final Map<String, NodeChannel> nodeChannelsMap = new HashMap<String, NodeChannel>();
for (NodeChannel nc : nodeChannels) {
nodeChannelsMap.put(nc.getChannelId(), nc);
}

for (NodeChannel nc : nodeChannels) {
nodeChannelsMap.put(nc.getChannelId(), nc);
}
jdbcTemplate.query(getSql("selectNodeChannelControlLastExtractTimeSql"),
new Object[] { nodeId }, new ResultSetExtractor() {
public Object extractData(ResultSet rs) throws SQLException,
DataAccessException {
if (rs.next()) {
String channelId = rs.getString(1);
Date extractTime = rs.getTimestamp(2);
nodeChannelsMap.get(channelId)
.setLastExtractedTime(extractTime);
}
return null;
};
});

jdbcTemplate.query(getSql("selectNodeChannelControlLastExtractTimeSql"),
new Object[] { nodeId }, new ResultSetExtractor() {
public Object extractData(ResultSet rs) throws SQLException,
DataAccessException {
if (rs.next()) {
String channelId = rs.getString(1);
Date extractTime = rs.getTimestamp(2);
nodeChannelsMap.get(channelId).setLastExtractedTime(extractTime);
}
return null;
};
});
}

return nodeChannelCache.get(nodeId);
return nodeChannels;
}

public void reloadChannels() {
Expand Down Expand Up @@ -265,7 +266,7 @@ public void autoConfigDatabase(boolean force) {
protected void autoConfigChannels() {
if (defaultChannels != null) {
reloadChannels();
List<NodeChannel> channels = getNodeChannels();
List<NodeChannel> channels = getNodeChannels(false);
for (Channel defaultChannel : defaultChannels) {
if (!defaultChannel.isInList(channels)) {
log.info("ChannelAutoConfiguring", defaultChannel.getChannelId());
Expand Down Expand Up @@ -385,14 +386,16 @@ public List<NodeGroupChannelWindow> getNodeGroupChannelWindows(String nodeGroupI

public ChannelMap getSuspendIgnoreChannelLists(final String nodeId) {
ChannelMap map = new ChannelMap();
List<NodeChannel> ncs = getNodeChannels(nodeId);
List<NodeChannel> ncs = getNodeChannels(nodeId, true);

for (NodeChannel nc : ncs) {
if (nc.isSuspendEnabled()) {
map.addSuspendChannels(nc.getChannelId());
}
if (nc.isIgnoreEnabled()) {
map.addIgnoreChannels(nc.getChannelId());
if (ncs != null) {
for (NodeChannel nc : ncs) {
if (nc.isSuspendEnabled()) {
map.addSuspendChannels(nc.getChannelId());
}
if (nc.isIgnoreEnabled()) {
map.addIgnoreChannels(nc.getChannelId());
}
}
}
return map;
Expand Down
Expand Up @@ -261,7 +261,7 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc
try {
OutgoingBatch batch = ctx.getBatch();
Table table = dbDialect.getTable(triggerRouter.getTrigger(), true);
NodeChannel channel = batch != null ? configurationService.getNodeChannel(batch.getChannelId())
NodeChannel channel = batch != null ? configurationService.getNodeChannel(batch.getChannelId(), false)
: new NodeChannel(Constants.CHANNEL_RELOAD);
Set<Node> oneNodeSet = new HashSet<Node>();
oneNodeSet.add(node);
Expand Down
Expand Up @@ -143,7 +143,7 @@ public OutgoingBatches getOutgoingBatches(Node node) {

OutgoingBatches batches = new OutgoingBatches(list);

List<NodeChannel> channels = configurationService.getNodeChannels(node.getNodeId());
List<NodeChannel> channels = configurationService.getNodeChannels(node.getNodeId(), true);
batches.sortChannels(channels);

List<OutgoingBatch> keepers = new ArrayList<OutgoingBatch>();
Expand Down
Expand Up @@ -129,7 +129,7 @@ synchronized public void routeData() {
* reason is to reduce the number of connections we are required to have.
*/
protected int routeDataForEachChannel(DataRef ref, Node sourceNode) {
final List<NodeChannel> channels = configurationService.getNodeChannels();
final List<NodeChannel> channels = configurationService.getNodeChannels(false);
int dataCount = 0;
for (NodeChannel nodeChannel : channels) {
if (!nodeChannel.isSuspendEnabled()) {
Expand Down
Expand Up @@ -49,7 +49,7 @@ public void setUp() {

@Test
public void testGetNodeChannels() throws Exception {
List<NodeChannel> nodeChannels = configurationService.getNodeChannels();
List<NodeChannel> nodeChannels = configurationService.getNodeChannels(false);
Assert.assertNotNull(nodeChannels);
Assert.assertEquals(4, nodeChannels.size());
for (NodeChannel nc : nodeChannels) {
Expand All @@ -64,7 +64,7 @@ public void testGetNodeChannels() throws Exception {
@Test
public void testGetNodeChannelsById() throws Exception {
String nodeId = "12345";
List<NodeChannel> nodeChannels = configurationService.getNodeChannels(nodeId);
List<NodeChannel> nodeChannels = configurationService.getNodeChannels(nodeId, false);
Assert.assertNotNull(nodeChannels);

Assert.assertEquals(4, nodeChannels.size());
Expand All @@ -81,15 +81,15 @@ public void testGetNodeChannelsById() throws Exception {
// Test "ignored"
nc.setIgnoreEnabled(true);
configurationService.saveNodeChannelControl(nc, false);
NodeChannel compareTo = configurationService.getNodeChannel(updatedChannelId, nodeId);
NodeChannel compareTo = configurationService.getNodeChannel(updatedChannelId, nodeId, false);
Assert.assertTrue(compareTo.isIgnoreEnabled());
Assert.assertFalse(compareTo.isSuspendEnabled());
Assert.assertNull(compareTo.getLastExtractedTime());

// Test "suspended"
compareTo.setSuspendEnabled(true);
configurationService.saveNodeChannelControl(compareTo, false);
compareTo = configurationService.getNodeChannel(updatedChannelId, nodeId);
compareTo = configurationService.getNodeChannel(updatedChannelId, nodeId, false);
Assert.assertTrue(compareTo.isIgnoreEnabled());
Assert.assertTrue(compareTo.isSuspendEnabled());
Assert.assertNull(compareTo.getLastExtractedTime());
Expand All @@ -102,14 +102,14 @@ public void testGetNodeChannelsById() throws Exception {
nc1.setLastExtractedTime(date);
configurationService.saveNodeChannelControl(nc1, false);

compareTo = configurationService.getNodeChannel(updatedChannelId1, nodeId);
compareTo = configurationService.getNodeChannel(updatedChannelId1, nodeId, false);
Assert.assertFalse(compareTo.isIgnoreEnabled());
Assert.assertFalse(compareTo.isSuspendEnabled());
Assert.assertNotNull(compareTo.getLastExtractedTime());
Assert.assertEquals(date.getTime(), compareTo.getLastExtractedTime().getTime());

// make sure other nodeChannel not effected
compareTo = configurationService.getNodeChannel(updatedChannelId, nodeId);
compareTo = configurationService.getNodeChannel(updatedChannelId, nodeId, false);
Assert.assertTrue(compareTo.isIgnoreEnabled());
Assert.assertTrue(compareTo.isSuspendEnabled());
Assert.assertNull(compareTo.getLastExtractedTime());
Expand All @@ -126,7 +126,7 @@ public void testGetSuspendIgnoreChannels() throws Exception {

ConfigurationService configurationService = (ConfigurationService) find(Constants.CONFIG_SERVICE);

List<NodeChannel> ncs = configurationService.getNodeChannels(nodeId);
List<NodeChannel> ncs = configurationService.getNodeChannels(nodeId, false);

NodeChannel nc = ncs.get(1);

Expand Down

0 comments on commit f7d6027

Please sign in to comment.