Skip to content

Commit

Permalink
try to keep to the four levels of indentation rule
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jul 18, 2009
1 parent 71e83b8 commit 10dafb4
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 53 deletions.
Expand Up @@ -31,7 +31,7 @@
import org.springframework.jdbc.core.JdbcTemplate;

public interface IRoutingContext {

public JdbcTemplate getJdbcTemplate();

public NodeChannel getChannel();
Expand All @@ -43,9 +43,23 @@ public interface IRoutingContext {
public Map<Trigger, Set<Node>> getAvailableNodes();

public void commit() throws SQLException;

public void rollback();

public void cleanup();

public void setRouted(boolean b);

public boolean isNeedsCommitted();

public boolean isRouted();

public void setNeedsCommitted(boolean b);

public void resetForNextData();

public void setEncountedTransactionBoundary(boolean encountedTransactionBoundary);

public boolean isEncountedTransactionBoundary();

}
Expand Up @@ -37,14 +37,17 @@
import org.springframework.jdbc.support.JdbcUtils;

public class RoutingContext implements IRoutingContext {

protected final Log logger = LogFactory.getLog(getClass());
private NodeChannel channel;
private Map<String, OutgoingBatch> batchesByNodes = new HashMap<String, OutgoingBatch>();
private Map<String, OutgoingBatchHistory> batchHistoryByNodes = new HashMap<String, OutgoingBatchHistory>();
private Map<Trigger, Set<Node>> availableNodes = new HashMap<Trigger, Set<Node>>();
private Connection connection;
private JdbcTemplate jdbcTemplate;
private boolean needsCommitted = false;
private boolean routed = false;
private boolean encountedTransactionBoundary = false;

public RoutingContext(NodeChannel channel, Connection connection) throws SQLException {
this.channel = channel;
Expand Down Expand Up @@ -81,12 +84,40 @@ public void rollback() {
try {
connection.rollback();
} catch (SQLException e) {
logger.warn(e,e);
logger.warn(e, e);
}
}

public void cleanup() {
JdbcUtils.closeConnection(this.connection);
}

public void setNeedsCommitted(boolean b) {
this.needsCommitted = b;
}

public void setRouted(boolean b) {
this.routed = b;
}

public boolean isNeedsCommitted() {
return needsCommitted;
}

public boolean isRouted() {
return routed;
}

public void resetForNextData() {
this.routed = false;
this.needsCommitted = false;
}

public void setEncountedTransactionBoundary(boolean encountedTransactionBoundary) {
this.encountedTransactionBoundary = encountedTransactionBoundary;
}

public boolean isEncountedTransactionBoundary() {
return encountedTransactionBoundary;
}
}
Expand Up @@ -109,11 +109,9 @@ public void routeData() {
}

/**
* We route data channel by channel for two reasons. One is that if/when we
* decide to multi-thread the routing it is a simple matter of inserting a
* thread pool here and waiting for all channels to be processed. The other
* reason is to reduce the amount number of connections we are required to
* have.
* We route data channel by channel for two reasons. One is that if/when we decide to multi-thread the routing it is
* a simple matter of inserting a thread pool here and waiting for all channels to be processed. The other reason is
* to reduce the amount number of connections we are required to have.
*/
protected void routeDataForEachChannel() {
final List<NodeChannel> channels = configurationService.getChannels();
Expand Down Expand Up @@ -205,7 +203,7 @@ protected void selectDataAndRoute(Connection conn, IRoutingContext context) thro
protected void routeData(Data data, Map<String, Long> transactionIdDataId, IRoutingContext routingContext)
throws SQLException {
Long dataId = transactionIdDataId.get(data.getTransactionId());
boolean databaseTransactionBoundary = dataId == null ? true : dataId == data.getDataId();
routingContext.setEncountedTransactionBoundary(dataId == null ? true : dataId == data.getDataId());
// TODO We really shouldn't be referencing the bootstrapService from
// here ... maybe this method needs to move to the configurationService
Trigger trigger = bootstrapService.getCachedTriggers(false).get((data.getTriggerHistory().getTriggerId()));
Expand All @@ -214,48 +212,19 @@ protected void routeData(Data data, Map<String, Long> transactionIdDataId, IRout
if (trigger != null) {
String channelId = trigger.getChannelId();
if (channelId.equals(routingContext.getChannel().getId())) {
boolean commit = false;
boolean routed = false;
routingContext.resetForNextData();
if (!routingContext.getChannel().isIgnored()) {
IDataRouter router = getDataRouter(trigger);
Collection<String> nodeIds = router.routeToNodes(dataMetaData, findAvailableNodes(trigger,
routingContext), false);
if (nodeIds != null && nodeIds.size() > 0) {
for (String nodeId : nodeIds) {
if (data.getSourceNodeId() == null || !data.getSourceNodeId().equals(nodeId)) {
OutgoingBatch batch = routingContext.getBatchesByNodes().get(nodeId);
OutgoingBatchHistory history = routingContext.getBatchHistoryByNodes().get(nodeId);
if (batch == null) {
batch = createNewBatch(routingContext.getJdbcTemplate(), nodeId, channelId);
routingContext.getBatchesByNodes().put(nodeId, batch);
history = new OutgoingBatchHistory(batch);
routingContext.getBatchHistoryByNodes().put(nodeId, history);
}
history.incrementDataEventCount();
routed = true;
dataService.insertDataEvent(routingContext.getJdbcTemplate(), data.getDataId(), nodeId,
batch.getBatchId());
if (batchAlgorithms.get(routingContext.getChannel().getBatchAlgorithm()).completeBatch(
routingContext.getChannel(), history, batch, data, databaseTransactionBoundary)) {
// TODO Add route_time_ms to history. Also
// fix outgoing batch so we don't end up
// with so many history records
outgoingBatchService.insertOutgoingBatchHistory(routingContext.getJdbcTemplate(),
history);
routingContext.getBatchesByNodes().remove(nodeId);
routingContext.getBatchHistoryByNodes().remove(nodeId);
commit = true;
}
}
}
}
insertDataEvents(routingContext, dataMetaData, nodeIds);
}

if (!routed) {
if (!routingContext.isRouted()) {
dataService.insertDataEvent(routingContext.getJdbcTemplate(), data.getDataId(), "-1", -1);
}

if (commit) {
if (routingContext.isNeedsCommitted()) {
routingContext.commit();
}
}
Expand All @@ -267,6 +236,41 @@ protected void routeData(Data data, Map<String, Long> transactionIdDataId, IRout

}

protected void insertDataEvents(IRoutingContext routingContext, DataMetaData dataMetaData,
Collection<String> nodeIds) {
if (nodeIds != null && nodeIds.size() > 0) {
for (String nodeId : nodeIds) {
if (dataMetaData.getData().getSourceNodeId() == null
|| !dataMetaData.getData().getSourceNodeId().equals(nodeId)) {
OutgoingBatch batch = routingContext.getBatchesByNodes().get(nodeId);
OutgoingBatchHistory history = routingContext.getBatchHistoryByNodes().get(nodeId);
if (batch == null) {
batch = createNewBatch(routingContext.getJdbcTemplate(), nodeId, dataMetaData.getChannel()
.getId());
routingContext.getBatchesByNodes().put(nodeId, batch);
history = new OutgoingBatchHistory(batch);
routingContext.getBatchHistoryByNodes().put(nodeId, history);
}
history.incrementDataEventCount();
routingContext.setRouted(true);
dataService.insertDataEvent(routingContext.getJdbcTemplate(), dataMetaData.getData().getDataId(),
nodeId, batch.getBatchId());
if (batchAlgorithms.get(routingContext.getChannel().getBatchAlgorithm()).completeBatch(
routingContext.getChannel(), history, batch, dataMetaData.getData(),
routingContext.isEncountedTransactionBoundary())) {
// TODO Add route_time_ms to history. Also
// fix outgoing batch so we don't end up
// with so many history records
outgoingBatchService.insertOutgoingBatchHistory(routingContext.getJdbcTemplate(), history);
routingContext.getBatchesByNodes().remove(nodeId);
routingContext.getBatchHistoryByNodes().remove(nodeId);
routingContext.setNeedsCommitted(true);
}
}
}
}
}

protected IDataRouter getDataRouter(Trigger trigger) {
IDataRouter router = null;
if (!StringUtils.isBlank(trigger.getRouterName())) {
Expand Down
Expand Up @@ -10,41 +10,45 @@ public class RoutingServiceTest extends AbstractDatabaseTest {

final static String TEST_TABLE_1 = "TEST_ROUTING_DATA_1";
final static String TEST_TABLE_2 = "TEST_ROUTING_DATA_2";

public RoutingServiceTest(String dbName) {
super(dbName);
}

public RoutingServiceTest() throws Exception {
}

protected Trigger getTestRoutingTableTrigger(String tableName) {
Trigger trigger = getConfigurationService().getTriggerFor(tableName, TestConstants.TEST_ROOT_NODE_GROUP);
if (trigger == null) {
trigger = new Trigger(tableName);
trigger.setSourceGroupId(TestConstants.TEST_ROOT_NODE_GROUP);
trigger.setTargetGroupId(TestConstants.TEST_CLIENT_NODE_GROUP);
if (tableName.equals(TEST_TABLE_2)) {
trigger.setChannelId(TestConstants.TEST_CHANNEL_ID_OTHER);
} else {
trigger.setChannelId(TestConstants.TEST_CHANNEL_ID);
}
}
return trigger;
}

@Test
public void testMultiChannelRoutingToEveryone() {
Trigger trigger = getTestRoutingTableTrigger(TEST_TABLE_1);
trigger.setChannelId(TestConstants.TEST_CHANNEL_ID);
getConfigurationService().insert(trigger);
}

@Test
public void syncIncomingBatchTest() throws Exception {

}

@Test
public void testSyncBackToNode() {

}

@Test
@ParameterExcluder("postgres")
public void validateTransactionFunctionailty() throws Exception {
Expand Down

0 comments on commit 10dafb4

Please sign in to comment.