Skip to content

Commit

Permalink
development check-in
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Feb 3, 2012
1 parent f06656c commit c87e9d9
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 113 deletions.
Expand Up @@ -184,13 +184,10 @@ public List<OutgoingBatch> getBatchesForChannelWindows(Node targetNode, NodeChan

if (windows != null) {
if (batches != null && batches.size() > 0) {
if (channel.isEnabled() && inTimeWindow(windows, targetNode.getTimezoneOffset())) {
int max = channel.getMaxBatchToSend();
int count = 0;
if (inTimeWindow(windows, targetNode.getTimezoneOffset())) {
for (OutgoingBatch outgoingBatch : batches) {
if (channel.getChannelId().equals(outgoingBatch.getChannelId()) && count < max) {
if (channel.getChannelId().equals(outgoingBatch.getChannelId())) {
keeping.add(outgoingBatch);
count++;
}
}
}
Expand Down
Expand Up @@ -78,7 +78,9 @@ public interface IConfigurationService {

public NodeChannel getNodeChannel(String channelId, boolean refreshExtractMillis);

public Channel getChannel (String channelId);
public Channel getChannel (String channelId);

public Map<String, Channel> getChannels(boolean refreshCache);

public NodeChannel getNodeChannel(String channelId, String nodeId, boolean refreshExtractMillis);

Expand Down
Expand Up @@ -40,7 +40,7 @@ public interface IOutgoingBatchService {

public OutgoingBatch findOutgoingBatch(long batchId);

public OutgoingBatches getOutgoingBatches(Node node);
public OutgoingBatches getOutgoingBatches(Node node, boolean includeDisabledChannels);

public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatchId);

Expand Down
Expand Up @@ -63,6 +63,10 @@ public class ConfigurationService extends AbstractService implements IConfigurat
private INodeService nodeService;

private Map<String, List<NodeChannel>> nodeChannelCache;

private Map<String, Channel> channelsCache;

private long channelCacheTime;

private long nodeChannelCacheTime;

Expand Down Expand Up @@ -244,7 +248,7 @@ public List<NodeChannel> getNodeChannels(final String nodeId, boolean refreshExt
nodeChannelCache = new HashMap<String, List<NodeChannel>>();
nodeChannelCacheTime = System.currentTimeMillis();
}
nodeChannels = sqlTemplate.query(getSql("selectChannelsSql"),
nodeChannels = sqlTemplate.query(getSql("selectNodeChannelsSql"),
new ISqlRowMapper<NodeChannel>() {
public NodeChannel mapRow(Row row) {
NodeChannel nodeChannel = new NodeChannel();
Expand Down Expand Up @@ -309,6 +313,7 @@ public Object mapRow(Row row) {
public void reloadChannels() {
synchronized (this) {
nodeChannelCache = null;
channelsCache = null;
}
}

Expand Down Expand Up @@ -478,6 +483,52 @@ public ChannelMap getSuspendIgnoreChannelLists(final String nodeId) {
}
return map;
}

public Map<String, Channel> getChannels(boolean refreshCache) {
long channelCacheTimeoutInMs = parameterService.getLong(
ParameterConstants.CACHE_TIMEOUT_CHANNEL_IN_MS, 60000);
Map<String, Channel> channels = channelsCache;
if (System.currentTimeMillis() - channelCacheTime >= channelCacheTimeoutInMs
|| channels == null || refreshCache) {
synchronized (this) {
channels = channelsCache;
if (System.currentTimeMillis() - channelCacheTime >= channelCacheTimeoutInMs
|| channels == null || refreshCache) {
channels = new HashMap<String, Channel>();
List<Channel> list = sqlTemplate.query(getSql("selectChannelsSql"), new ISqlRowMapper<Channel> () {
public Channel mapRow(Row row) {
Channel channel = new Channel();
channel.setChannelId(row.getString("channel_id"));
channel.setProcessingOrder(row.getInt("processing_order"));
channel.setMaxBatchSize(row.getInt("max_batch_size"));
channel.setEnabled(row.getBoolean("enabled"));
channel.setMaxBatchToSend(row.getInt("max_batch_to_send"));
channel.setMaxDataToRoute(row.getInt("max_data_to_route"));
channel.setUseOldDataToRoute(row
.getBoolean("use_old_data_to_route"));
channel.setUseRowDataToRoute(row
.getBoolean("use_row_data_to_route"));
channel.setUsePkDataToRoute(row
.getBoolean("use_pk_data_to_route"));
channel.setContainsBigLob(row
.getBoolean("contains_big_lob"));
channel.setBatchAlgorithm(row.getString("batch_algorithm"));
channel.setExtractPeriodMillis(row
.getLong("extract_period_millis"));
return channel;
}
});
for (Channel channel : list) {
channels.put(channel.getChannelId(), channel);
}
channelsCache = channels;
channelCacheTime = System.currentTimeMillis();
}
}
}

return channels;
}

public Channel getChannel(String channelId) {
NodeChannel nodeChannel = getNodeChannel(channelId, false);
Expand Down
Expand Up @@ -42,8 +42,15 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform,
+ " $(node_group_link) where source_node_group_id = ? ");

putSql("isChannelInUseSql", "select count(*) from $(trigger) where channel_id = ? ");

putSql("selectChannelsSql",
"select c.channel_id, c.processing_order, c.max_batch_size, c.enabled, " +
" c.max_batch_to_send, c.max_data_to_route, c.use_old_data_to_route, " +
" c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, " +
" c.batch_algorithm, c.extract_period_millis "
+ " from $(channel) c order by c.processing_order asc, c.channel_id ");

putSql("selectNodeChannelsSql",
""
+ "select c.channel_id, nc.node_id, nc.ignore_enabled, nc.suspend_enabled, c.processing_order, "
+ " c.max_batch_size, c.enabled, c.max_batch_to_send, c.max_data_to_route, c.use_old_data_to_route, c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, c.batch_algorithm, nc.last_extract_time, c.extract_period_millis "
Expand Down
Expand Up @@ -270,7 +270,7 @@ public List<OutgoingBatch> extract(Node targetNode, IOutgoingTransport targetTra
routerService.routeData();
}

OutgoingBatches batches = outgoingBatchService.getOutgoingBatches(targetNode);
OutgoingBatches batches = outgoingBatchService.getOutgoingBatches(targetNode, false);

if (batches.containsBatches()) {

Expand Down Expand Up @@ -334,8 +334,8 @@ public void extract(Node targetNode, IOutgoingTransport targetTransport,
currentBatch.getNodeId(), currentBatch.getBatchId());
if (previouslyExtracted != null && previouslyExtracted.exists()) {
log.info(
"We have already extracted batch {}. Using the existing extraction. To force re-extraction, please restart this instance of SymmetricDS.",
currentBatch.getBatchId());
"We have already extracted batch {}. Using the existing extraction: {}",
currentBatch.getBatchId(), previouslyExtracted);
} else {
currentBatch.setStatus(OutgoingBatch.Status.QY);
currentBatch.setExtractCount(currentBatch.getExtractCount() + 1);
Expand Down
Expand Up @@ -24,7 +24,9 @@
import java.sql.Types;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.sql.ISqlRowMapper;
Expand All @@ -37,6 +39,7 @@
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.db.SequenceIdentifier;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.NodeSecurity;
Expand Down Expand Up @@ -74,7 +77,7 @@ public OutgoingBatchService(IParameterService parameterService,
public void markAllAsSentForNode(Node node) {
OutgoingBatches batches = null;
do {
batches = getOutgoingBatches(node);
batches = getOutgoingBatches(node, true);
for (OutgoingBatch outgoingBatch : batches.getBatches()) {
outgoingBatch.setStatus(Status.OK);
outgoingBatch.setErrorFlag(false);
Expand Down Expand Up @@ -152,7 +155,7 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
public OutgoingBatch findOutgoingBatch(long batchId) {
List<OutgoingBatch> list = (List<OutgoingBatch>) sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchSql"),
new OutgoingBatchMapper(), new Object[] { batchId }, new int[] { Types.NUMERIC });
new OutgoingBatchMapper(true, false), new Object[] { batchId }, new int[] { Types.NUMERIC });
if (list != null && list.size() > 0) {
return list.get(0);
} else {
Expand Down Expand Up @@ -197,7 +200,7 @@ public List<OutgoingBatch> listOutgoingBatches(List<String> nodeIds, List<String
: "selectOutgoingBatchByChannelAndStatusSql", startAtBatchIdSql,
" order by batch_id desc");

return sqlTemplate.query(sql, maxRowsToRetrieve, new OutgoingBatchMapper(),
return sqlTemplate.query(sql, maxRowsToRetrieve, new OutgoingBatchMapper(true, false),
args.toArray(new Object[args.size()]), null);
} else {
return new ArrayList<OutgoingBatch>(0);
Expand All @@ -223,13 +226,13 @@ protected SqlList toStringList(String replacementToken, List<OutgoingBatch.Statu
* been created by {@link #buildOutgoingBatches(String)} in channel priority
* order.
*/
public OutgoingBatches getOutgoingBatches(Node node) {
public OutgoingBatches getOutgoingBatches(Node node, boolean includeDisabledChannels) {
long ts = System.currentTimeMillis();
final int maxNumberOfBatchesToSelect = parameterService.getInt(
ParameterConstants.OUTGOING_BATCH_MAX_BATCHES_TO_SELECT, 1000);
List<OutgoingBatch> list = (List<OutgoingBatch>) sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchSql"),
maxNumberOfBatchesToSelect, new OutgoingBatchMapper(),
maxNumberOfBatchesToSelect, new OutgoingBatchMapper(includeDisabledChannels, true),
new Object[] { node.getNodeId(), OutgoingBatch.Status.NE.name(),
OutgoingBatch.Status.QY.name(), OutgoingBatch.Status.SE.name(),
OutgoingBatch.Status.LD.name(), OutgoingBatch.Status.ER.name() }, null);
Expand Down Expand Up @@ -265,15 +268,15 @@ public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatc
OutgoingBatches batches = new OutgoingBatches();
batches.setBatches(sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchRangeSql"),
new OutgoingBatchMapper(), Long.parseLong(startBatchId), Long.parseLong(endBatchId)));
new OutgoingBatchMapper(true, false), Long.parseLong(startBatchId), Long.parseLong(endBatchId)));
return batches;
}

public OutgoingBatches getOutgoingBatchErrors(int maxRows) {
OutgoingBatches batches = new OutgoingBatches();
batches.setBatches(sqlTemplate.query(
getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchErrorsSql"), maxRows,
new OutgoingBatchMapper(), null, null));
new OutgoingBatchMapper(true, false), null, null));
return batches;
}

Expand Down Expand Up @@ -341,46 +344,64 @@ public OutgoingBatchSummary mapRow(Row rs) {
}

class OutgoingBatchMapper implements ISqlRowMapper<OutgoingBatch> {
public OutgoingBatch mapRow(Row rs) {
OutgoingBatch batch = new OutgoingBatch();
batch.setNodeId(rs.getString("node_id"));
batch.setChannelId(rs.getString("channel_id"));
batch.setStatus(rs.getString("status"));
batch.setByteCount(rs.getLong("byte_count"));
batch.setExtractCount(rs.getLong("extract_count"));
batch.setSentCount(rs.getLong("sent_count"));
batch.setLoadCount(rs.getLong("load_count"));
batch.setDataEventCount(rs.getLong("data_event_count"));
batch.setReloadEventCount(rs.getLong("reload_event_count"));
batch.setInsertEventCount(rs.getLong("insert_event_count"));
batch.setUpdateEventCount(rs.getLong("update_event_count"));
batch.setDeleteEventCount(rs.getLong("delete_event_count"));
batch.setOtherEventCount(rs.getLong("other_event_count"));
batch.setRouterMillis(rs.getLong("router_millis"));
batch.setNetworkMillis(rs.getLong("network_millis"));
batch.setFilterMillis(rs.getLong("filter_millis"));
batch.setLoadMillis(rs.getLong("load_millis"));
batch.setExtractMillis(rs.getLong("extract_millis"));
batch.setSqlState(rs.getString("sql_state"));
batch.setSqlCode(rs.getInt("sql_code"));
batch.setSqlMessage(rs.getString("sql_message"));
batch.setFailedDataId(rs.getLong("failed_data_id"));
batch.setLastUpdatedHostName(rs.getString("last_update_hostname"));
batch.setLastUpdatedTime(rs.getDateTime("last_update_time"));
batch.setCreateTime(rs.getDateTime("create_time"));
batch.setBatchId(rs.getLong("batch_id"));
batch.setLoadFlag(rs.getBoolean("load_flag"));
batch.setErrorFlag(rs.getBoolean("error_flag"));
return batch;
}
}

public void setNodeService(INodeService nodeService) {
this.nodeService = nodeService;
}
private boolean includeDisabledChannels = false;
private boolean limitBasedOnMaxBatchToSend = false;
private Map<String, Channel> channels;
private Map<String, Integer> countByChannel;

public OutgoingBatchMapper(boolean includeDisabledChannels,
boolean limitBasedOnMaxBatchToSend) {
this.includeDisabledChannels = includeDisabledChannels;
this.limitBasedOnMaxBatchToSend = limitBasedOnMaxBatchToSend;
this.channels = configurationService.getChannels(false);
this.countByChannel = new HashMap<String, Integer>();
}

public void setConfigurationService(IConfigurationService configurationService) {
this.configurationService = configurationService;
public OutgoingBatch mapRow(Row rs) {
String channelId = rs.getString("channel_id");
Channel channel = channels.get(channelId);
Integer count = countByChannel.get(channelId);
if (count == null) {
count = 0;
}
if (channel != null && (includeDisabledChannels || channel.isEnabled())
&& (!limitBasedOnMaxBatchToSend || count <= channel.getMaxBatchToSend())) {
count++;
OutgoingBatch batch = new OutgoingBatch();
batch.setChannelId(channelId);
batch.setNodeId(rs.getString("node_id"));
batch.setStatus(rs.getString("status"));
batch.setByteCount(rs.getLong("byte_count"));
batch.setExtractCount(rs.getLong("extract_count"));
batch.setSentCount(rs.getLong("sent_count"));
batch.setLoadCount(rs.getLong("load_count"));
batch.setDataEventCount(rs.getLong("data_event_count"));
batch.setReloadEventCount(rs.getLong("reload_event_count"));
batch.setInsertEventCount(rs.getLong("insert_event_count"));
batch.setUpdateEventCount(rs.getLong("update_event_count"));
batch.setDeleteEventCount(rs.getLong("delete_event_count"));
batch.setOtherEventCount(rs.getLong("other_event_count"));
batch.setRouterMillis(rs.getLong("router_millis"));
batch.setNetworkMillis(rs.getLong("network_millis"));
batch.setFilterMillis(rs.getLong("filter_millis"));
batch.setLoadMillis(rs.getLong("load_millis"));
batch.setExtractMillis(rs.getLong("extract_millis"));
batch.setSqlState(rs.getString("sql_state"));
batch.setSqlCode(rs.getInt("sql_code"));
batch.setSqlMessage(rs.getString("sql_message"));
batch.setFailedDataId(rs.getLong("failed_data_id"));
batch.setLastUpdatedHostName(rs.getString("last_update_hostname"));
batch.setLastUpdatedTime(rs.getDateTime("last_update_time"));
batch.setCreateTime(rs.getDateTime("create_time"));
batch.setBatchId(rs.getLong("batch_id"));
batch.setLoadFlag(rs.getBoolean("load_flag"));
batch.setErrorFlag(rs.getBoolean("error_flag"));
return batch;
} else {
return null;
}
}
}

}
Expand Up @@ -809,7 +809,7 @@ data.id.increment.by=1
# into memory for the next data extraction.
# DatabaseOverridable: true
# Tags: extract
outgoing.batches.max.to.select=1000
outgoing.batches.max.to.select=50000

# The class name for the Security Service to use for encrypting and
# decrypting database passwords
Expand Down

0 comments on commit c87e9d9

Please sign in to comment.