Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,11 @@ public void run() {
}
}
};
computationState.activateWork(workItem.getKey(), work);
if (!computationState.activateWork(workItem.getKey(), work)) {
// Free worker if the work was not activated.
// This can happen if it's duplicate work or some other reason.
sdkHarnessRegistry.completeWork(worker);
}
}

abstract static class Work implements Runnable {
Expand Down Expand Up @@ -1998,7 +2002,7 @@ public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue(
}

/** Mark the given key and work as active. */
public void activateWork(ByteString key, Work work) {
public boolean activateWork(ByteString key, Work work) {
synchronized (activeWork) {
Queue<Work> queue = activeWork.get(key);
if (queue == null) {
Expand All @@ -2008,12 +2012,16 @@ public void activateWork(ByteString key, Work work) {
// Fall through to execute without the lock held.
} else {
if (queue.peek().getWorkItem().getWorkToken() != work.getWorkItem().getWorkToken()) {
// Queue the work for later processing.
queue.add(work);
return true;
}
return;
// Skip the work if duplicate
return false;
}
}
executor.execute(work);
return true;
}

/** Marks the work for a the given key as complete. Schedules queued work for the key if any. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2093,22 +2093,22 @@ public void testActiveWork() throws Exception {
ByteString key2 = ByteString.copyFromUtf8("key2");

MockWork m1 = new MockWork(1);
computationState.activateWork(key1, m1);
assertTrue(computationState.activateWork(key1, m1));
Mockito.verify(mockExecutor).execute(m1);
computationState.completeWork(key1, 1);
Mockito.verifyNoMoreInteractions(mockExecutor);

// Verify work queues.
MockWork m2 = new MockWork(2);
computationState.activateWork(key1, m2);
assertTrue(computationState.activateWork(key1, m2));
Mockito.verify(mockExecutor).execute(m2);
MockWork m3 = new MockWork(3);
computationState.activateWork(key1, m3);
assertTrue(computationState.activateWork(key1, m3));
Mockito.verifyNoMoreInteractions(mockExecutor);

// Verify another key is a separate queue.
MockWork m4 = new MockWork(4);
computationState.activateWork(key2, m4);
assertTrue(computationState.activateWork(key2, m4));
Mockito.verify(mockExecutor).execute(m4);
computationState.completeWork(key2, 4);
Mockito.verifyNoMoreInteractions(mockExecutor);
Expand All @@ -2118,9 +2118,12 @@ public void testActiveWork() throws Exception {
computationState.completeWork(key1, 3);
Mockito.verifyNoMoreInteractions(mockExecutor);

// Verify duplicate work dropped.
MockWork m5 = new MockWork(5);
computationState.activateWork(key1, m5);
Mockito.verify(mockExecutor).execute(m5);
assertFalse(computationState.activateWork(key1, m5));
Mockito.verifyNoMoreInteractions(mockExecutor);
computationState.completeWork(key1, 5);
Mockito.verifyNoMoreInteractions(mockExecutor);
}
Expand Down