Skip to content

Commit

Permalink
0002161: Interrupt working threads when a symmetric engine is stopped…
Browse files Browse the repository at this point in the history
…. Also check for interrupted threads when processing data.
  • Loading branch information
chenson42 committed Jan 26, 2015
1 parent 2506956 commit 65c9d12
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 1 deletion.
Expand Up @@ -62,6 +62,8 @@
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.NodeStatus;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.TriggerRouter;
Expand Down Expand Up @@ -714,6 +716,21 @@ public synchronized void stop() {
nodeCommunicationService.stop();
}

List<ProcessInfo> infos = getStatisticManager().getProcessInfos();
for (ProcessInfo processInfo : infos) {
Thread thread = processInfo.getThread();
if (processInfo.getStatus() != Status.OK && thread.isAlive()) {
log.info("Trying to interrupt thread '{}' ", thread.getName());
try {
thread.interrupt();
} catch (Exception e) {
log.info("Caught exception while attempting to interrupt thread", e);
}
}
}

Thread.interrupted();

started = false;
starting = false;
}
Expand Down
Expand Up @@ -282,7 +282,7 @@ public int compareTo(ProcessInfo o) {
} else {
return o.startTime.compareTo(startTime);
}
}
}

public ThreadData getThreadData() {
if (thread != null && thread.isAlive()) {
Expand Down
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.io.data;

import org.jumpmind.db.model.Table;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.writer.IgnoreBatchException;
import org.jumpmind.util.Statistics;
Expand Down Expand Up @@ -216,6 +217,10 @@ protected int forEachDataInTable(DataContext context, boolean processTable, Batc
}
ts = System.currentTimeMillis();
}

if (Thread.currentThread().isInterrupted()) {
throw new IoException("This thread was interrupted");
}
} while (currentData != null);

if (ignore != null) {
Expand Down

0 comments on commit 65c9d12

Please sign in to comment.