Worked on unit test for RoutingService
chenson42 committed Jul 18, 2009
1 parent 74aa974 commit 5f4493f
Showing 20 changed files with 202 additions and 63 deletions.
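The new RoutingService test itself is not among the hunks shown below, so the following is only a hypothetical sketch of the kind of JUnit test the commit message describes; the class, methods, and routing rule are illustrative stand-ins, not code from this commit.

    import static org.junit.Assert.assertEquals;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import org.junit.Test;

    public class RoutingServiceSketchTest {

        // Illustrative stand-in for the routing decision under test: map a
        // channel id to the node ids a captured change should be sent to.
        static List<String> route(String channelId) {
            return "config".equals(channelId)
                    ? Arrays.asList("root", "client")
                    : Collections.<String> emptyList();
        }

        @Test
        public void routesConfigChannelToAllNodes() {
            assertEquals(Arrays.asList("root", "client"), route("config"));
        }

        @Test
        public void routesUnknownChannelNowhere() {
            assertEquals(0, route("unknown").size());
        }
    }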
@@ -140,7 +140,7 @@ private Constants() {

public static final String OUTGOING_BATCH_SERVICE = "outgoingBatchService";

public static final String OUTGOING_BATCH_HISTORY_SERVICE = "outgoingBatchHistoryService";
public static final String TRANSACTION_TEMPLATE = "transactionTemplate";

public static final String PURGE_SERVICE = "purgeService";

@@ -119,12 +119,7 @@ public String createInitalLoadSql(Node node, IDbDialect dialect, Trigger trig, T
}

public String createPurgeSql(Node node, IDbDialect dialect, Trigger trig, TriggerHistory hist) {
// TODO: during reload, purge table using initial_load_select clause
String sql = "delete from " + getDefaultTargetTableName(trig, hist);
// + " where " + trig.getInitialLoadSelect();
// sql = replace("groupId", node.getNodeGroupId(), sql);
// sql = replace("externalId", node.getExternalId(), sql);
return sql;
return "delete from " + getDefaultTargetTableName(trig, hist);
}

public String createCsvDataSql(IDbDialect dialect, Trigger trig, Table metaData, String whereClause) {
@@ -36,6 +36,8 @@
* each data router is configured using the routing_expression according to its implementation.
*
* @since 2.0
* @see SubSelectDataRouter
* @see ColumnMatchDataRouter
*/
public interface IDataRouter extends IExtensionPoint {

@@ -32,6 +32,9 @@

public interface IRoutingContext {

/**
* Get the same template that is being used for inserts into data_event for routing.
*/
public JdbcTemplate getJdbcTemplate();

public NodeChannel getChannel();
@@ -57,9 +60,9 @@ public interface IRoutingContext {
public void setNeedsCommitted(boolean b);

public void resetForNextData();

public void setEncountedTransactionBoundary(boolean encountedTransactionBoundary);
public boolean isEncountedTransactionBoundary();

public boolean isEncountedTransactionBoundary();

}
@@ -88,6 +88,6 @@ public interface IConfigurationService {

public Map<Long, TriggerHistory> getHistoryRecords();

public void insert(Trigger trigger);
public void saveTrigger(Trigger trigger);

}
@@ -52,7 +52,7 @@ public class ConfigurationService extends AbstractService implements IConfigurat

private static final long MAX_CHANNEL_CACHE_TIME = 60000;

private static List<NodeChannel> channelCache;
private static List<NodeChannel> channelCache;

private static long channelCacheTime;

@@ -63,10 +63,9 @@ public class ConfigurationService extends AbstractService implements IConfigurat
private IDbDialect dbDialect;

private String tablePrefix;

/**
* Cache the history for performance. History never changes and does not
* grow big so this should be OK.
* Cache the history for performance. History never changes and does not grow big so this should be OK.
*/
private HashMap<Integer, TriggerHistory> historyMap = new HashMap<Integer, TriggerHistory>();

@@ -90,7 +89,8 @@ public List<String> getRootConfigChannelTableNames() {

public void saveChannel(Channel channel) {
if (0 == jdbcTemplate.update(getSql("updateChannelSql"), new Object[] { channel.getProcessingOrder(),
channel.getMaxBatchSize(), channel.getMaxBatchToSend(), channel.isEnabled() ? 1 : 0, channel.getId(), channel.getBatchAlgorithm() })) {
channel.getMaxBatchSize(), channel.getMaxBatchToSend(), channel.isEnabled() ? 1 : 0, channel.getId(),
channel.getBatchAlgorithm() })) {
jdbcTemplate.update(getSql("insertChannelSql"), new Object[] { channel.getId(),
channel.getProcessingOrder(), channel.getMaxBatchSize(), channel.getMaxBatchToSend(),
channel.isEnabled() ? 1 : 0, channel.getBatchAlgorithm() });
@@ -111,12 +111,12 @@ protected List<Trigger> getConfigurationTriggers(String sourceGroupId, String ta
List<Trigger> triggers = new ArrayList<Trigger>(tables.size());
for (int j = 0; j < tables.size(); j++) {
String tableName = tables.get(j);
boolean syncChanges = !TableConstants.getNodeTablesAsSet(tablePrefix).contains(tableName);
boolean syncChanges = !TableConstants.getNodeTablesAsSet(tablePrefix).contains(tableName);
Trigger trigger = buildConfigTrigger(tableName, syncChanges, sourceGroupId, targetGroupId);
trigger.setInitialLoadOrder(initialLoadOrder++);
// TODO Set data router to replace the routing done by the node select
//String initialLoadSelect = rootConfigChannelInitialLoadSelect.get(tableName);
//trigger.setInitialLoadSelect(initialLoadSelect);
// String initialLoadSelect = rootConfigChannelInitialLoadSelect.get(tableName);
// trigger.setInitialLoadSelect(initialLoadSelect);
triggers.add(trigger);
}
return triggers;
@@ -138,11 +138,10 @@ protected Trigger buildConfigTrigger(String tableName, boolean syncChanges, Stri
trigger.setChannelId(Constants.CHANNEL_CONFIG);
// little trick to force the rebuild of sym triggers every time
// there is a new version of symmetricds
trigger.setLastModifiedTime(new Date(Version.version().hashCode()));
trigger.setLastModifiedTime(new Date(Version.version().hashCode()));
return trigger;
}


public NodeChannel getChannel(String channelId) {
List<NodeChannel> channels = getChannels();
for (NodeChannel nodeChannel : channels) {
@@ -152,7 +151,7 @@ public NodeChannel getChannel(String channelId) {
}
return null;
}

@SuppressWarnings("unchecked")
public List<NodeChannel> getChannels() {
if (System.currentTimeMillis() - channelCacheTime >= MAX_CHANNEL_CACHE_TIME || channelCache == null) {
@@ -196,8 +195,7 @@ public DataEventAction getDataEventActionsByGroupId(String sourceGroupId, String
}

/**
* Create triggers on SymmetricDS tables so changes to configuration can be
* synchronized.
* Create triggers on SymmetricDS tables so changes to configuration can be synchronized.
*/
protected List<Trigger> getConfigurationTriggers(String sourceNodeGroupId) {
List<Trigger> triggers = new ArrayList<Trigger>();
@@ -209,10 +207,11 @@ protected List<Trigger> getConfigurationTriggers(String sourceNodeGroupId) {
} else if (nodeGroupLink.getDataEventAction().equals(DataEventAction.PUSH)) {
triggers.add(buildConfigTrigger(TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE),
false, nodeGroupLink.getSourceGroupId(), nodeGroupLink.getTargetGroupId()));
logger.info("Creating trigger hist entry for " + TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE));
logger.info("Creating trigger hist entry for "
+ TableConstants.getTableName(tablePrefix, TableConstants.SYM_NODE));
} else {
logger.warn("Unexpected node group link while creating configuration triggers: source_node_group_id="
+ sourceNodeGroupId + ", action=" + nodeGroupLink.getDataEventAction());
+ sourceNodeGroupId + ", action=" + nodeGroupLink.getDataEventAction());
}
}
return triggers;
@@ -325,23 +324,41 @@ public void insert(TriggerHistory newHistRecord) {
Types.VARCHAR, Types.VARCHAR, Types.BIGINT });
}

public void insert(Trigger trigger) {
jdbcTemplate.update(getSql("insertTriggerSql"), new Object[] { trigger.getSourceCatalogName(),
public void saveTrigger(Trigger trigger) {
if (0 == jdbcTemplate.update(getSql("updateTriggerSql"), new Object[] { trigger.getSourceCatalogName(),
trigger.getSourceSchemaName(), trigger.getSourceTableName(), trigger.getTargetCatalogName(),
trigger.getTargetSchemaName(), trigger.getTargetTableName(), trigger.getSourceGroupId(),
trigger.getTargetGroupId(), trigger.getChannelId(), trigger.isSyncOnUpdate() ? 1 : 0,
trigger.isSyncOnInsert() ? 1 : 0, trigger.isSyncOnDelete() ? 1 : 0,
trigger.isSyncOnIncomingBatch() ? 1 : 0, trigger.getNameForUpdateTrigger(),
trigger.getNameForInsertTrigger(), trigger.getNameForDeleteTrigger(),
trigger.getSyncOnUpdateCondition(), trigger.getSyncOnInsertCondition(),
trigger.getSyncOnDeleteCondition(), trigger.getRouterExpression(),
trigger.getTxIdExpression(), trigger.getExcludedColumnNames(), trigger.getIntialLoadSelect(), trigger.getInitialLoadOrder(),
new Date(), null, trigger.getUpdatedBy(), new Date() }, new int[] { Types.VARCHAR, Types.VARCHAR,
trigger.getSyncOnDeleteCondition(), trigger.getRouterExpression(), trigger.getTxIdExpression(),
trigger.getExcludedColumnNames(), trigger.getIntialLoadSelect(), trigger.getInitialLoadOrder(),
new Date(), null, trigger.getUpdatedBy(), new Date(), trigger.getTriggerId() }, new int[] {
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP,
Types.VARCHAR, Types.TIMESTAMP });
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT,
Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER,
Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER })) {
jdbcTemplate.update(getSql("insertTriggerSql"), new Object[] { trigger.getSourceCatalogName(),
trigger.getSourceSchemaName(), trigger.getSourceTableName(), trigger.getTargetCatalogName(),
trigger.getTargetSchemaName(), trigger.getTargetTableName(), trigger.getSourceGroupId(),
trigger.getTargetGroupId(), trigger.getChannelId(), trigger.isSyncOnUpdate() ? 1 : 0,
trigger.isSyncOnInsert() ? 1 : 0, trigger.isSyncOnDelete() ? 1 : 0,
trigger.isSyncOnIncomingBatch() ? 1 : 0, trigger.getNameForUpdateTrigger(),
trigger.getNameForInsertTrigger(), trigger.getNameForDeleteTrigger(),
trigger.getSyncOnUpdateCondition(), trigger.getSyncOnInsertCondition(),
trigger.getSyncOnDeleteCondition(), trigger.getRouterExpression(), trigger.getTxIdExpression(),
trigger.getExcludedColumnNames(), trigger.getIntialLoadSelect(), trigger.getInitialLoadOrder(),
new Date(), null, trigger.getUpdatedBy(), new Date() }, new int[] { Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP,
Types.VARCHAR, Types.TIMESTAMP });
}

}

public Map<Long, TriggerHistory> getHistoryRecords() {
@@ -445,7 +462,7 @@ public Object mapRow(java.sql.ResultSet rs, int arg1) throws java.sql.SQLExcepti
trig.setNameForDeleteTrigger(rs.getString("name_for_delete_trigger"));
trig.setNameForInsertTrigger(rs.getString("name_for_insert_trigger"));
trig.setNameForUpdateTrigger(rs.getString("name_for_update_trigger"));
String schema = rs.getString("source_schema_name");
String schema = rs.getString("source_schema_name");
trig.setSourceSchemaName(schema);
String catalog = rs.getString("source_catalog_name");
if (catalog == null && schema != null && dbDialect instanceof MySqlDbDialect) {
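Both saveChannel and saveTrigger in the hunks above use the same update-then-insert idiom: attempt the UPDATE first and fall back to an INSERT only when zero rows were affected. A minimal self-contained sketch of that pattern with Spring's JdbcTemplate (the table and column names are made up for illustration, not the SymmetricDS schema):

    import java.sql.Types;

    import org.springframework.jdbc.core.JdbcTemplate;

    public class UpsertSketch {

        private final JdbcTemplate jdbcTemplate;

        public UpsertSketch(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }

        public void save(String id, String value) {
            // update(...) returns the affected row count; zero means the row
            // does not exist yet, so fall back to an insert.
            if (0 == jdbcTemplate.update(
                    "update example_table set example_value = ? where id = ?",
                    new Object[] { value, id },
                    new int[] { Types.VARCHAR, Types.VARCHAR })) {
                jdbcTemplate.update(
                    "insert into example_table (id, example_value) values (?, ?)",
                    new Object[] { id, value },
                    new int[] { Types.VARCHAR, Types.VARCHAR });
            }
        }
    }

Note that without a surrounding transaction, two concurrent callers can both see zero updated rows and race on the insert; the idiom trades that risk for simplicity and portability.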
@@ -36,9 +36,7 @@
import org.jumpmind.symmetric.transport.handler.AuthenticationResourceHandler.AuthenticationStatus;

/**
* This better be the first filter that executes ! TODO: if this thing fails,
* should it prevent further processing of the request?
*
* This better be the first filter that executes!
*/
public class AuthenticationFilter extends AbstractTransportFilter<AuthenticationResourceHandler> {

@@ -38,8 +38,7 @@
import org.jumpmind.symmetric.transport.InetAddressResourceHandler;

/**
* This better be the first filter that executes ! TODO: if this thing fails,
* should it prevent further processing of the request?
* This better be the first filter that executes!
*/
public class InetAddressFilter extends AbstractTransportFilter<InetAddressResourceHandler> {
public static final String INET_ADDRESS_FILTERS = "inetAddressFilters";
@@ -76,8 +75,6 @@ public boolean isContainerCompatible() {

public void doFilter(final ServletRequest req, final ServletResponse resp, final FilterChain chain)
throws IOException, ServletException {
// final IInetAddressAuthorizer authorizer =
// getTransportResourceHandler();
final HttpServletRequest httpRequest = (HttpServletRequest) req;
final String sourceAddrString = httpRequest.getRemoteAddr();
try {
@@ -59,7 +59,8 @@ protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws Se

getTransportResourceHandler().push(inputStream, outputStream);

outputStream.flush(); // TODO: why is this necessary?
// Not sure if this is necessary, but it's been here and it hasn't hurt anything ...
outputStream.flush();

if (logger.isDebugEnabled()) {
logger.debug(String.format("Done with Push request from %s", nodeId));
@@ -88,7 +88,7 @@ public void init(ServletConfig config) throws ServletException {
if (ctx.getParent() != null) {
servletBeans.putAll(ctx.getParent().getBeansOfType(
IServletExtension.class));
}
}
// TODO order using initOrder
for (final Map.Entry<String, IServletExtension> servletEntry : servletBeans
.entrySet()) {
symmetric/src/main/resources/ddl-config.xml (2 changes: 1 addition & 1 deletion)
@@ -21,7 +21,7 @@
<column name="row_data" type="LONGVARCHAR" />
<column name="pk_data" type="LONGVARCHAR" />
<column name="old_data" type="LONGVARCHAR" />
<!-- It might be nice to add a router_select element that can be selected similar
<!-- TODO It might be nice to add a router_select element that can be selected similar
to the old node_select which would be available to the routers. This would allow
SQL expressions back in the triggers w/out using sym_data_event -->
<column name="trigger_hist_id" type="INTEGER" required="true" />
@@ -161,7 +161,20 @@
(source_catalog_name,source_schema_name,source_table_name,target_catalog_name,target_schema_name,target_table_name,source_node_group_id,target_node_group_id,channel_id,sync_on_update,sync_on_insert,sync_on_delete,sync_on_incoming_batch,name_for_update_trigger,name_for_insert_trigger,name_for_delete_trigger,sync_on_update_condition,sync_on_insert_condition,sync_on_delete_condition,router_expression,tx_id_expression,excluded_column_names,initial_load_select,initial_load_order,create_time,inactive_time,last_updated_by,last_updated_time)
values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
</value>
</entry>
</entry>
<entry key="updateTriggerSql">
<value>
update $[sym.sync.table.prefix]_trigger
set source_catalog_name=?,source_schema_name=?,source_table_name=?,
target_catalog_name=?,target_schema_name=?,target_table_name=?,source_node_group_id=?,
target_node_group_id=?,channel_id=?,sync_on_update=?,sync_on_insert=?,sync_on_delete=?,
sync_on_incoming_batch=?,name_for_update_trigger=?,name_for_insert_trigger=?,
name_for_delete_trigger=?,sync_on_update_condition=?,sync_on_insert_condition=?,
sync_on_delete_condition=?,router_expression=?,tx_id_expression=?,excluded_column_names=?,
initial_load_select=?,initial_load_order=?,create_time=?,inactive_time=?,last_updated_by=?,last_updated_time=?
where trigger_id=?
</value>
</entry>
<entry key="selectTriggerSql">
<value>
select * from $[sym.sync.table.prefix]_trigger where source_table_name = ? and