Skip to content

Commit

Permalink
Merge branch '3.10' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.10
  • Loading branch information
jumpmind-josh committed Dec 10, 2018
2 parents 77b85e2 + 4ab0f6a commit 25429b1
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 6 deletions.
Expand Up @@ -79,4 +79,6 @@ public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTi

public void updateExtractRequestTransferred(OutgoingBatch batch, long transferMillis);

public int cancelExtractRequests(long loadId);

}
Expand Up @@ -61,7 +61,7 @@ public interface IDataService {

public void updateTableReloadRequestsLoadedCounts(ISqlTransaction transcation, long loadId, int batchCount, long rowsCount);

public void updateTableReloadRequestsCancelled(long loadId);
public int updateTableReloadRequestsCancelled(long loadId);

public String reloadNode(String nodeId, boolean reverseLoad, String createBy);

Expand Down
Expand Up @@ -1633,6 +1633,11 @@ public void updateExtractRequestTransferred(OutgoingBatch batch, long transferMi
}
}

@Override
public int cancelExtractRequests(long loadId) {
return sqlTemplate.update(getSql("cancelExtractRequests"), ExtractStatus.OK.name(), loadId);
}

protected boolean writeBatchStats(BufferedWriter writer, char[] buffer, int bufferSize, String prevBuffer, OutgoingBatch batch)
throws IOException {
String bufferString = new String(buffer);
Expand Down Expand Up @@ -2088,8 +2093,8 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
processInfo.setStatus(ProcessInfo.ProcessStatus.OK);

} catch (CancellationException ex) {
log.info("Cancelled extract request {}. Starting at batch {}. Ending at batch {}",
new Object[] { request.getRequestId(), request.getStartBatchId(),
log.info("Interrupted extract request {} for table {} batches {} through {}",
new Object[] { request.getRequestId(), request.getTableName(), request.getStartBatchId(),
request.getEndBatchId() });
processInfo.setStatus(ProcessInfo.ProcessStatus.OK);
} catch (RuntimeException ex) {
Expand Down
Expand Up @@ -49,6 +49,8 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform,

putSql("resetExtractRequestStatus", "update $(extract_request) set status=?, last_update_time= current_timestamp where start_batch_id <= ? and end_batch_id >= ? and node_id=?");

putSql("cancelExtractRequests", "update $(extract_request) set status=?, last_update_time=current_timestamp where load_id = ?");

putSql("selectIncompleteTablesForExtractByLoadId", "select * from $(extract_request) where load_id = ? and loaded_time is null order by request_id desc");

putSql("selectCompletedTablesForExtractByLoadId", "select * from $(extract_request) where load_id = ? and loaded_time is not null order by request_id");
Expand Down
Expand Up @@ -433,16 +433,18 @@ public void updateTableReloadRequestsLoadAndTableCounts(ISqlTransaction transact
}
}

public void updateTableReloadRequestsCancelled(long loadId) {
public int updateTableReloadRequestsCancelled(long loadId) {
ISqlTransaction transaction = null;
int count = 0;
try {
transaction = sqlTemplate.startSqlTransaction();
transaction.prepareAndExecute(getSql("updateTableReloadRequestCancelled"),
count = transaction.prepareAndExecute(getSql("updateTableReloadRequestCancelled"),
new Object[] {
new Date(), loadId
},
new int[] { Types.TIMESTAMP,Types.NUMERIC});
transaction.commit();
return count;
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
Expand Down
Expand Up @@ -77,7 +77,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace
+ " where load_id = ?");

putSql("updateTableReloadRequestCancelled", "update $(table_reload_request) set "
+ " cancelled = 1, completed = 1, last_update_time = ? "
+ " processed = 1, cancelled = 1, completed = 1, last_update_time = ? "
+ " where load_id = ?");

// Note that the order by data_id is done appended in code
Expand Down

0 comments on commit 25429b1

Please sign in to comment.