Skip to content

Commit

Permalink
0002860: Avoid locks in extract query with dirty reads
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 13, 2016
1 parent 858f67b commit 8807030
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
Expand Up @@ -103,7 +103,7 @@ public BatchAckResult ack(final BatchAck batch) {
outgoingBatch.setSqlMessage(batch.getSqlMessage());

if (!batch.isOk() && batch.getErrorLine() != 0) {
List<Number> ids = sqlTemplate.query(getSql("selectDataIdSql"),
List<Number> ids = sqlTemplateDirty.query(getSql("selectDataIdSql"),
new NumberMapper(), outgoingBatch.getBatchId());
if (ids.size() >= batch.getErrorLine()) {
outgoingBatch.setFailedDataId(ids.get((int) batch.getErrorLine() - 1)
Expand Down
Expand Up @@ -84,15 +84,15 @@ public IncomingBatch findIncomingBatch(long batchId, String nodeId) {
}

public int countIncomingBatchesInError() {
return sqlTemplate.queryForInt(getSql("countIncomingBatchesErrorsSql"));
return sqlTemplateDirty.queryForInt(getSql("countIncomingBatchesErrorsSql"));
}

public int countIncomingBatchesInError(String channelId) {
return sqlTemplate.queryForInt(getSql("countIncomingBatchesErrorsOnChannelSql"), channelId);
return sqlTemplateDirty.queryForInt(getSql("countIncomingBatchesErrorsOnChannelSql"), channelId);
}

public List<IncomingBatch> findIncomingBatchErrors(int maxRows) {
return sqlTemplate.query(
return sqlTemplateDirty.query(
getSql("selectIncomingBatchPrefixSql", "findIncomingBatchErrorsSql"), maxRows,
new IncomingBatchMapper());
}
Expand Down
Expand Up @@ -261,7 +261,7 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) {
List<OutgoingBatch> list = null;
if (StringUtils.isNotBlank(nodeId)) {
list = (List<OutgoingBatch>) sqlTemplate.query(
list = (List<OutgoingBatch>) sqlTemplateDirty.query(
getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchSql"),
new OutgoingBatchMapper(true), new Object[] { batchId, nodeId },
new int[] { symmetricDialect.getSqlTypeForIds(), Types.VARCHAR });
Expand All @@ -270,7 +270,7 @@ public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) {
* Pushing to an older version of symmetric might result in a batch
* without the node id
*/
list = (List<OutgoingBatch>) sqlTemplate.query(
list = (List<OutgoingBatch>) sqlTemplateDirty.query(
getSql("selectOutgoingBatchPrefixSql", "findOutgoingBatchByIdOnlySql"),
new OutgoingBatchMapper(true), new Object[] { batchId },
new int[] { symmetricDialect.getSqlTypeForIds() });
Expand All @@ -283,26 +283,26 @@ public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) {
}

public int countOutgoingBatchesInError() {
return sqlTemplate.queryForInt(getSql("countOutgoingBatchesErrorsSql"));
return sqlTemplateDirty.queryForInt(getSql("countOutgoingBatchesErrorsSql"));
}

public int countOutgoingBatchesInError(String channelId) {
return sqlTemplate.queryForInt(getSql("countOutgoingBatchesErrorsOnChannelSql"), channelId);
return sqlTemplateDirty.queryForInt(getSql("countOutgoingBatchesErrorsOnChannelSql"), channelId);
}

@Override
public int countOutgoingBatchesUnsent() {
return sqlTemplate.queryForInt(getSql("countOutgoingBatchesUnsentSql"));
return sqlTemplateDirty.queryForInt(getSql("countOutgoingBatchesUnsentSql"));
}

@Override
public int countOutgoingBatchesUnsent(String channelId) {
return sqlTemplate.queryForInt(getSql("countOutgoingBatchesUnsentOnChannelSql"), channelId);
return sqlTemplateDirty.queryForInt(getSql("countOutgoingBatchesUnsentOnChannelSql"), channelId);
}

@Override
public Map<String, Integer> countOutgoingBatchesPendingByChannel(String nodeId) {
List<Row> rows = sqlTemplate.query(getSql("countOutgoingBatchesByChannelSql"), new Object[]{nodeId});
List<Row> rows = sqlTemplateDirty.query(getSql("countOutgoingBatchesByChannelSql"), new Object[]{nodeId});
Map<String, Integer> results = new HashMap<String, Integer>();
if (rows != null && ! rows.isEmpty()) {
for (Row row : rows) {
Expand Down Expand Up @@ -523,7 +523,7 @@ public OutgoingBatches getOutgoingBatchByLoad(long loadId) {

public OutgoingBatches getOutgoingBatchErrors(int maxRows) {
OutgoingBatches batches = new OutgoingBatches();
batches.setBatches(sqlTemplate.query(
batches.setBatches(sqlTemplateDirty.query(
getSql("selectOutgoingBatchPrefixSql", "selectOutgoingBatchErrorsSql"), maxRows,
new OutgoingBatchMapper(true), null, null));
return batches;
Expand Down Expand Up @@ -593,7 +593,7 @@ public List<OutgoingBatchSummary> findOutgoingBatchSummary(Status... statuses) {
public Set<Long> getActiveLoads(String sourceNodeId) {
Set<Long> loads = new HashSet<Long>();

List<Long> inProcess = sqlTemplate.query(getSql("getActiveLoadsSql"), new ISqlRowMapper<Long>() {
List<Long> inProcess = sqlTemplateDirty.query(getSql("getActiveLoadsSql"), new ISqlRowMapper<Long>() {
@Override
public Long mapRow(Row rs) {
return rs.getLong("load_id");
Expand All @@ -605,7 +605,7 @@ public Long mapRow(Row rs) {
}

public List<String> getQueuedLoads(String sourceNodeId) {
return sqlTemplate.query(getSql("getUnprocessedReloadRequestsSql"), new ISqlRowMapper<String>() {
return sqlTemplateDirty.query(getSql("getUnprocessedReloadRequestsSql"), new ISqlRowMapper<String>() {
@Override
public String mapRow(Row rs) {
return rs.getString("source_node_id") + " to " + rs.getString("target_node_id");
Expand All @@ -614,7 +614,7 @@ public String mapRow(Row rs) {
}

public LoadSummary getLoadSummary(long loadId) {
return sqlTemplate.queryForObject(getSql("getLoadSummarySql"),
return sqlTemplateDirty.queryForObject(getSql("getLoadSummarySql"),
new ISqlRowMapper<LoadSummary>() {

public LoadSummary mapRow(Row rs) {
Expand Down Expand Up @@ -642,7 +642,7 @@ public LoadSummary mapRow(Row rs) {
public Map<String, Map<String, LoadStatusSummary>> getLoadStatusSummarySql(long loadId) {
LoadStatusByQueueMapper mapper = new LoadStatusByQueueMapper();

List<Object> obj = sqlTemplate.query(getSql("getLoadStatusSummarySql"), mapper, loadId);
List<Object> obj = sqlTemplateDirty.query(getSql("getLoadStatusSummarySql"), mapper, loadId);
return mapper.getResults();
}

Expand Down

0 comments on commit 8807030

Please sign in to comment.