diff --git a/src/main/java/com/uber/cadence/internal/replay/DeciderCache.java b/src/main/java/com/uber/cadence/internal/replay/DeciderCache.java index 7ce7a0590..4cbf0ee03 100644 --- a/src/main/java/com/uber/cadence/internal/replay/DeciderCache.java +++ b/src/main/java/com/uber/cadence/internal/replay/DeciderCache.java @@ -18,34 +18,22 @@ package com.uber.cadence.internal.replay; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.ExecutionError; -import com.google.common.util.concurrent.UncheckedExecutionException; import com.uber.cadence.PollForDecisionTaskResponse; -import com.uber.cadence.internal.common.ThrowableFunc1; import com.uber.cadence.internal.metrics.MetricsType; import com.uber.m3.tally.Scope; -import java.util.Iterator; -import java.util.Objects; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.*; +import java.util.concurrent.Callable; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public final class DeciderCache { private final Scope metricsScope; private LoadingCache cache; - private Lock evictionLock = new ReentrantLock(); - Random rand = new Random(); - - private static final Logger log = LoggerFactory.getLogger(DeciderCache.class); + private Lock cacheLock = new ReentrantLock(); + private Set inProcessing = new HashSet<>(); public DeciderCache(int maxCacheSize, Scope scope) { Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0"); @@ -70,83 +58,79 @@ public Decider load(String key) { } public Decider getOrCreate( - PollForDecisionTaskResponse decisionTask, - ThrowableFunc1 createReplayDecider) - throws Exception { + PollForDecisionTaskResponse decisionTask, Callable deciderFunc) throws Exception { String runId = decisionTask.getWorkflowExecution().getRunId(); - metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size()); if (isFullHistory(decisionTask)) { - invalidate(decisionTask); - return cache.get(runId, () -> createReplayDecider.apply(decisionTask)); + invalidate(runId); + return deciderFunc.call(); + } + + Decider decider = getForProcessing(runId); + if (decider != null) { + return decider; } - AtomicBoolean miss = new AtomicBoolean(); - Decider result = null; + return deciderFunc.call(); + } + + private Decider getForProcessing(String runId) throws Exception { + cacheLock.lock(); try { - result = - cache.get( - runId, - () -> { - miss.set(true); - return createReplayDecider.apply(decisionTask); - }); - } catch (UncheckedExecutionException | ExecutionError e) { - Throwables.throwIfUnchecked(e.getCause()); + Decider decider = cache.get(runId); + inProcessing.add(runId); + metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1); + return decider; + } catch (CacheLoader.InvalidCacheLoadException e) { + // We don't have a default loader and don't want to have one. So it's ok to get null value. + metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1); + return null; } finally { - if (miss.get()) { - metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1); - } else { - metricsScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1); - } + cacheLock.unlock(); } - return result; } - public void evictAny(String runId) throws InterruptedException { - // Timeout is to guard against workflows trying to evict each other. - if (!evictionLock.tryLock(rand.nextInt(4), TimeUnit.SECONDS)) { - return; + void markProcessingDone(PollForDecisionTaskResponse decisionTask) { + String runId = decisionTask.getWorkflowExecution().getRunId(); + + cacheLock.lock(); + try { + inProcessing.remove(runId); + } finally { + cacheLock.unlock(); } + } + + public void addToCache(PollForDecisionTaskResponse decisionTask, Decider decider) { + String runId = decisionTask.getWorkflowExecution().getRunId(); + cache.put(runId, decider); + } + + public boolean evictAnyNotInProcessing(String runId) { + cacheLock.lock(); try { metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size()); - Set set = cache.asMap().keySet(); - if (set.isEmpty()) { - return; - } - Iterator iter = cache.asMap().keySet().iterator(); - String key = ""; - while (iter.hasNext()) { - key = iter.next(); - if (!key.equals(runId)) { - break; + for (String key : cache.asMap().keySet()) { + if (!key.equals(runId) && !inProcessing.contains(key)) { + cache.invalidate(key); + metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size()); + metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1); + return true; } } - if (key.equals(runId)) { - log.warn(String.format("%s attempted to self evict. Ignoring eviction", runId)); - return; - } - cache.invalidate(key); - metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size()); - metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1); + return false; } finally { - evictionLock.unlock(); + cacheLock.unlock(); } } - public void invalidate(PollForDecisionTaskResponse decisionTask) throws InterruptedException { - String runId = decisionTask.getWorkflowExecution().getRunId(); - invalidate(runId); - } - - private void invalidate(String runId) throws InterruptedException { - if (!evictionLock.tryLock(rand.nextInt(4), TimeUnit.SECONDS)) { - return; - } + void invalidate(String runId) { + cacheLock.lock(); try { cache.invalidate(runId); + inProcessing.remove(runId); metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1); } finally { - evictionLock.unlock(); + cacheLock.unlock(); } } @@ -163,11 +147,4 @@ private boolean isFullHistory(PollForDecisionTaskResponse decisionTask) { public void invalidateAll() { cache.invalidateAll(); } - - public static class EvictedException extends Exception { - - public EvictedException(String runId) { - super(String.format("cache was evicted for the decisionTask. RunId: %s", runId)); - } - } } diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java index 55f36dec5..aad127f82 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java @@ -32,6 +32,7 @@ import java.time.Duration; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,12 +116,26 @@ private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask) private Result processDecision(PollForDecisionTaskResponse decisionTask) throws Throwable { Decider decider = null; + AtomicBoolean createdNew = new AtomicBoolean(); try { - decider = - stickyTaskListName == null - ? createDecider(decisionTask) - : cache.getOrCreate(decisionTask, this::createDecider); + if (stickyTaskListName == null) { + decider = createDecider(decisionTask); + } else { + decider = + cache.getOrCreate( + decisionTask, + () -> { + createdNew.set(true); + return createDecider(decisionTask); + }); + } + List decisions = decider.decide(decisionTask); + + if (stickyTaskListName != null && createdNew.get()) { + cache.addToCache(decisionTask, decider); + } + if (log.isTraceEnabled()) { WorkflowExecution execution = decisionTask.getWorkflowExecution(); log.trace( @@ -148,12 +163,14 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws return createCompletedRequest(decisionTask, decisions); } catch (Throwable e) { if (stickyTaskListName != null) { - cache.invalidate(decisionTask); + cache.invalidate(decisionTask.getWorkflowExecution().getRunId()); } throw e; } finally { if (stickyTaskListName == null && decider != null) { decider.close(); + } else { + cache.markProcessingDone(decisionTask); } } } @@ -162,12 +179,24 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) { RespondQueryTaskCompletedRequest queryCompletedRequest = new RespondQueryTaskCompletedRequest(); queryCompletedRequest.setTaskToken(decisionTask.getTaskToken()); Decider decider = null; + AtomicBoolean createdNew = new AtomicBoolean(); try { - decider = - stickyTaskListName == null - ? createDecider(decisionTask) - : cache.getOrCreate(decisionTask, this::createDecider); + if (stickyTaskListName == null) { + decider = createDecider(decisionTask); + } else { + decider = + cache.getOrCreate( + decisionTask, + () -> { + createdNew.set(true); + return createDecider(decisionTask); + }); + } + byte[] queryResult = decider.query(decisionTask, decisionTask.getQuery()); + if (stickyTaskListName != null && createdNew.get()) { + cache.addToCache(decisionTask, decider); + } queryCompletedRequest.setQueryResult(queryResult); queryCompletedRequest.setCompletedType(QueryTaskCompletedType.COMPLETED); } catch (Throwable e) { @@ -180,6 +209,8 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) { } finally { if (stickyTaskListName == null && decider != null) { decider.close(); + } else { + cache.markProcessingDone(decisionTask); } } return new Result(null, null, queryCompletedRequest, null); diff --git a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java index 3e12faaf2..b268e6a42 100644 --- a/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java +++ b/src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java @@ -236,31 +236,27 @@ public void start() { .gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT) .update(((ThreadPoolExecutor) threadPool).getActiveCount()); - try { - taskFuture = threadPool.submit(task); - return; - } catch (RejectedExecutionException e) { - getDecisionContext() - .getMetricsScope() - .counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION) - .inc(1); + while (true) { try { + taskFuture = threadPool.submit(task); + return; + } catch (RejectedExecutionException e) { + getDecisionContext() + .getMetricsScope() + .counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION) + .inc(1); if (cache != null) { - cache.evictAny(this.runner.getDecisionContext().getContext().getRunId()); + boolean evicted = + cache.evictAnyNotInProcessing( + this.runner.getDecisionContext().getContext().getRunId()); + if (!evicted) { + throw e; + } + } else { + throw e; } - } catch (InterruptedException e1) { - log.warn("Unable to evict cache", e1); } } - - try { - taskFuture = threadPool.submit(task); - } catch (RejectedExecutionException e) { - throw new Error( - "Not enough threads to execute workflows. " - + "If this message appears consistently either WorkerOptions.maxConcurrentWorkflowExecutionSize " - + "should be decreased or WorkerOptions.maxWorkflowThreads increased."); - } } public WorkflowThreadContext getContext() { diff --git a/src/test/java/com/uber/cadence/internal/replay/ReplayDeciderCacheTests.java b/src/test/java/com/uber/cadence/internal/replay/ReplayDeciderCacheTests.java index 19a4bb2be..a7f9adfd0 100644 --- a/src/test/java/com/uber/cadence/internal/replay/ReplayDeciderCacheTests.java +++ b/src/test/java/com/uber/cadence/internal/replay/ReplayDeciderCacheTests.java @@ -59,10 +59,13 @@ public void whenHistoryIsFullNewReplayDeciderIsReturnedAndCached_InitiallyEmpty( assertCacheIsEmpty(replayDeciderCache, runId); // Act - Decider decider = replayDeciderCache.getOrCreate(decisionTask, this::createFakeDecider); + Decider decider = + replayDeciderCache.getOrCreate(decisionTask, () -> createFakeDecider(decisionTask)); // Assert - assertNotEquals(decider, replayDeciderCache.getOrCreate(decisionTask, this::createFakeDecider)); + assertNotEquals( + decider, + replayDeciderCache.getOrCreate(decisionTask, () -> createFakeDecider(decisionTask))); } @Test @@ -75,20 +78,26 @@ public void whenHistoryIsFullNewReplayDeciderIsReturned_InitiallyCached() throws HistoryUtils.generateDecisionTaskWithInitialHistory( "domain", "taskList", "workflowType", service); - Decider decider = replayDeciderCache.getOrCreate(decisionTask1, this::createFakeDecider); + Decider decider = + replayDeciderCache.getOrCreate(decisionTask1, () -> createFakeDecider(decisionTask1)); + replayDeciderCache.addToCache(decisionTask1, decider); PollForDecisionTaskResponse decisionTask2 = HistoryUtils.generateDecisionTaskWithPartialHistoryFromExistingTask( decisionTask1, "domain", "stickyTaskList", service); assertEquals( - decider, replayDeciderCache.getOrCreate(decisionTask2, this::doNotCreateFakeDecider)); + decider, + replayDeciderCache.getOrCreate(decisionTask2, () -> doNotCreateFakeDecider(decisionTask2))); // Act - Decider decider2 = replayDeciderCache.getOrCreate(decisionTask2, this::createFakeDecider); + Decider decider2 = + replayDeciderCache.getOrCreate(decisionTask2, () -> createFakeDecider(decisionTask2)); // Assert - assertEquals(decider2, replayDeciderCache.getOrCreate(decisionTask2, this::createFakeDecider)); + assertEquals( + decider2, + replayDeciderCache.getOrCreate(decisionTask2, () -> createFakeDecider(decisionTask2))); assertSame(decider2, decider); service.close(); } @@ -112,13 +121,16 @@ public void whenHistoryIsPartialCachedEntryIsReturned() throws Exception { HistoryUtils.generateDecisionTaskWithInitialHistory( "domain", "taskList", "workflowType", service); - Decider decider = replayDeciderCache.getOrCreate(decisionTask, this::createFakeDecider); + Decider decider = + replayDeciderCache.getOrCreate(decisionTask, () -> createFakeDecider(decisionTask)); + replayDeciderCache.addToCache(decisionTask, decider); // Act PollForDecisionTaskResponse decisionTask2 = HistoryUtils.generateDecisionTaskWithPartialHistoryFromExistingTask( decisionTask, "domain", "stickyTaskList", service); - Decider decider2 = replayDeciderCache.getOrCreate(decisionTask2, this::doNotCreateFakeDecider); + Decider decider2 = + replayDeciderCache.getOrCreate(decisionTask2, () -> doNotCreateFakeDecider(decisionTask2)); // Assert // Wait for reporter @@ -146,7 +158,7 @@ public void whenHistoryIsPartialAndCacheIsEmptyThenExceptionIsThrown() throws Ex HistoryUtils.generateDecisionTaskWithPartialHistory(); try { - replayDeciderCache.getOrCreate(decisionTask, this::createFakeDecider); + replayDeciderCache.getOrCreate(decisionTask, () -> createFakeDecider(decisionTask)); } catch (IllegalArgumentException ex) { // Wait for reporter @@ -180,13 +192,17 @@ public void evictAnyWillInvalidateAnEntryRandomlyFromTheCache() throws Exception HistoryUtils.generateDecisionTaskWithInitialHistory(); // Act - replayDeciderCache.getOrCreate(decisionTask1, this::createFakeDecider); - replayDeciderCache.getOrCreate(decisionTask2, this::createFakeDecider); - replayDeciderCache.getOrCreate(decisionTask3, this::createFakeDecider); + Decider decider = + replayDeciderCache.getOrCreate(decisionTask1, () -> createFakeDecider(decisionTask1)); + replayDeciderCache.addToCache(decisionTask1, decider); + decider = replayDeciderCache.getOrCreate(decisionTask2, () -> createFakeDecider(decisionTask2)); + replayDeciderCache.addToCache(decisionTask2, decider); + decider = replayDeciderCache.getOrCreate(decisionTask3, () -> createFakeDecider(decisionTask3)); + replayDeciderCache.addToCache(decisionTask3, decider); assertEquals(3, replayDeciderCache.size()); - replayDeciderCache.evictAny(decisionTask3.workflowExecution.runId); + replayDeciderCache.evictAnyNotInProcessing(decisionTask3.workflowExecution.runId); // Assert assertEquals(2, replayDeciderCache.size()); @@ -205,11 +221,13 @@ public void evictAnyWillNotInvalidateItself() throws Exception { HistoryUtils.generateDecisionTaskWithInitialHistory(); // Act - replayDeciderCache.getOrCreate(decisionTask1, this::createFakeDecider); + Decider decider = + replayDeciderCache.getOrCreate(decisionTask1, () -> createFakeDecider(decisionTask1)); + replayDeciderCache.addToCache(decisionTask1, decider); assertEquals(1, replayDeciderCache.size()); - replayDeciderCache.evictAny(decisionTask1.workflowExecution.runId); + replayDeciderCache.evictAnyNotInProcessing(decisionTask1.workflowExecution.runId); // Assert assertEquals(1, replayDeciderCache.size()); @@ -221,14 +239,15 @@ private void assertCacheIsEmpty(DeciderCache cache, String runId) throws Excepti PollForDecisionTaskResponse decisionTask = new PollForDecisionTaskResponse() .setWorkflowExecution(new WorkflowExecution().setRunId(runId)); - cache.getOrCreate(decisionTask, this::doNotCreateFakeDecider); + cache.getOrCreate(decisionTask, () -> doNotCreateFakeDecider(decisionTask)); } catch (AssertionError e) { ex = e; } TestCase.assertNotNull(ex); } - private ReplayDecider doNotCreateFakeDecider(PollForDecisionTaskResponse response) { + private ReplayDecider doNotCreateFakeDecider( + @SuppressWarnings("unused") PollForDecisionTaskResponse response) { fail("should not be called"); return null; } diff --git a/src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java b/src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java index f6ccca8de..eab92a0d8 100644 --- a/src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java +++ b/src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java @@ -686,7 +686,8 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa Decider decider = new DetermisiticRunnerContainerDecider(d); PollForDecisionTaskResponse response = HistoryUtils.generateDecisionTaskWithInitialHistory(); - cache.getOrCreate(response, t -> decider); + cache.getOrCreate(response, () -> decider); + cache.addToCache(response, decider); d.runUntilAllBlocked(); assertEquals(2, threadPool.getActiveCount()); @@ -752,7 +753,8 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr Decider decider = new DetermisiticRunnerContainerDecider(d); PollForDecisionTaskResponse response = HistoryUtils.generateDecisionTaskWithInitialHistory(); - cache.getOrCreate(response, t -> decider); + cache.getOrCreate(response, () -> decider); + cache.addToCache(response, decider); d.runUntilAllBlocked(); assertEquals(2, threadPool.getActiveCount()); diff --git a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java index 0cadef3d1..1e1bc963f 100644 --- a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java +++ b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java @@ -162,6 +162,64 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedSignals() throws Exception wrapper.close(); } + @Test + public void workflowCacheEvictionDueToThreads() { + // Arrange + String taskListName = "workflowCacheEvictionDueToThreads"; + + StatsReporter reporter = mock(StatsReporter.class); + Scope scope = + new RootScopeBuilder() + .reporter(reporter) + .reportEvery(com.uber.m3.util.Duration.ofMillis(300)); + + TestEnvironmentWrapper wrapper = + new TestEnvironmentWrapper( + new Worker.FactoryOptions.Builder() + .setDisableStickyExecution(false) + .setMetricScope(scope) + .setMaxWorkflowThreadCount(10) + .setCacheMaximumSize(100) + .build()); + Worker.Factory factory = wrapper.getWorkerFactory(); + Worker worker = + factory.newWorker( + taskListName, + new WorkerOptions.Builder().setMaxConcurrentWorkflowExecutionSize(5).build()); + worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class); + worker.registerActivitiesImplementations(new ActivitiesImpl()); + factory.start(); + + WorkflowOptions workflowOptions = + new WorkflowOptions.Builder() + .setTaskList(taskListName) + .setExecutionStartToCloseTimeout(Duration.ofDays(30)) + .setTaskStartToCloseTimeout(Duration.ofSeconds(1)) + .build(); + + int count = 100; + ActivitiesWorkflow[] workflows = new ActivitiesWorkflow[count]; + WorkflowParams w = new WorkflowParams(); + w.CadenceSleep = Duration.ofSeconds(1); + w.ChainSequence = 2; + w.ConcurrentCount = 1; + w.PayloadSizeBytes = 10; + w.TaskListName = taskListName; + for (int i = 0; i < count; i++) { + ActivitiesWorkflow workflow = + wrapper.getWorkflowClient().newWorkflowStub(ActivitiesWorkflow.class, workflowOptions); + workflows[i] = workflow; + WorkflowClient.start(workflow::execute, w); + } + + for (int i = 0; i < count; i++) { + workflows[i].execute(w); + } + + // Finish Workflow + wrapper.close(); + } + @Test public void whenStickyIsEnabledThenTheWorkflowIsCachedActivities() throws Exception { // Arrange