From 3d119a115e707e80b4b14174edd7de5048b732e9 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 2 Jun 2017 15:48:54 +0200 Subject: [PATCH 1/2] [FLINK-6833] [task] Fail StreamTask only due to async exception if it is running In order to resolve a race condition between a properly terminated StreamTask which cleans up its resources (stopping asynchronous operations, etc.) and a cancelled asynchronous operation (e.g. asynchronous checkpointing operation), we check whether the StreamTask is still running before failing it externally. --- .../streaming/runtime/tasks/StreamTask.java | 7 +- .../tasks/StreamTaskTerminationTest.java | 280 ++++++++++++++++++ 2 files changed, 285 insertions(+), 2 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 4dd708f0c28a0..5fb5d2d27bf50 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -824,11 +824,14 @@ public ProcessingTimeService getProcessingTimeService() { * FAILED, and, if the invokable code is running, starts an asynchronous thread * that aborts that code. * - *

This method never blocks.

+ *

This method never blocks. */ @Override public void handleAsyncException(String message, Throwable exception) { - getEnvironment().failExternally(exception); + if (isRunning) { + // only fail if the task is still running + getEnvironment().failExternally(exception); + } } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java new file mode 100644 index 0000000000000..7fe45e582e14d --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.librarycache.FallbackLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.TaskInformation; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.taskmanager.CheckpointResponder; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.taskmanager.TaskManagerActions; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.net.URL; +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.FutureTask; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class StreamTaskTerminationTest extends TestLogger { + + public static final OneShotLatch runLatch = new OneShotLatch(); + public static final OneShotLatch checkpointingLatch = new OneShotLatch(); + private static final OneShotLatch cleanupLatch = new OneShotLatch(); + private static final OneShotLatch handleAsyncExceptionLatch = new OneShotLatch(); + + /** + * FLINK-6833 + * + * Tests that a finished stream task cannot be failed by an asynchronous checkpointing operation after + * the stream task has stopped running. + */ + @Test + public void testConcurrentAsyncCheckpointCannotFailFinishedStreamTask() throws Exception { + final Configuration taskConfiguration = new Configuration(); + final StreamConfig streamConfig = new StreamConfig(taskConfiguration); + final NoOpStreamOperator noOpStreamOperator = new NoOpStreamOperator<>(); + + final AbstractStateBackend blockingStateBackend = new BlockingStateBackend(); + + streamConfig.setStreamOperator(noOpStreamOperator); + streamConfig.setStateBackend(blockingStateBackend); + + final long checkpointId = 0L; + final long checkpointTimestamp = 0L; + + final JobInformation jobInformation = new JobInformation( + new JobID(), + "Test Job", + new SerializedValue<>(new ExecutionConfig()), + new Configuration(), + Collections.emptyList(), + Collections.emptyList()); + + final TaskInformation taskInformation = new TaskInformation( + new JobVertexID(), + "Test Task", + 1, + 1, + BlockingStreamTask.class.getName(), + taskConfiguration); + + final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo(); + + final NetworkEnvironment networkEnv = mock(NetworkEnvironment.class); + when(networkEnv.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mock(TaskKvStateRegistry.class)); + + final Task task = new Task( + jobInformation, + taskInformation, + new ExecutionAttemptID(), + new AllocationID(), + 0, + 0, + Collections.emptyList(), + Collections.emptyList(), + 0, + null, + new MemoryManager(32L * 1024L, 1), + new IOManagerAsync(), + networkEnv, + mock(BroadcastVariableManager.class), + mock(TaskManagerActions.class), + mock(InputSplitProvider.class), + mock(CheckpointResponder.class), + new FallbackLibraryCacheManager(), + mock(FileCache.class), + taskManagerRuntimeInfo, + new UnregisteredTaskMetricsGroup(), + mock(ResultPartitionConsumableNotifier.class), + mock(PartitionProducerStateChecker.class), + Executors.directExecutor()); + + Future taskRun = FlinkCompletableFuture.supplyAsync(new Callable() { + @Override + public Void call() throws Exception { + task.run(); + + return null; + } + }, TestingUtils.defaultExecutor()); + + // wait until the stream task started running + runLatch.await(); + + // trigger a checkpoint + task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, CheckpointOptions.forFullCheckpoint()); + + // wait until the task has completed execution + taskRun.get(); + + // check that no failure occurred + if (task.getFailureCause() != null) { + throw new Exception("Task failed", task.getFailureCause()); + } + + // check that we have entered the finished state + assertEquals(ExecutionState.FINISHED, task.getExecutionState()); + } + + public static class BlockingStreamTask> extends StreamTask { + + public BlockingStreamTask() { + } + + @Override + protected void init() throws Exception { + + } + + @Override + protected void run() throws Exception { + runLatch.trigger(); + // wait until we have started an asynchronous checkpoint + checkpointingLatch.await(); + } + + + + @Override + protected void cleanup() throws Exception { + // notify the asynchronous checkpoint operation that we have reached the cleanup stage --> the task + // has been stopped + cleanupLatch.trigger(); + + // wait until handle async exception has been called to proceed with the termination of the + // StreamTask + handleAsyncExceptionLatch.await(); + } + + @Override + protected void cancelTask() throws Exception {} + + @Override + public void handleAsyncException(String message, Throwable exception) { + super.handleAsyncException(message, exception); + + handleAsyncExceptionLatch.trigger(); + } + } + + public static class NoOpStreamOperator extends AbstractStreamOperator { + private static final long serialVersionUID = 4517845269225218312L; + } + + public static class BlockingStateBackend extends AbstractStateBackend { + + private static final long serialVersionUID = -5053068148933314100L; + + @Override + public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException { + return mock(CheckpointStreamFactory.class); + } + + @Override + public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, @Nullable String targetLocation) throws IOException { + return null; + } + + @Override + public AbstractKeyedStateBackend createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry) throws IOException { + return null; + } + + @Override + public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception { + OperatorStateBackend operatorStateBackend = mock(OperatorStateBackend.class); + when(operatorStateBackend.snapshot(anyLong(), anyLong(), any(CheckpointStreamFactory.class), any(CheckpointOptions.class))) + .thenReturn(new FutureTask<>(new BlockingCallable())); + + return operatorStateBackend; + } + } + + public static class BlockingCallable implements Callable { + + @Override + public OperatorStateHandle call() throws Exception { + // notify that we have started the asynchronous checkpointint operation + checkpointingLatch.trigger(); + // wait until we have reached the StreamTask#cleanup --> This will already cancel this FutureTask + cleanupLatch.await(); + + // now throw exception to fail the async checkpointing operation if it has not already been cancelled + // by the StreamTask in the meantime + throw new FlinkException("Checkpointing operation failed"); + } + } +} From 5acb52785eccfc7daa2b775f0c1175d842f45a45 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 2 Jun 2017 16:20:14 +0200 Subject: [PATCH 2/2] Fix checkstyle violations --- .../tasks/StreamTaskTerminationTest.java | 60 +++++++++++-------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 7fe45e582e14d..f021b389c811e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -66,9 +66,11 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; + import org.junit.Test; import javax.annotation.Nullable; + import java.io.IOException; import java.net.URL; import java.util.Collections; @@ -81,17 +83,20 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * Tests for the StreamTask termination. + */ public class StreamTaskTerminationTest extends TestLogger { - public static final OneShotLatch runLatch = new OneShotLatch(); - public static final OneShotLatch checkpointingLatch = new OneShotLatch(); - private static final OneShotLatch cleanupLatch = new OneShotLatch(); - private static final OneShotLatch handleAsyncExceptionLatch = new OneShotLatch(); + public static final OneShotLatch RUN_LATCH = new OneShotLatch(); + public static final OneShotLatch CHECKPOINTING_LATCH = new OneShotLatch(); + private static final OneShotLatch CLEANUP_LATCH = new OneShotLatch(); + private static final OneShotLatch HANDLE_ASYNC_EXCEPTION_LATCH = new OneShotLatch(); /** * FLINK-6833 * - * Tests that a finished stream task cannot be failed by an asynchronous checkpointing operation after + *

Tests that a finished stream task cannot be failed by an asynchronous checkpointing operation after * the stream task has stopped running. */ @Test @@ -165,7 +170,7 @@ public Void call() throws Exception { }, TestingUtils.defaultExecutor()); // wait until the stream task started running - runLatch.await(); + RUN_LATCH.await(); // trigger a checkpoint task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, CheckpointOptions.forFullCheckpoint()); @@ -182,6 +187,10 @@ public Void call() throws Exception { assertEquals(ExecutionState.FINISHED, task.getExecutionState()); } + /** + * Blocking stream task which waits on and triggers a set of one shot latches to establish a certain + * interleaving with a concurrently running checkpoint operation. + */ public static class BlockingStreamTask> extends StreamTask { public BlockingStreamTask() { @@ -194,40 +203,39 @@ protected void init() throws Exception { @Override protected void run() throws Exception { - runLatch.trigger(); + RUN_LATCH.trigger(); // wait until we have started an asynchronous checkpoint - checkpointingLatch.await(); + CHECKPOINTING_LATCH.await(); } - - @Override protected void cleanup() throws Exception { // notify the asynchronous checkpoint operation that we have reached the cleanup stage --> the task // has been stopped - cleanupLatch.trigger(); + CLEANUP_LATCH.trigger(); // wait until handle async exception has been called to proceed with the termination of the // StreamTask - handleAsyncExceptionLatch.await(); + HANDLE_ASYNC_EXCEPTION_LATCH.await(); } @Override - protected void cancelTask() throws Exception {} + protected void cancelTask() throws Exception { + } @Override public void handleAsyncException(String message, Throwable exception) { super.handleAsyncException(message, exception); - handleAsyncExceptionLatch.trigger(); + HANDLE_ASYNC_EXCEPTION_LATCH.trigger(); } } - public static class NoOpStreamOperator extends AbstractStreamOperator { + static class NoOpStreamOperator extends AbstractStreamOperator { private static final long serialVersionUID = 4517845269225218312L; } - public static class BlockingStateBackend extends AbstractStateBackend { + static class BlockingStateBackend extends AbstractStateBackend { private static final long serialVersionUID = -5053068148933314100L; @@ -243,13 +251,13 @@ public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String @Override public AbstractKeyedStateBackend createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException { + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry) throws IOException { return null; } @@ -263,14 +271,14 @@ public OperatorStateBackend createOperatorStateBackend(Environment env, String o } } - public static class BlockingCallable implements Callable { + static class BlockingCallable implements Callable { @Override public OperatorStateHandle call() throws Exception { // notify that we have started the asynchronous checkpointint operation - checkpointingLatch.trigger(); + CHECKPOINTING_LATCH.trigger(); // wait until we have reached the StreamTask#cleanup --> This will already cancel this FutureTask - cleanupLatch.await(); + CLEANUP_LATCH.await(); // now throw exception to fail the async checkpointing operation if it has not already been cancelled // by the StreamTask in the meantime