Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 55 additions & 78 deletions src/main/java/com/uber/cadence/internal/replay/DeciderCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,22 @@
package com.uber.cadence.internal.replay;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.uber.cadence.PollForDecisionTaskResponse;
import com.uber.cadence.internal.common.ThrowableFunc1;
import com.uber.cadence.internal.metrics.MetricsType;
import com.uber.m3.tally.Scope;
import java.util.Iterator;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DeciderCache {
private final Scope metricsScope;
private LoadingCache<String, Decider> cache;
private Lock evictionLock = new ReentrantLock();
Random rand = new Random();

private static final Logger log = LoggerFactory.getLogger(DeciderCache.class);
private Lock cacheLock = new ReentrantLock();
private Set<String> inProcessing = new HashSet<>();

public DeciderCache(int maxCacheSize, Scope scope) {
Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0");
Expand All @@ -70,83 +58,79 @@ public Decider load(String key) {
}

public Decider getOrCreate(
PollForDecisionTaskResponse decisionTask,
ThrowableFunc1<PollForDecisionTaskResponse, Decider, Exception> createReplayDecider)
throws Exception {
PollForDecisionTaskResponse decisionTask, Callable<Decider> deciderFunc) throws Exception {
String runId = decisionTask.getWorkflowExecution().getRunId();
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
if (isFullHistory(decisionTask)) {
invalidate(decisionTask);
return cache.get(runId, () -> createReplayDecider.apply(decisionTask));
invalidate(runId);
return deciderFunc.call();
}

Decider decider = getForProcessing(runId);
if (decider != null) {
return decider;
}
AtomicBoolean miss = new AtomicBoolean();
Decider result = null;
return deciderFunc.call();
}

private Decider getForProcessing(String runId) throws Exception {
cacheLock.lock();
try {
result =
cache.get(
runId,
() -> {
miss.set(true);
return createReplayDecider.apply(decisionTask);
});
} catch (UncheckedExecutionException | ExecutionError e) {
Throwables.throwIfUnchecked(e.getCause());
Decider decider = cache.get(runId);
inProcessing.add(runId);
metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
return decider;
} catch (CacheLoader.InvalidCacheLoadException e) {
// We don't have a default loader and don't want to have one. So it's ok to get null value.
metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
return null;
} finally {
if (miss.get()) {
metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
} else {
metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1);
}
cacheLock.unlock();
}
return result;
}

public void evictAny(String runId) throws InterruptedException {
// Timeout is to guard against workflows trying to evict each other.
if (!evictionLock.tryLock(rand.nextInt(4), TimeUnit.SECONDS)) {
return;
void markProcessingDone(PollForDecisionTaskResponse decisionTask) {
String runId = decisionTask.getWorkflowExecution().getRunId();

cacheLock.lock();
try {
inProcessing.remove(runId);
} finally {
cacheLock.unlock();
}
}

public void addToCache(PollForDecisionTaskResponse decisionTask, Decider decider) {
String runId = decisionTask.getWorkflowExecution().getRunId();
cache.put(runId, decider);
}

public boolean evictAnyNotInProcessing(String runId) {
cacheLock.lock();
try {
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
Set<String> set = cache.asMap().keySet();
if (set.isEmpty()) {
return;
}
Iterator<String> iter = cache.asMap().keySet().iterator();
String key = "";
while (iter.hasNext()) {
key = iter.next();
if (!key.equals(runId)) {
break;
for (String key : cache.asMap().keySet()) {
if (!key.equals(runId) && !inProcessing.contains(key)) {
cache.invalidate(key);
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
return true;
}
}

if (key.equals(runId)) {
log.warn(String.format("%s attempted to self evict. Ignoring eviction", runId));
return;
}
cache.invalidate(key);
metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
return false;
} finally {
evictionLock.unlock();
cacheLock.unlock();
}
}

public void invalidate(PollForDecisionTaskResponse decisionTask) throws InterruptedException {
String runId = decisionTask.getWorkflowExecution().getRunId();
invalidate(runId);
}

private void invalidate(String runId) throws InterruptedException {
if (!evictionLock.tryLock(rand.nextInt(4), TimeUnit.SECONDS)) {
return;
}
void invalidate(String runId) {
cacheLock.lock();
try {
cache.invalidate(runId);
inProcessing.remove(runId);
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
} finally {
evictionLock.unlock();
cacheLock.unlock();
}
}

Expand All @@ -163,11 +147,4 @@ private boolean isFullHistory(PollForDecisionTaskResponse decisionTask) {
public void invalidateAll() {
cache.invalidateAll();
}

public static class EvictedException extends Exception {

public EvictedException(String runId) {
super(String.format("cache was evicted for the decisionTask. RunId: %s", runId));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -115,12 +116,26 @@ private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask)

private Result processDecision(PollForDecisionTaskResponse decisionTask) throws Throwable {
Decider decider = null;
AtomicBoolean createdNew = new AtomicBoolean();
try {
decider =
stickyTaskListName == null
? createDecider(decisionTask)
: cache.getOrCreate(decisionTask, this::createDecider);
if (stickyTaskListName == null) {
decider = createDecider(decisionTask);
} else {
decider =
cache.getOrCreate(
decisionTask,
() -> {
createdNew.set(true);
return createDecider(decisionTask);
});
}

List<Decision> decisions = decider.decide(decisionTask);

if (stickyTaskListName != null && createdNew.get()) {
cache.addToCache(decisionTask, decider);
}

if (log.isTraceEnabled()) {
WorkflowExecution execution = decisionTask.getWorkflowExecution();
log.trace(
Expand Down Expand Up @@ -148,12 +163,14 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws
return createCompletedRequest(decisionTask, decisions);
} catch (Throwable e) {
if (stickyTaskListName != null) {
cache.invalidate(decisionTask);
cache.invalidate(decisionTask.getWorkflowExecution().getRunId());
}
throw e;
} finally {
if (stickyTaskListName == null && decider != null) {
decider.close();
} else {
cache.markProcessingDone(decisionTask);
}
}
}
Expand All @@ -162,12 +179,24 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) {
RespondQueryTaskCompletedRequest queryCompletedRequest = new RespondQueryTaskCompletedRequest();
queryCompletedRequest.setTaskToken(decisionTask.getTaskToken());
Decider decider = null;
AtomicBoolean createdNew = new AtomicBoolean();
try {
decider =
stickyTaskListName == null
? createDecider(decisionTask)
: cache.getOrCreate(decisionTask, this::createDecider);
if (stickyTaskListName == null) {
decider = createDecider(decisionTask);
} else {
decider =
cache.getOrCreate(
decisionTask,
() -> {
createdNew.set(true);
return createDecider(decisionTask);
});
}

byte[] queryResult = decider.query(decisionTask, decisionTask.getQuery());
if (stickyTaskListName != null && createdNew.get()) {
cache.addToCache(decisionTask, decider);
}
queryCompletedRequest.setQueryResult(queryResult);
queryCompletedRequest.setCompletedType(QueryTaskCompletedType.COMPLETED);
} catch (Throwable e) {
Expand All @@ -180,6 +209,8 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) {
} finally {
if (stickyTaskListName == null && decider != null) {
decider.close();
} else {
cache.markProcessingDone(decisionTask);
}
}
return new Result(null, null, queryCompletedRequest, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,31 +236,27 @@ public void start() {
.gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT)
.update(((ThreadPoolExecutor) threadPool).getActiveCount());

try {
taskFuture = threadPool.submit(task);
return;
} catch (RejectedExecutionException e) {
getDecisionContext()
.getMetricsScope()
.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION)
.inc(1);
while (true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have some time limit, just to be on the safe side?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We break if there's no item in the cache that can be evicted. Seems to me it's safe to always retry if we have a successful eviction.

try {
taskFuture = threadPool.submit(task);
return;
} catch (RejectedExecutionException e) {
getDecisionContext()
.getMetricsScope()
.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION)
.inc(1);
if (cache != null) {
cache.evictAny(this.runner.getDecisionContext().getContext().getRunId());
boolean evicted =
cache.evictAnyNotInProcessing(
this.runner.getDecisionContext().getContext().getRunId());
if (!evicted) {
throw e;
}
} else {
throw e;
}
} catch (InterruptedException e1) {
log.warn("Unable to evict cache", e1);
}
}

try {
taskFuture = threadPool.submit(task);
} catch (RejectedExecutionException e) {
throw new Error(
"Not enough threads to execute workflows. "
+ "If this message appears consistently either WorkerOptions.maxConcurrentWorkflowExecutionSize "
+ "should be decreased or WorkerOptions.maxWorkflowThreads increased.");
}
}

public WorkflowThreadContext getContext() {
Expand Down
Loading