Skip to content

Commit

Permalink
Fix ordering of batches after an error occurs.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Oct 19, 2008
1 parent e22e6e6 commit 155132e
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 38 deletions.
3 changes: 3 additions & 0 deletions symmetric/src/changes/changes.xml
Expand Up @@ -6,6 +6,9 @@
</properties>
<body>
<release version="1.4.1" date="2008-11-01" description="Patch Release">
<action dev="chenson42" type="fix">
Batches were not being ordered correctly during the extraction when a channel was in error.
</action>
<action dev="erilong" type="fix" issue="aid=2155736&amp;atid=997724">
Loss of precison with Float type on Oracle
</action>
Expand Down
Expand Up @@ -192,8 +192,7 @@ public boolean extract(Node node, final IExtractListener handler) throws Excepti
outgoingBatchService.buildOutgoingBatches(node.getNodeId(), nodeChannel);
}

List<OutgoingBatch> batches = filterMaxNumberOfOutgoingBatches(outgoingBatchService.getOutgoingBatches(node
.getNodeId()), channels);
List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatches(node.getNodeId());

if (batches != null && batches.size() > 0) {
OutgoingBatchHistory history = null;
Expand All @@ -210,12 +209,14 @@ public boolean extract(Node node, final IExtractListener handler) throws Excepti
}
} catch (RuntimeException e) {
SQLException se = unwrapSqlException(e);
if (se != null && history != null) {
history.setSqlState(se.getSQLState());
history.setSqlCode(se.getErrorCode());
history.setSqlMessage(se.getMessage());
}
if (history != null) {
if (se != null) {
history.setSqlState(se.getSQLState());
history.setSqlCode(se.getErrorCode());
history.setSqlMessage(se.getMessage());
} else {
history.setSqlMessage(e.getMessage());
}
history.setStatus(OutgoingBatchHistory.Status.SE);
history.setEndTime(new Date());
outgoingBatchService.setBatchStatus(history.getBatchId(), Status.ER);
Expand All @@ -234,28 +235,6 @@ public boolean extract(Node node, final IExtractListener handler) throws Excepti
return false;
}

/**
* Filter out the maximum number of batches to send.
*/
private List<OutgoingBatch> filterMaxNumberOfOutgoingBatches(List<OutgoingBatch> batches, List<NodeChannel> channels) {
if (batches != null && batches.size() > 0) {
List<OutgoingBatch> filtered = new ArrayList<OutgoingBatch>(batches.size());
for (NodeChannel channel : channels) {
int max = channel.getMaxBatchToSend();
int count = 0;
for (OutgoingBatch outgoingBatch : batches) {
if (channel.getId().equals(outgoingBatch.getChannelId()) && count < max) {
filtered.add(outgoingBatch);
count++;
}
}
}
return filtered;
} else {
return batches;
}
}

public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId)
throws Exception {
IDataExtractor dataExtractor = getDataExtractor(null);
Expand Down
Expand Up @@ -219,6 +219,7 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport
if (dataLoader != null && status != null) {
if (e instanceof IOException || e instanceof TransportException) {
logger.warn("Failed to load batch " + status.getNodeBatchId() + " because: " + e.getMessage());
history.setSqlMessage(e.getMessage());
statisticManager.getStatistic(StatisticName.INCOMING_TRANSPORT_ERROR_COUNT).increment();
} else {
logger.error("Failed to load batch " + status.getNodeBatchId(), e);
Expand All @@ -229,6 +230,7 @@ protected List<IncomingBatchHistory> loadDataAndReturnBatches(IIncomingTransport
history.setSqlCode(se.getErrorCode());
history.setSqlMessage(se.getMessage());
} else {
history.setSqlMessage(e.getMessage());
statisticManager.getStatistic(StatisticName.INCOMING_OTHER_ERROR_COUNT).increment();
}
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
Expand All @@ -45,6 +46,7 @@
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchHistory;
import org.jumpmind.symmetric.model.OutgoingBatch.Status;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
import org.jumpmind.symmetric.statistic.IStatisticManager;
Expand All @@ -66,6 +68,8 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa
private IDbDialect dbDialect;

private IStatisticManager statisticManager;

private IConfigurationService configurationService;

/**
* Create a batch and mark events as tied to that batch. We iterate through
Expand Down Expand Up @@ -284,20 +288,46 @@ public List<OutgoingBatch> getOutgoingBatches(String nodeId) {
errorChannels.add(batch.getChannelId());
}
}
Collections.sort(list, new Comparator<OutgoingBatch>() {
public int compare(OutgoingBatch b1, OutgoingBatch b2) {
boolean isError1 = errorChannels.contains(b1.getChannelId());
boolean isError2 = errorChannels.contains(b2.getChannelId());

List<NodeChannel> channels = configurationService.getChannels();
Collections.sort(channels, new Comparator<NodeChannel>() {
public int compare(NodeChannel b1, NodeChannel b2) {
boolean isError1 = errorChannels.contains(b1.getId());
boolean isError2 = errorChannels.contains(b2.getId());
if (isError1 == isError2) {
return b1.getBatchId() < b2.getBatchId() ? -1 : 1;
return b1.getProcessingOrder() < b2.getProcessingOrder() ? -1 : 1;
} else if (!isError1 && isError2) {
return -1;
} else {
return 1;
}
}
});
return list;


return filterMaxNumberOfOutgoingBatchesByChannel(list, channels);
}

/**
* Filter out the maximum number of batches to send.
*/
private List<OutgoingBatch> filterMaxNumberOfOutgoingBatchesByChannel(List<OutgoingBatch> batches, List<NodeChannel> channels) {
if (batches != null && batches.size() > 0) {
List<OutgoingBatch> filtered = new ArrayList<OutgoingBatch>(batches.size());
for (NodeChannel channel : channels) {
int max = channel.getMaxBatchToSend();
int count = 0;
for (OutgoingBatch outgoingBatch : batches) {
if (channel.getId().equals(outgoingBatch.getChannelId()) && count < max) {
filtered.add(outgoingBatch);
count++;
}
}
}
return filtered;
} else {
return batches;
}
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -410,4 +440,8 @@ public void setStatisticManager(IStatisticManager statisticManager) {
this.statisticManager = statisticManager;
}

public void setConfigurationService(IConfigurationService configurationService) {
this.configurationService = configurationService;
}

}
Expand Up @@ -47,7 +47,7 @@
<entry key="selectOutgoingBatchSql">
<value>
select batch_id, node_id, channel_id, status, batch_type, create_time from
${sync.table.prefix}_outgoing_batch where node_id = ? and status in (?, ?, ?)
${sync.table.prefix}_outgoing_batch where node_id = ? and status in (?, ?, ?) order by batch_id asc
</value>
</entry>
<entry key="selectOutgoingBatchRangeSql">
Expand Down
1 change: 1 addition & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -80,6 +80,7 @@
<property name="sql" ref="outgoingBatchServiceSql"/>
<property name="jdbcTemplate" ref="jdbcTemplate" />
<property name="parameterService" ref="parameterService" />
<property name="configurationService" ref="configurationService" />
<property name="nodeService" ref="nodeService" />
<property name="statisticManager" ref="statisticManager" />
<property name="dbDialect" ref="dbDialect" />
Expand Down
Expand Up @@ -242,9 +242,9 @@ public void testErrorChannel() {
assertEquals(batches.size(), 3);
assertEquals(batches.get(0).getBatchId(), secondBatchId,
"Channel in error should have batches last - missing new batch");
assertEquals(batches.get(1).getBatchId(), firstBatchId,
assertEquals(batches.get(1).getBatchId(), thirdBatchId,
"Channel in error should have batches last - missing error batch");
assertEquals(batches.get(2).getBatchId(), thirdBatchId,
assertEquals(batches.get(2).getBatchId(), firstBatchId,
"Channel in error should have batches last - missing new batch");

}
Expand Down

0 comments on commit 155132e

Please sign in to comment.