Skip to content

Commit

Permalink
1930496 - Make building of batches transactional so only one connecti…
Browse files Browse the repository at this point in the history
…on is reserved from the database.
  • Loading branch information
chenson42 committed Mar 31, 2008
1 parent aa32d70 commit 830f7a8
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 73 deletions.
Expand Up @@ -31,6 +31,8 @@ public interface IOutgoingBatchService {
public void insertOutgoingBatch(final OutgoingBatch outgoingBatch);

public void buildOutgoingBatches(String nodeId, final List<NodeChannel> channels);

public void buildOutgoingBatches(final String nodeId, final NodeChannel channel);

public List<OutgoingBatch> getOutgoingBatches(String nodeId);

Expand Down
Expand Up @@ -179,7 +179,9 @@ public boolean extract(Node node, final IExtractListener handler) throws Excepti

List<NodeChannel> channels = configurationService.getChannelsFor(true);

outgoingBatchService.buildOutgoingBatches(node.getNodeId(), channels);
for (NodeChannel nodeChannel : channels) {
outgoingBatchService.buildOutgoingBatches(node.getNodeId(), nodeChannel);
}

List<OutgoingBatch> batches = filterMaxNumberOfOutgoingBatches(outgoingBatchService.getOutgoingBatches(node
.getNodeId()), channels);
Expand Down
Expand Up @@ -209,12 +209,10 @@ public void insertReloadEvent(Node targetNode) {
* data events and reload batches.
*/
private void buildReloadBatches(String nodeId) {
List<NodeChannel> channels = new ArrayList<NodeChannel>(1);
NodeChannel channel = new NodeChannel();
channel.setId(Constants.CHANNEL_RELOAD);
channel.setEnabled(true);
channels.add(channel);
outgoingBatchService.buildOutgoingBatches(nodeId, channels);
outgoingBatchService.buildOutgoingBatches(nodeId, channel);

}

Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.transaction.annotation.Transactional;

public class OutgoingBatchService extends AbstractService implements IOutgoingBatchService {

Expand Down Expand Up @@ -73,7 +74,7 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa

private JdbcTemplate outgoingBatchQueryTemplate;

private IOutgoingBatchHistoryService historyService;
private IOutgoingBatchHistoryService historyService;

private IDbDialect dbDialect;

Expand All @@ -86,7 +87,15 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa
* other than possibly leaving a batch row w/out data every now and then or leaving a batch w/out the
* associated history row.
*/
@Transactional
public void buildOutgoingBatches(final String nodeId, final List<NodeChannel> channels) {
for (NodeChannel nodeChannel : channels) {
buildOutgoingBatches(nodeId, nodeChannel);
}
}

@Transactional
public void buildOutgoingBatches(final String nodeId, final NodeChannel channel) {
jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {

Expand All @@ -96,89 +105,86 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc

update.setQueryTimeout(jdbcTemplate.getQueryTimeout());

for (NodeChannel channel : channels) {
if (channel.isSuspended()) {
logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended.");
} else if (channel.isEnabled()) {
// determine which transactions will be part of this batch on this channel
PreparedStatement select = null;
ResultSet results = null;

if (channel.isSuspended()) {
logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended.");
} else if (channel.isEnabled()) {
// determine which transactions will be part of this batch on this channel
PreparedStatement select = null;
ResultSet results = null;
try {

try {
select = conn.prepareStatement(selectEventsToBatchSql);

select = conn.prepareStatement(selectEventsToBatchSql);
select.setQueryTimeout(jdbcTemplate.getQueryTimeout());

select.setQueryTimeout(jdbcTemplate.getQueryTimeout());
select.setString(1, nodeId);
select.setString(2, channel.getId());
results = select.executeQuery();

select.setString(1, nodeId);
select.setString(2, channel.getId());
results = select.executeQuery();
int count = 0;
boolean peekAheadMode = false;
int peekAheadCountDown = batchSizePeekAhead;
Set<String> transactionIds = new HashSet<String>();

int count = 0;
boolean peekAheadMode = false;
int peekAheadCountDown = batchSizePeekAhead;
Set<String> transactionIds = new HashSet<String>();
OutgoingBatch newBatch = new OutgoingBatch();
newBatch.setBatchType(BatchType.EVENTS);
newBatch.setChannelId(channel.getId());
newBatch.setNodeId(nodeId);

OutgoingBatch newBatch = new OutgoingBatch();
newBatch.setBatchType(BatchType.EVENTS);
newBatch.setChannelId(channel.getId());
newBatch.setNodeId(nodeId);

// node channel is setup to ignore, just mark the batch as already processed.
if (channel.isIgnored()) {
newBatch.setStatus(Status.OK);
}
// node channel is setup to ignore, just mark the batch as already processed.
if (channel.isIgnored()) {
newBatch.setStatus(Status.OK);
}

if (results.next()) {
if (results.next()) {

insertOutgoingBatch(newBatch);
insertOutgoingBatch(newBatch);

do {
String trxId = results.getString(1);
if (trxId != null) {
transactionIds.add(trxId);
}
do {
String trxId = results.getString(1);
if (trxId != null) {
transactionIds.add(trxId);
}

if (!peekAheadMode
|| (peekAheadMode && (trxId != null && transactionIds.contains(trxId)))) {
peekAheadCountDown = batchSizePeekAhead;
if (!peekAheadMode
|| (peekAheadMode && (trxId != null && transactionIds.contains(trxId)))) {
peekAheadCountDown = batchSizePeekAhead;

int dataId = results.getInt(2);
int dataId = results.getInt(2);

update.clearParameters();
update.setLong(1, Long.valueOf(newBatch.getBatchId()));
update.setString(2, nodeId);
update.setLong(3, dataId);
update.addBatch();
update.clearParameters();
update.setLong(1, Long.valueOf(newBatch.getBatchId()));
update.setString(2, nodeId);
update.setLong(3, dataId);
update.addBatch();

count++;
count++;

} else {
peekAheadCountDown--;
}
} else {
peekAheadCountDown--;
}

if (count > channel.getMaxBatchSize()) {
peekAheadMode = true;
}
if (count > channel.getMaxBatchSize()) {
peekAheadMode = true;
}

// put this in so we don't build up too many
// statements to send to the server.
if (count % 10000 == 0) {
update.executeBatch();
}
// put this in so we don't build up too many
// statements to send to the server.
if (count % 10000 == 0) {
update.executeBatch();
}

} while (results.next() && peekAheadCountDown != 0);
} while (results.next() && peekAheadCountDown != 0);

historyService.created(new Integer(newBatch.getBatchId()), count);
}
historyService.created(new Integer(newBatch.getBatchId()), count);
}

} finally {
} finally {

JdbcUtils.closeResultSet(results);
JdbcUtils.closeStatement(select);
JdbcUtils.closeResultSet(results);
JdbcUtils.closeStatement(select);

}
}

update.executeBatch();
Expand All @@ -191,7 +197,7 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc
});
}

public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) {
public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) {
long batchId = dbDialect.insertWithGeneratedKey(createBatchSql, "sym_outgoing_batch_batch_id",
new PreparedStatementCallback() {
public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
Expand All @@ -212,8 +218,8 @@ public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, D
*/
@SuppressWarnings("unchecked")
public List<OutgoingBatch> getOutgoingBatches(String nodeId) {
return (List<OutgoingBatch>) outgoingBatchQueryTemplate.query(selectOutgoingBatchSql,
new Object[] { nodeId }, new OutgoingBatchMapper());
return (List<OutgoingBatch>) outgoingBatchQueryTemplate.query(selectOutgoingBatchSql, new Object[] { nodeId },
new OutgoingBatchMapper());
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -252,13 +258,13 @@ public boolean isInitialLoadComplete(String nodeId) {
if (security == null || security.isInitialLoadEnabled()) {
return false;
}

List<String> statuses = (List<String>) jdbcTemplate.queryForList(initialLoadStatusSql, new Object[] { nodeId },
String.class);
if (statuses == null || statuses.size() == 0) {
throw new RuntimeException("The initial load has not been started for " + nodeId);
}

for (String status : statuses) {
if (!Status.OK.name().equals(status)) {
return false;
Expand Down
Expand Up @@ -32,8 +32,11 @@ public void buildOutgoingBatches(String nodeId, List<NodeChannel> channels) {

}

public List<OutgoingBatch> getOutgoingBatchRange(String startBatchId,
String endBatchId) {
public void buildOutgoingBatches(String nodeId, NodeChannel channel) {

}

public List<OutgoingBatch> getOutgoingBatchRange(String startBatchId, String endBatchId) {
return null;
}

Expand Down

0 comments on commit 830f7a8

Please sign in to comment.