Skip to content

Commit

Permalink
Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds into 3.8
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jul 7, 2016
2 parents 8f70151 + e6e00cd commit bda0b3d
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 14 deletions.
Expand Up @@ -59,10 +59,10 @@ public interface IIncomingBatchService {
public int deleteIncomingBatch(IncomingBatch batch);

public List<Date> listIncomingBatchTimes(List<String> nodeIds, List<String> channels,
List<IncomingBatch.Status> statuses, boolean ascending);
List<IncomingBatch.Status> statuses, List<String> loads, boolean ascending);

public List<IncomingBatch> listIncomingBatches(List<String> nodeIds, List<String> channels,
List<IncomingBatch.Status> statuses, Date startAtCreateTime, int maxRowsToRetrieve, boolean ascending);
List<IncomingBatch.Status> statuses, List<String> loads, Date startAtCreateTime, int maxRowsToRetrieve, boolean ascending);

public void markIncomingBatchesOk(String nodeId);

Expand Down
Expand Up @@ -90,10 +90,10 @@ public interface IOutgoingBatchService {
public List<OutgoingBatchSummary> findOutgoingBatchSummary(OutgoingBatch.Status ... statuses);

public int countOutgoingBatches(List<String> nodeIds, List<String> channels,
List<OutgoingBatch.Status> statuses);
List<OutgoingBatch.Status> statuses, List<String> loads);

public List<OutgoingBatch> listOutgoingBatches(List<String> nodeIds, List<String> channels,
List<OutgoingBatch.Status> statuses, long startAtBatchId, int rowsExpected, boolean ascending);
List<OutgoingBatch.Status> statuses, List<String> loads, long startAtBatchId, int rowsExpected, boolean ascending);

public List<OutgoingLoadSummary> getLoadSummaries(boolean activeOnly);

Expand Down
Expand Up @@ -202,7 +202,7 @@ protected boolean isCalledFromSymmetricAdminTool() {
}

protected String buildBatchWhere(List<String> nodeIds, List<String> channels,
List<?> statuses) {
List<?> statuses, List<String> loads) {
boolean containsErrorStatus = statuses.contains(OutgoingBatch.Status.ER)
|| statuses.contains(IncomingBatch.Status.ER);
boolean containsIgnoreStatus = statuses.contains(OutgoingBatch.Status.IG)
Expand All @@ -222,6 +222,13 @@ protected String buildBatchWhere(List<String> nodeIds, List<String> channels,
where.append("channel_id in (:CHANNELS)");
needsAnd = true;
}
if (loads.size() > 0) {
if (needsAnd) {
where.append(" and ");
}
where.append("load_id in (:LOADS)");
needsAnd = true;
}
if (statuses.size() > 0) {
if (needsAnd) {
where.append(" and ");
Expand Down
Expand Up @@ -133,30 +133,33 @@ public boolean isRecordOkBatchesEnabled() {
}

public List<Date> listIncomingBatchTimes(List<String> nodeIds, List<String> channels,
List<IncomingBatch.Status> statuses, boolean ascending) {
List<IncomingBatch.Status> statuses, List<String> loads, boolean ascending) {

String whereClause = buildBatchWhere(nodeIds, channels, statuses);
String whereClause = buildBatchWhere(nodeIds, channels, statuses, loads);

Map<String, Object> params = new HashMap<String, Object>();
params.put("NODES", nodeIds);
params.put("CHANNELS", channels);
params.put("STATUSES", toStringList(statuses));

params.put("LOADS", loads);

String sql = getSql("selectCreateTimePrefixSql", whereClause,
ascending ? " order by create_time" : " order by create_time desc");
return sqlTemplate.query(sql, new DateMapper(), params);
}

public List<IncomingBatch> listIncomingBatches(List<String> nodeIds, List<String> channels,
List<IncomingBatch.Status> statuses, Date startAtCreateTime,
List<IncomingBatch.Status> statuses, List<String> loads, Date startAtCreateTime,
final int maxRowsToRetrieve, boolean ascending) {
Map<String, Object> params = new HashMap<String, Object>();
params.put("NODES", nodeIds);
params.put("CHANNELS", channels);
params.put("STATUSES", toStringList(statuses));
params.put("CREATE_TIME", startAtCreateTime);
params.put("CREATE_TIME", startAtCreateTime);
params.put("LOADS", loads);

String where = buildBatchWhere(nodeIds, channels, statuses);
String where = buildBatchWhere(nodeIds, channels, statuses, loads);

String createTimeLimiter = "";
if (startAtCreateTime != null) {
Expand Down Expand Up @@ -418,4 +421,5 @@ public IncomingBatch mapRow(Row rs) {
}
}


}
Expand Up @@ -296,7 +296,7 @@ public int countOutgoingBatchesUnsent(String channelId) {
}

public int countOutgoingBatches(List<String> nodeIds, List<String> channels,
List<OutgoingBatch.Status> statuses) {
List<OutgoingBatch.Status> statuses, List<String> loads) {
Map<String, Object> params = new HashMap<String, Object>();
params.put("NODES", nodeIds);
params.put("CHANNELS", channels);
Expand All @@ -305,18 +305,19 @@ public int countOutgoingBatches(List<String> nodeIds, List<String> channels,
return sqlTemplate
.queryForInt(
getSql("selectCountBatchesPrefixSql",
buildBatchWhere(nodeIds, channels, statuses)), params);
buildBatchWhere(nodeIds, channels, statuses, loads)), params);
}

public List<OutgoingBatch> listOutgoingBatches(List<String> nodeIds, List<String> channels,
List<OutgoingBatch.Status> statuses, long startAtBatchId, final int maxRowsToRetrieve,
List<OutgoingBatch.Status> statuses, List<String> loads, long startAtBatchId, final int maxRowsToRetrieve,
boolean ascending) {

String where = buildBatchWhere(nodeIds, channels, statuses);
String where = buildBatchWhere(nodeIds, channels, statuses, loads);
Map<String, Object> params = new HashMap<String, Object>();
params.put("NODES", nodeIds);
params.put("CHANNELS", channels);
params.put("STATUSES", toStringList(statuses));
params.put("LOADS", loads);
String startAtBatchIdSql = null;
if (startAtBatchId > 0) {
if (StringUtils.isBlank(where)) {
Expand Down

0 comments on commit bda0b3d

Please sign in to comment.