Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2510: Incorrect shutdown status due to race between runloop and process callback thread #1344

Merged
merged 6 commits into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions samza-core/src/main/java/org/apache/samza/container/RunLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,7 @@ public void run() {

long prevNs = clock.nanoTime();

while (!shutdownNow) {
if (throwable != null) {
log.error("Caught throwable and stopping run loop", throwable);
throw new SamzaException(throwable);
}

while (!shutdownNow && throwable == null) {
long startNs = clock.nanoTime();

IncomingMessageEnvelope envelope = chooseEnvelope();
Expand All @@ -185,6 +180,17 @@ public void run() {
containerMetrics.utilization().set(((double) activeNs) / totalNs);
}
}

/*
* The current semantics of external shutdown request (RunLoop.shutdown()) is loosely defined and run loop doesn't
* wait for inflight messages to complete and triggers shutdown as soon as it notices the shutdown request.
* Hence, it is possible that the exception may or may not propagated based on order of execution
* between process callback and run loop thread.
*/
if (throwable != null) {
mynameborat marked this conversation as resolved.
Show resolved Hide resolved
log.error("Caught throwable and stopping run loop", throwable);
throw new SamzaException(throwable);
}
} finally {
workerTimer.shutdown();
callbackExecutor.shutdown();
Expand Down Expand Up @@ -648,8 +654,10 @@ public void run() {
@Override
public void onFailure(TaskCallback callback, Throwable t) {
try {
state.doneProcess();
// set the exception code ahead of marking the message as processed to make sure the exception
// is visible to the run loop thread promptly. Refer SAMZA-2510 for more details.
abort(t);
state.doneProcess();
// update pending count, but not offset
TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
log.error("Got callback failure for task {}", callbackImpl.getTaskName(), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.samza.container;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.context.ContainerContext;
Expand Down Expand Up @@ -780,4 +782,28 @@ public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedExc

commitLatch.await();
}

@Test(expected = SamzaException.class)
public void testExceptionIsPropagated() {
SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
OffsetManager offsetManager = mock(OffsetManager.class);

TestTask task0 = new TestTask(false, false, false, null);
TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);

Map<TaskName, TaskInstance> tasks = ImmutableMap.of(taskName0, t0);

int maxMessagesInFlight = 2;
RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
() -> 0L, false);

when(consumerMultiplexer.choose(false))
.thenReturn(envelope0)
.thenReturn(ssp0EndOfStream)
mynameborat marked this conversation as resolved.
Show resolved Hide resolved
.thenReturn(null);

runLoop.run();
}
}