Navigation Menu

Skip to content

Commit

Permalink
fixed an ordering issue with outgoing batch selection.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Oct 18, 2007
1 parent 1727921 commit 6a049bf
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 39 deletions.
Expand Up @@ -155,6 +155,7 @@ public boolean extract(Node node, IOutgoingTransport transport)
*/
public boolean extract(Node node, final IExtractListener handler)
throws Exception {

outgoingBatchService.buildOutgoingBatches(node.getNodeId());

List<OutgoingBatch> batches = outgoingBatchService
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.support.JdbcUtils;

Expand All @@ -57,6 +58,8 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa
private String changeBatchStatusSql;

private String initialLoadStatusSql;

private JdbcTemplate outgoingBatchQueryTemplate;

private IOutgoingBatchHistoryService historyService;

Expand Down Expand Up @@ -185,10 +188,14 @@ private void insertOutgoingBatch(Connection conn, OutgoingBatch outgoingBatch) t
historyService.created(new Integer(outgoingBatch.getBatchId()), -1);
}

/**
* Select batches to process. Batches that are NOT in error will be returned first. They will be ordered
* by batch id as the batches will have already been created by {@link #buildOutgoingBatches(String)} in channel
* priority order.
*/
@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatches(String clientId) {
// TODO: limit number of batches handled in one extract pass
return (List<OutgoingBatch>) jdbcTemplate.query(selectOutgoingBatchSql, new Object[] { clientId },
public List<OutgoingBatch> getOutgoingBatches(String nodeId) {
return (List<OutgoingBatch>) outgoingBatchQueryTemplate.query(selectOutgoingBatchSql, new Object[] { nodeId, nodeId },
new RowMapper() {
public Object mapRow(ResultSet rs, int index) throws SQLException {
OutgoingBatch batch = new OutgoingBatch();
Expand Down Expand Up @@ -272,4 +279,8 @@ public void setInitialLoadStatusSql(String initialLoadStatusSql) {
this.initialLoadStatusSql = initialLoadStatusSql;
}

public void setOutgoingBatchQueryTemplate(JdbcTemplate outgoingBatchQueryTemplate) {
this.outgoingBatchQueryTemplate = outgoingBatchQueryTemplate;
}

}
3 changes: 2 additions & 1 deletion symmetric/src/main/resources/symmetric-properties.xml
Expand Up @@ -26,7 +26,8 @@
<prop key="sync.table.prefix">sym</prop>
<prop key="symmetric.auto.config.database">true</prop>
<prop key="symmetric.http.download.rate.kb">-1</prop>
<prop key="symmetric.http.concurrent.workers.max">20</prop>
<prop key="symmetric.http.concurrent.workers.max">20</prop>
<prop key="symmetric.runtime.max.outgoing.batches.to.process">60</prop>
<prop key="symmetric.runtime.engine.name">Default</prop>
<prop key="symmetric.runtime.schema.version">?</prop>
<prop key="symmetric.runtime.trigger.prefix"></prop>
Expand Down
90 changes: 58 additions & 32 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -48,7 +48,7 @@
<value>${sync.table.prefix}_node_security</value>
<value>${sync.table.prefix}_global_parameter</value>
<value>${sync.table.prefix}_channel</value>
<value>${sync.table.prefix}_node_channel_ctl</value>
<value>${sync.table.prefix}_node_channel_ctl</value>
<value>${sync.table.prefix}_trigger</value>
</list>
</property>
Expand All @@ -70,7 +70,13 @@
</value>
</property>
<property name="selectChannelsSql">
<value>select c.channel_id, nc.node_id, nc.ignore_enabled, nc.suspend_enabled, c.processing_order, c.max_batch_size, c.enabled from ${sync.table.prefix}_channel c left outer join ${sync.table.prefix}_node_channel_ctl nc on nc.node_id in (select node_id from ${sync.table.prefix}_node_identity) and c.channel_id=nc.channel_id order by c.processing_order asc</value>
<value>
select c.channel_id, nc.node_id, nc.ignore_enabled, nc.suspend_enabled,
c.processing_order, c.max_batch_size, c.enabled from ${sync.table.prefix}_channel c
left outer join ${sync.table.prefix}_node_channel_ctl nc on nc.node_id in (select
node_id from ${sync.table.prefix}_node_identity) and c.channel_id=nc.channel_id
order by c.processing_order asc
</value>
</property>
<property name="selectGroupTriggersSql">
<value>
Expand All @@ -87,8 +93,8 @@
<property name="activeTriggersForReloadSql">
<value>
select * from ${sync.table.prefix}_trigger where source_node_group_id = ? and
target_node_group_id = ? and channel_id != ? and inactive_time is null
order by initial_load_order
target_node_group_id = ? and channel_id != ? and inactive_time is null order by
initial_load_order
</value>
</property>
<property name="inactiveTriggersForSourceNodeGroupSql">
Expand Down Expand Up @@ -116,8 +122,8 @@
</property>
<property name="insertChannelSql">
<value>
insert into ${sync.table.prefix}_channel values(?, ?, ?, 1,
'System-generated channel. Do not remove.')
insert into ${sync.table.prefix}_channel values(?, ?, ?, 1, 'System-generated
channel. Do not remove.')
</value>
</property>
<property name="allTriggerHistSql">
Expand Down Expand Up @@ -162,9 +168,7 @@
</value>
</property>
<property name="selectTriggerByIdSql">
<value>
select * from ${sync.table.prefix}_trigger where trigger_id = ?
</value>
<value>select * from ${sync.table.prefix}_trigger where trigger_id = ?</value>
</property>
</bean>

Expand All @@ -174,13 +178,21 @@
<property name="runtimeConfiguration" ref="runtimeConfiguration" />
<property name="configurationService" ref="configurationService" />
<property name="historyService" ref="outgoingBatchHistoryService" />

<property name="outgoingBatchQueryTemplate">
<bean class="org.springframework.jdbc.core.JdbcTemplate">
<property name="dataSource" ref="dataSource" />
<property name="queryTimeout" value="${db.sql.query.timeout.seconds}" />
<property name="maxRows"
value="${symmetric.runtime.max.outgoing.batches.to.process}" />
</bean>
</property>
<property name="initialLoadStatusSql">
<value>
select status from ${sync.table.prefix}_outgoing_batch where channel_id='reload' and node_id=?
select status from ${sync.table.prefix}_outgoing_batch where channel_id='reload' and
node_id=?
</value>
</property>

<property name="selectEventsToBatchSql">
<value>
select data.transaction_id, data.data_id from ${sync.table.prefix}_data data,
Expand All @@ -203,11 +215,13 @@
</property>
<property name="selectOutgoingBatchSql">
<value>
select batch.batch_id, batch.node_id, batch.channel_id, batch.status,
batch.batch_type, case status when 'NE' then 3 when 'SE' then 2 when 'ER' then 1 end
from ${sync.table.prefix}_outgoing_batch batch, ${sync.table.prefix}_channel channel
where node_id = ? and status in ('NE','SE','ER') and batch.channel_id =
channel.channel_id order by channel.processing_order asc, 2 asc
select b.batch_id, b.node_id, b.channel_id, b.status, b.batch_type, 1
from ${sync.table.prefix}_outgoing_batch b where b.node_id = ? and b.status in ('NE','SE')
and b.channel_id not in
(select distinct e.channel_id from ${sync.table.prefix}_outgoing_batch e where e.status = 'ER' and e.node_id = b.node_id)
union
select b.batch_id, b.node_id, b.channel_id, b.status, b.batch_type, 2
from ${sync.table.prefix}_outgoing_batch b where b.node_id = ? and b.status = 'ER' order by 6, 1
</value>
</property>
<property name="changeBatchStatusSql">
Expand All @@ -234,24 +248,27 @@
</bean>

<bean id="dataService" class="org.jumpmind.symmetric.service.impl.DataService"
scope="singleton">
<property name="deleteFirstForReload" value="${symmetric.runtime.initial.load.delete.first}" />
scope="singleton">
<property name="deleteFirstForReload"
value="${symmetric.runtime.initial.load.delete.first}" />
<property name="jdbcTemplate" ref="jdbcTemplate" />
<property name="configurationService" ref="configurationService" />
<property name="runtimeConfiguration" ref="runtimeConfiguration" />
<property name="outgoingBatchService" ref="outgoingBatchService"/>
<property name="outgoingBatchService" ref="outgoingBatchService" />
<property name="nodeService" ref="nodeService" />
<property name="dbDialect" ref="dbDialect" />
<property name="tablePrefix" value="${sync.table.prefix}" />
<property name="insertIntoDataSql">
<value>
insert into ${sync.table.prefix}_data
(data_id, channel_id, table_name, event_type, row_data, pk_data, trigger_hist_id, create_time)
values(null, ?, ?, ?, ?, ?, ?, current_timestamp)
insert into ${sync.table.prefix}_data (data_id, channel_id, table_name, event_type,
row_data, pk_data, trigger_hist_id, create_time) values(null, ?, ?, ?, ?, ?, ?,
current_timestamp)
</value>
</property>
<property name="insertIntoDataEventSql">
<value>insert into ${sync.table.prefix}_data_event (data_id, node_id) values(?, ?)</value>
<value>
insert into ${sync.table.prefix}_data_event (data_id, node_id) values(?, ?)
</value>
</property>
</bean>

Expand Down Expand Up @@ -325,11 +342,17 @@
scope="singleton">
<property name="jdbcTemplate" ref="jdbcTemplate" />
<property name="nodeChannelControlIgnoreSql">
<value>update ${sync.table.prefix}_node_channel_ctl set ignore_enabled=? where node_id=? and channel_id=?</value>
<value>
update ${sync.table.prefix}_node_channel_ctl set ignore_enabled=? where node_id=?
and channel_id=?
</value>
</property>
<property name="insertNodeChannelControlSql">
<value>insert into ${sync.table.prefix}_node_channel_ctl (node_id,channel_id,ignore_enabled,suspend_enabled) values(?,?,?,?)</value>
</property>
<value>
insert into ${sync.table.prefix}_node_channel_ctl
(node_id,channel_id,ignore_enabled,suspend_enabled) values(?,?,?,?)
</value>
</property>
<property name="updateNodeSql">
<value>
update ${sync.table.prefix}_node set node_group_id=?, external_id=?,
Expand All @@ -348,7 +371,7 @@
select node_id, node_group_id, external_id, sync_enabled, sync_url, schema_version,
database_type, database_version from ${sync.table.prefix}_node where external_id = ?
</value>
</property>
</property>
<property name="findNodeSecuritySql">
<value>
select node_id, password, registration_enabled, registration_time from
Expand All @@ -364,9 +387,11 @@
</property>
<property name="isNodeRegisteredSql">
<value>
select count(*) from ${sync.table.prefix}_node_security s inner join ${sync.table.prefix}_node n on n.node_id=s.node_id where n.external_id=? and s.registration_time is not null and s.registration_enabled=0
select count(*) from ${sync.table.prefix}_node_security s inner join
${sync.table.prefix}_node n on n.node_id=s.node_id where n.external_id=? and
s.registration_time is not null and s.registration_enabled=0
</value>
</property>
</property>
<property name="findNodesWhoTargetMeSql">
<value>
select c.node_id, c.node_group_id, c.external_id, c.sync_enabled, c.sync_url,
Expand Down Expand Up @@ -476,7 +501,7 @@
<property name="dataExtractorService" ref="dataExtractorService" />
<property name="configurationService" ref="configurationService" />
<property name="nodeService" ref="nodeService" />
</bean>
</bean>
</entry>
<entry key="S">
<bean class="org.jumpmind.symmetric.extract.csv.StreamSQLDataCommand"></bean>
Expand Down Expand Up @@ -536,7 +561,8 @@
create_time &lt; ?
</value>
<value>
delete from ${sync.table.prefix}_data where data_id not in (select data_id from ${sync.table.prefix}_data_event) and create_time &lt; ?
delete from ${sync.table.prefix}_data where data_id not in (select data_id from
${sync.table.prefix}_data_event) and create_time &lt; ?
</value>
<value>
delete from ${sync.table.prefix}_incoming_batch_hist where batch_id in (select
Expand Down
11 changes: 8 additions & 3 deletions symmetric/src/site/apt/index.apt
Expand Up @@ -11,7 +11,12 @@ Welcome to SymetricDS

Satellite databases (i.e. a store servers) register an interest in channels of data. Channels are groups of database tables that can be synchronized in either direction.

SymmetricDS can be deployed as a Java web application. It can be deployed as a standalone service or to an existing application server such as Tomcat, Jetty, or JBoss. The same application software is deployed to the satellite systems as is deployed to the most central location. The centralized locations may by configured as a web farm to ensure QOS for the satellite systems.
SymmetricDS can be deployed in the following configurations:
* web application archive (war) to an existing application server such as Tomcat, Jetty, or JBoss

* as a standalone service

The same application software is deployed to the satellite systems as is deployed to the most central location. The centralized locations may by configured as a web farm to ensure QOS for the satellite systems.

Configuration is always performed at the central most system. The satellite systems get their configuration from their host system. When a satellite application is started it is provided a couple pieces of information so it can begin it's work:

Expand All @@ -29,9 +34,9 @@ Welcome to SymetricDS

* 10(s) of satellite register deployments with their host system being the store servers (allowing for offline registers)

The n-tier configuration can is not limited to the above configuration. For example, a series of regional servers could be inserted between the general office and store server if necessary. Another option might be to not even have store servers.
The n-tier configuration is not limited to the above configuration. For example, a series of regional servers could be inserted between the general office and store server if necessary. Another option might be to not even have store servers.

The transport for data synchronization is HTTP. SymmetricDS chose to use web technologies because of the proven ability to scale. The web architecture is well understood and it is easy to find technical resources to help tune it.
The transport for data synchronization is HTTP. SymmetricDS chooses to use web technologies because of the proven ability to scale. The web architecture is well understood and it is easy to find technical resources to help tune it.

Symmetric works well in both low and high bandwidth environments. It works well with existing databases.

Expand Down

0 comments on commit 6a049bf

Please sign in to comment.