Permalink
Browse files

use AtomicInteger for counting subcontexts

eliminates synchronization on remove
  • Loading branch information...
seut committed Sep 22, 2015
1 parent 64b3278 commit e710e3622f30517bc13c09a5dd37bf7e59a5933e
Showing with 11 additions and 12 deletions.
  1. +11 −12 sql/src/main/java/io/crate/jobs/JobExecutionContext.java
@@ -38,13 +38,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class JobExecutionContext implements KeepAliveListener {
private static final ESLogger LOGGER = Loggers.getLogger(JobExecutionContext.class);
private final UUID jobId;
private final ConcurrentMap<Integer, ExecutionSubContext> subContexts = new ConcurrentHashMap<>();
private final AtomicInteger numSubContexts = new AtomicInteger();
private final List<Integer> orderedContextIds;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ArrayList<SubExecutionContextFuture> futures;
@@ -119,11 +121,12 @@ void setCloseCallback(Callback<JobExecutionContext> contextCallback) {
private void addContext(int subContextId, ExecutionSubContext subContext) {
if (subContexts.put(subContextId, subContext) == null) {
int currentSubContextSize = numSubContexts.incrementAndGet();
subContext.keepAliveListener(this);
SubExecutionContextFuture future = subContext.future();
future.addCallback(new RemoveSubContextCallback(subContextId));
futures.add(future);
LOGGER.trace("adding subContext {}, now there are {} subContexts", subContextId, subContexts.size());
LOGGER.trace("adding subContext {}, now there are {} subContexts", subContextId, currentSubContextSize);
} else {
throw new IllegalArgumentException(String.format(Locale.ENGLISH, "subContext %d is already present", subContextId));
}
@@ -185,7 +188,7 @@ public long kill() {
if (!closed.getAndSet(true)) {
LOGGER.trace("kill called on JobExecutionContext {}", jobId);
if (subContexts.size() == 0) {
if (numSubContexts.get() == 0) {
callCloseCallback();
} else {
for (ExecutionSubContext executionSubContext : subContexts.values()) {
@@ -198,7 +201,8 @@ public long kill() {
}
try {
chainedFuture.get();
assert subContexts.values().size() == 0: "unexpected subcontexts there: " + subContexts.values().size();
int currentNumSubContexts = numSubContexts.get();
assert currentNumSubContexts == 0: "unexpected subcontexts there: " + currentNumSubContexts;
} catch (Exception e) {
throw Throwables.propagate(e);
}
@@ -208,7 +212,7 @@ public long kill() {
public void close() {
if (closed.compareAndSet(false, true)) {
LOGGER.trace("close called on JobExecutionContext {}", jobId);
if (subContexts.size() == 0) {
if (numSubContexts.get() == 0) {
callCloseCallback();
} else {
for (ExecutionSubContext executionSubContext : subContexts.values()) {
@@ -234,7 +238,7 @@ private void callCloseCallback() {
public String toString() {
return "JobExecutionContext{" +
"jobId=" + jobId +
", activeSubContexts=" + subContexts.size() +
", activeSubContexts=" + numSubContexts.get() +
", closed=" + closed +
'}';
}
@@ -248,14 +252,9 @@ private RemoveSubContextCallback(int id) {
}
private RemoveSubContextPosition remove(){
ExecutionSubContext removed;
int remaining;
synchronized (subContexts){
removed = subContexts.remove(id);
remaining = subContexts.size();
}
ExecutionSubContext removed = subContexts.remove(id);
assert removed != null;
if (remaining == 0){
if (numSubContexts.decrementAndGet() == 0){
callCloseCallback();
return RemoveSubContextPosition.LAST;
}

0 comments on commit e710e36

Please sign in to comment.