From beed1d499b8c876330fac73da324235dd69a4e91 Mon Sep 17 00:00:00 2001
From: Stephan Ewen
Date: Fri, 14 Aug 2015 23:32:35 +0200
Subject: [PATCH 1/2] [FLINK-2462] [streaming] Major cleanup of operator
 structure for exception handling and code simplification

- The exceptions are no longer logged by the operators themselves.
  Operators perform only cleanup in reaction to exceptions.
  Exceptions are reported only to the root Task object, which knows whether
  this is the first failure-causing exception (root cause), or a subsequent
  exception, or whether the task was actually canceled already. In the
  latter case, exceptions are ignored, because many cancellations lead to
  meaningless exceptions.
- more exceptions in signatures, less wrapping where not needed
- Core resource acquisition/release logic is in one streaming task,
  reducing code duplication
- Guaranteed cleanup of output buffer and input buffer resources
  (formerly missed when other exceptions were encountered)
- Fix mixup in instantiation of source contexts in the stream source task
- Auto watermark generators correctly shut down their interval scheduler
- Improve use of generics, got rid of many raw types

This closes #1017
---
 .../io/network/api/writer/RecordWriter.java    |  16 +-
 .../jobgraph/tasks/AbstractInvokable.java      |   2 +-
 .../tasks/CheckpointNotificationOperator.java  |  14 +-
 .../runtime/operators/RegularPactTask.java     |  27 +-
 .../operators/testutils/MockEnvironment.java   |   9 +-
 .../operators/testutils/TaskTestBase.java      |  39 ++-
 .../flink/streaming/api/graph/StreamEdge.java  |   8 +-
 .../operators/AbstractUdfStreamOperator.java   |   4 +-
 .../flink/streaming/api/operators/Output.java  |   7 +-
 .../streaming/api/operators/StreamSource.java  |  41 +--
 .../runtime/io/BlockingQueueBroker.java        |  22 +-
 .../runtime/io/RecordWriterFactory.java        |  52 ---
 .../runtime/io/RecordWriterOutput.java         |  64 ++--
 .../runtime/io/StreamInputProcessor.java       |  21 +-
 .../runtime/io/StreamRecordWriter.java         |  28 +-
 .../runtime/io/StreamTwoInputProcessor.java    |   9 +-
 .../streaming/runtime/io/StreamingReader.java  |   7 +-
 .../runtime/tasks/OneInputStreamTask.java      |  99 ++----
 .../runtime/tasks/OutputHandler.java           |  74 +++--
 .../runtime/tasks/SourceStreamTask.java        |  74 ++---
 .../runtime/tasks/StreamIterationHead.java     | 133 ++++----
 .../runtime/tasks/StreamIterationTail.java     |  79 ++---
 .../streaming/runtime/tasks/StreamTask.java    | 298 ++++++++++++------
 .../tasks/StreamingRuntimeContext.java         |  44 +--
 .../runtime/tasks/TwoInputStreamTask.java      | 132 +++-----
 .../api/state/StatefulOperatorTest.java        |  21 +-
 .../runtime/io/StreamRecordWriterTest.java     |   7 +-
 .../runtime/tasks/OneInputStreamTaskTest.java  |   3 -
 .../tasks/OneInputStreamTaskTestHarness.java   |   1 -
 .../runtime/tasks/StreamTaskTestHarness.java   |  56 ++--
 .../flink/streaming/util/MockContext.java      |   8 +-
 .../OneInputStreamOperatorTestHarness.java     |   4 +-
 .../streaming/util/SourceFunctionUtil.java     |   9 +-
 .../StreamingMultipleProgramsTestBase.java     |   2 +-
 .../TwoInputStreamOperatorTestHarness.java     |   6 +-
 35 files changed, 692 insertions(+), 728 deletions(-)
 delete mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 2ae61ed60ed65..17a6a18993da3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -187,15 +187,13 @@ public void flush() throws IOException { } public void clearBuffers() { - if (serializers != null) { - for (RecordSerializer s : serializers) { - synchronized (s) { - Buffer b = s.getCurrentBuffer(); - s.clear(); - - if (b != null) { - b.recycle(); - } + for (RecordSerializer s : serializers) { + synchronized (s) { + Buffer b = s.getCurrentBuffer(); + s.clear(); + + if (b != null) { + b.recycle(); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index df41672dce3ff..0e0bd268a9944 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -49,7 +49,7 @@ public abstract class AbstractInvokable { /** * Must be overwritten by the concrete task to instantiate the required record reader and record writer. */ - public abstract void registerInputOutput(); + public abstract void registerInputOutput() throws Exception; /** * Must be overwritten by the concrete task. This method is called by the task manager diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java index 90c82b717cd27..0eb9e0789ffa6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java @@ -18,8 +18,18 @@ package org.apache.flink.runtime.jobgraph.tasks; - +/** + * This interface needs to be implemented by runtime tasks that want to be able to receive + * notifications about completed checkpoints. + */ public interface CheckpointNotificationOperator { - + + /** + * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received + * the notification from all participating tasks. + * + * @param checkpointId The ID of the checkpoint that is complete.. + * @throws Exception The notification method may forward its exceptions. + */ void notifyCheckpointComplete(long checkpointId) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index 873d9486925eb..3cefbba67a000 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -226,7 +226,7 @@ public class RegularPactTask extends AbstractInvokable i * and as a setup method on the TaskManager. */ @Override - public void registerInputOutput() { + public void registerInputOutput() throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(formatLogString("Start registering input and output.")); } @@ -239,26 +239,13 @@ public void registerInputOutput() { final Class> driverClass = this.config.getDriver(); this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class); - // initialize the readers. this is necessary for nephele to create the input gates - // however, this does not trigger any local processing. 
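// Aside for readers (illustrative sketch, not code from this patch): this RegularPactTask
// hunk implements the commit-message point "more exceptions in signatures, less wrapping
// where not needed". Setup methods now declare "throws Exception" instead of re-wrapping
// failures in RuntimeException, and the Task that calls them decides what an exception
// means. The caller-side pattern could look roughly like the following, where the class
// name and the "canceled"/"firstError" fields are assumed names, not from this patch:

class TaskSketch {
    private volatile boolean canceled;
    private volatile Throwable firstError;

    void runSetup(org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable invokable) {
        try {
            invokable.registerInputOutput();
        }
        catch (Throwable t) {
            if (canceled) {
                // exceptions after cancellation are meaningless; ignore them
            }
            else if (firstError == null) {
                firstError = t; // first failure-causing exception: the root cause
            }
        }
    }
}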
- try { - initInputReaders(); - initBroadcastInputReaders(); - } catch (Exception e) { - throw new RuntimeException("Initializing the input streams failed in Task " + getEnvironment().getTaskName() + - (e.getMessage() == null ? "." : ": " + e.getMessage()), e); - } + // initialize the readers. + // this does not yet trigger any stream consuming or processing. + initInputReaders(); + initBroadcastInputReaders(); - // initialize the writers. this is necessary for nephele to create the output gates. - // because in the presence of chained tasks, the tasks writers depend on the last task in the chain, - // we need to initialize the chained tasks as well. the chained tasks are only set up, but no work - // (such as setting up a sorter, etc.) starts - try { - initOutputs(); - } catch (Exception e) { - throw new RuntimeException("Initializing the output handlers failed" + - (e.getMessage() == null ? "." : ": " + e.getMessage()), e); - } + // initialize the writers. + initOutputs(); if (LOG.isDebugEnabled()) { LOG.debug(formatLogString("Finished registering input and output.")); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 51c7f93c6244b..5fc9bb9f081f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -63,6 +63,8 @@ public class MockEnvironment implements Environment { + private final String taskName; + private final MemoryManager memManager; private final IOManager ioManager; @@ -85,7 +87,8 @@ public class MockEnvironment implements Environment { private final int bufferSize; - public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { + public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { + this.taskName = taskName; this.jobConfiguration = new Configuration(); this.taskConfiguration = new Configuration(); this.inputs = new LinkedList(); @@ -214,12 +217,12 @@ public InputSplitProvider getInputSplitProvider() { @Override public String getTaskName() { - return null; + return taskName; } @Override public String getTaskNameWithSubtasks() { - return null; + return taskName + "(0/1)"; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java index 6ffc97b60d51e..c7be7a5b894ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java @@ -52,13 +52,11 @@ public abstract class TaskTestBase { public void initEnvironment(long memorySize, int bufferSize) { this.memorySize = memorySize; this.inputSplitProvider = new MockInputSplitProvider(); - this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize); + this.mockEnv = new MockEnvironment("mock task", this.memorySize, this.inputSplitProvider, bufferSize); } public IteratorWrappingTestSingleInputGate addInput(MutableObjectIterator input, int groupId) { - final IteratorWrappingTestSingleInputGate reader = addInput(input, groupId, true); - - return reader; + return addInput(input, groupId, true); } public 
IteratorWrappingTestSingleInputGate addInput(MutableObjectIterator input, int groupId, boolean read) { @@ -89,19 +87,32 @@ public Configuration getConfiguration() { return this.mockEnv.getTaskConfiguration(); } - public void registerTask(AbstractInvokable task, @SuppressWarnings("rawtypes") Class driver, Class stubClass) { + public void registerTask(AbstractInvokable task, + @SuppressWarnings("rawtypes") Class driver, + Class stubClass) { + final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration()); config.setDriver(driver); config.setStubWrapper(new UserCodeClassWrapper(stubClass)); task.setEnvironment(this.mockEnv); - task.registerInputOutput(); + try { + task.registerInputOutput(); + } + catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } } public void registerTask(AbstractInvokable task) { task.setEnvironment(this.mockEnv); - task.registerInputOutput(); + try { + task.registerInputOutput(); + } + catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } } public void registerFileOutputTask(AbstractInvokable outTask, Class stubClass, String outPath) { @@ -118,7 +129,12 @@ public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat o outTask.setEnvironment(this.mockEnv); - outTask.registerInputOutput(); + try { + outTask.registerInputOutput(); + } + catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } } public void registerFileInputTask(AbstractInvokable inTask, @@ -142,7 +158,12 @@ public void registerFileInputTask(AbstractInvokable inTask, inTask.setEnvironment(this.mockEnv); - inTask.registerInputOutput(); + try { + inTask.registerInputOutput(); + } + catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } } public MemoryManager getMemoryManager() { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java index 47d97df6500d1..c252870c16d3a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java @@ -45,7 +45,7 @@ public class StreamEdge implements Serializable { * A list of output names that the target vertex listens to (if there is * output selection). 
*/ - final private List selectedNames; + private final List selectedNames; private StreamPartitioner outputPartitioner; public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber, @@ -108,11 +108,7 @@ public boolean equals(Object o) { StreamEdge that = (StreamEdge) o; - if (!edgeId.equals(that.edgeId)) { - return false; - } - - return true; + return edgeId.equals(that.edgeId); } @Override diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 585b4cece839d..438d529413888 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -98,7 +98,7 @@ public void restoreInitialState(Tuple2, Map, Map> getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception { // Get all the states for the operator - Map operatorStates = runtimeContext.getOperatorStates(); + Map> operatorStates = runtimeContext.getOperatorStates(); Map operatorStateSnapshots; if (operatorStates.isEmpty()) { @@ -108,7 +108,7 @@ public Tuple2, Map> getSt // Checkpoint the states and store the handles in a map Map snapshots = new HashMap(); - for (Entry state : operatorStates.entrySet()) { + for (Entry> state : operatorStates.entrySet()) { boolean isPartitioned = state.getValue() instanceof PartitionedStreamOperatorState; snapshots.put(state.getKey(), new OperatorStateHandle(state.getValue().snapshotState(checkpointId, timestamp), diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java index 89d5560691b53..b684326044827 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java @@ -25,7 +25,7 @@ * of this interface that can be used to emit elements and other messages, such as barriers * and watermarks, from an operator. * - * @param The type of the elments that can be emitted. + * @param The type of the elements that can be emitted. */ public interface Output extends Collector { @@ -33,9 +33,8 @@ public interface Output extends Collector { * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream * operators. * - *
- * A watermark specifies that no element with a timestamp older or equal to the watermark
- * timestamp will be emitted in the future.
+ * <p>A watermark specifies that no element with a timestamp older or equal to the watermark
+ * timestamp will be emitted in the future.</p>
*/ void emitWatermark(Watermark mark); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index 0cc46f549b236..7fad2957167dd 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -41,24 +41,23 @@ public StreamSource(SourceFunction sourceFunction) { this.chainingStrategy = ChainingStrategy.HEAD; } - public void run(final Object lockingObject, final Output> collector) throws Exception { + public void run(Object lockingObject, Output> collector) throws Exception { - SourceFunction.SourceContext ctx = null; + SourceFunction.SourceContext ctx; if (userFunction instanceof EventTimeSourceFunction) { ctx = new ManualWatermarkContext(lockingObject, collector); } else if (executionConfig.getAutoWatermarkInterval() > 0) { ctx = new AutomaticWatermarkContext(lockingObject, collector, executionConfig); } else if (executionConfig.areTimestampsEnabled()) { - ctx = new NonTimestampContext(lockingObject, collector); - } else { ctx = new NonWatermarkContext(lockingObject, collector); + } else { + ctx = new NonTimestampContext(lockingObject, collector); } userFunction.run(ctx); } public void cancel() { - userFunction.cancel(); } @@ -69,10 +68,9 @@ public void cancel() { */ public static class NonTimestampContext implements SourceFunction.SourceContext { - private final Object lockingObject; private final Output> output; - StreamRecord reuse; + private final StreamRecord reuse; public NonTimestampContext(Object lockingObjectParam, Output> outputParam) { this.lockingObject = lockingObjectParam; @@ -105,8 +103,7 @@ public Object getCheckpointLock() { } @Override - public void close() { - } + public void close() {} } /** @@ -114,10 +111,9 @@ public void close() { */ public static class NonWatermarkContext implements SourceFunction.SourceContext { - private final Object lockingObject; private final Output> output; - StreamRecord reuse; + private final StreamRecord reuse; public NonWatermarkContext(Object lockingObjectParam, Output> outputParam) { this.lockingObject = lockingObjectParam; @@ -151,8 +147,7 @@ public Object getCheckpointLock() { } @Override - public void close() { - } + public void close() {} } /** @@ -161,12 +156,13 @@ public void close() { */ public static class AutomaticWatermarkContext implements SourceFunction.SourceContext { - private transient ScheduledFuture watermarkTimer = null; + private final ScheduledExecutorService scheduleExecutor; + private final ScheduledFuture watermarkTimer; private final long watermarkInterval; private final Object lockingObject; private final Output> output; - StreamRecord reuse; + private final StreamRecord reuse; private volatile long lastWatermarkTime; @@ -179,9 +175,9 @@ public AutomaticWatermarkContext(Object lockingObjectParam, watermarkInterval = executionConfig.getAutoWatermarkInterval(); - ScheduledExecutorService service = Executors.newScheduledThreadPool(2); + scheduleExecutor = Executors.newScheduledThreadPool(1); - watermarkTimer = service.scheduleAtFixedRate(new Runnable() { + watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { long currentTime = System.currentTimeMillis(); @@ 
-237,9 +233,8 @@ public Object getCheckpointLock() { @Override public void close() { - if (watermarkTimer != null && !watermarkTimer.isDone()) { - watermarkTimer.cancel(true); - } + watermarkTimer.cancel(true); + scheduleExecutor.shutdownNow(); } } @@ -251,7 +246,7 @@ public static class ManualWatermarkContext implements SourceFunction.SourceCo private final Object lockingObject; private final Output> output; - StreamRecord reuse; + private final StreamRecord reuse; public ManualWatermarkContext(Object lockingObject, Output> output) { this.lockingObject = lockingObject; @@ -283,8 +278,6 @@ public Object getCheckpointLock() { } @Override - public void close() { - - } + public void close() {} } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java index 9bf4eb488d908..be3c9afab01a3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java @@ -20,22 +20,12 @@ import java.util.concurrent.BlockingQueue; import org.apache.flink.runtime.iterative.concurrent.Broker; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -@SuppressWarnings("rawtypes") -public class BlockingQueueBroker extends Broker> { - /** - * Singleton instance - */ - private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker(); +public class BlockingQueueBroker extends Broker> { + + /** Singleton instance */ + public static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker(); - private BlockingQueueBroker() { - } - - /** - * retrieve singleton instance - */ - public static Broker> instance() { - return INSTANCE; - } + /** Cannot instantiate */ + private BlockingQueueBroker() {} } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java deleted file mode 100644 index 3a7ba3edd91b8..0000000000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.runtime.io; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.api.writer.ChannelSelector; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class RecordWriterFactory { - private static final Logger LOG = LoggerFactory.getLogger(RecordWriterFactory.class); - - public static RecordWriter createRecordWriter(ResultPartitionWriter bufferWriter, ChannelSelector channelSelector, long bufferTimeout) { - - RecordWriter output; - - if (bufferTimeout >= 0) { - output = new StreamRecordWriter(bufferWriter, channelSelector, bufferTimeout); - - if (LOG.isTraceEnabled()) { - LOG.trace("StreamRecordWriter initiated with {} bufferTimeout.", bufferTimeout); - } - } else { - output = new RecordWriter(bufferWriter, channelSelector); - - if (LOG.isTraceEnabled()) { - LOG.trace("RecordWriter initiated."); - } - } - - return output; - - } - -} diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index f0f18b1a2dc26..7048464669445 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -19,7 +19,6 @@ import java.io.IOException; -import com.google.common.base.Preconditions; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; @@ -29,35 +28,38 @@ import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static com.google.common.base.Preconditions.checkNotNull; /** * Implementation of {@link Output} that sends data using a {@link RecordWriter}. 
*/ public class RecordWriterOutput implements Output> { - private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class); - - private RecordWriter> recordWriter; + private StreamRecordWriter> recordWriter; private SerializationDelegate serializationDelegate; + @SuppressWarnings("unchecked") public RecordWriterOutput( - RecordWriter recordWriter, + StreamRecordWriter>> recordWriter, TypeSerializer outSerializer, boolean enableWatermarkMultiplexing) { - - Preconditions.checkNotNull(recordWriter); - this.recordWriter = (RecordWriter>) recordWriter; + checkNotNull(recordWriter); + + // generic hack: cast the writer to generic Object type so we can use it + // with multiplexed records and watermarks + this.recordWriter = (StreamRecordWriter>) + (StreamRecordWriter) recordWriter; TypeSerializer outRecordSerializer; if (enableWatermarkMultiplexing) { outRecordSerializer = new MultiplexingStreamRecordSerializer(outSerializer); } else { - outRecordSerializer = (TypeSerializer) (TypeSerializer) new StreamRecordSerializer(outSerializer); + outRecordSerializer = (TypeSerializer) + (TypeSerializer) new StreamRecordSerializer(outSerializer); } if (outSerializer != null) { @@ -71,11 +73,9 @@ public void collect(StreamRecord record) { try { recordWriter.emit(serializationDelegate); - } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Emit failed: {}", e); - } - throw new RuntimeException("Element emission failed.", e); + } + catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); } } @@ -85,33 +85,27 @@ public void emitWatermark(Watermark mark) { try { recordWriter.broadcastEmit(serializationDelegate); - } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Watermark emit failed: {}", e); - } - throw new RuntimeException(e); + } + catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); } } + public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException { + recordWriter.broadcastEvent(barrier); + } + + + public void flush() throws IOException { + recordWriter.flush(); + } + @Override public void close() { - try { - if (recordWriter instanceof StreamRecordWriter) { - ((StreamRecordWriter) recordWriter).close(); - } else { - recordWriter.flush(); - } - } - catch (IOException e) { - throw new RuntimeException("Failed to flush final output", e); - } + recordWriter.close(); } public void clearBuffers() { recordWriter.clearBuffers(); } - - public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException { - recordWriter.broadcastEvent(barrier); - } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 4ad5b45985304..de021ff4f31bf 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.reader.AbstractReader; -import org.apache.flink.runtime.io.network.api.reader.ReaderBase; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; @@ -44,9 +43,6 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}. * @@ -55,11 +51,8 @@ * * @param The type of the record that can be read with this record reader. */ -public class StreamInputProcessor extends AbstractReader implements ReaderBase, StreamingReader { - - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class); - +public class StreamInputProcessor extends AbstractReader implements StreamingReader { + private final RecordDeserializer>[] recordDeserializers; private RecordDeserializer> currentRecordDeserializer; @@ -203,17 +196,17 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { } } - public void clearBuffers() { + @Override + public void cleanup() throws IOException { + // clear the buffers first. this part should not ever fail for (RecordDeserializer deserializer : recordDeserializers) { Buffer buffer = deserializer.getCurrentBuffer(); if (buffer != null && !buffer.isRecycled()) { buffer.recycle(); } } - } - - @Override - public void cleanup() throws IOException { + + // cleanup the barrier handler resources barrierHandler.cleanup(); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java index 321f3b4ec1fd8..8dcaad8a03be5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java @@ -58,15 +58,18 @@ public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector chann super(writer, channelSelector); - checkArgument(timeout >= 0); + checkArgument(timeout >= -1); - if (timeout == 0) { + if (timeout == -1) { + flushAlways = false; + outputFlusher = null; + } + else if (timeout == 0) { flushAlways = true; outputFlusher = null; } else { flushAlways = false; - String threadName = taskName == null ? DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName; @@ -94,34 +97,27 @@ public void broadcastEmit(T record) throws IOException, InterruptedException { } /** - * Closes the writer. This stops the flushing thread (if there is one) and flushes all pending outputs. - * - * @throws IOException I/O errors may happen during the final flush of the buffers. + * Closes the writer. This stops the flushing thread (if there is one). 
*/ - public void close() throws IOException { - // propagate exceptions - flush(); - + public void close() { + // make sure we terminate the thread in any case if (outputFlusher != null) { + outputFlusher.terminate(); try { - outputFlusher.terminate(); outputFlusher.join(); } catch (InterruptedException e) { // ignore on close } } - - // final check for asynchronous errors, before we exit with a green light - checkErroneous(); } /** - * Notifies the writer that teh output flusher thread encountered an exception. + * Notifies the writer that the output flusher thread encountered an exception. * * @param t The exception to report. */ - void notifyFlusherException(Throwable t) { + private void notifyFlusherException(Throwable t) { if (this.flusherException == null) { this.flusherException = t; } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index e3d29116d4bb8..e0af72915229f 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -261,7 +261,6 @@ private void handleWatermark(TwoInputStreamOperator operator, Water } } } - } @Override @@ -271,17 +270,17 @@ public void setReporter(AccumulatorRegistry.Reporter reporter) { } } - public void clearBuffers() { + @Override + public void cleanup() throws IOException { + // clear the buffers first. this part should not ever fail for (RecordDeserializer deserializer : recordDeserializers) { Buffer buffer = deserializer.getCurrentBuffer(); if (buffer != null && !buffer.isRecycled()) { buffer.recycle(); } } - } - @Override - public void cleanup() throws IOException { + // cleanup the barrier handler resources barrierHandler.cleanup(); } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java index 9eb9337689693..793e87e1607ce 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java @@ -18,10 +18,11 @@ package org.apache.flink.streaming.runtime.io; -import java.io.IOException; +import org.apache.flink.runtime.io.network.api.reader.ReaderBase; -public interface StreamingReader { +import java.io.IOException; - public void cleanup() throws IOException; +public interface StreamingReader extends ReaderBase { + void cleanup() throws IOException; } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 6136f247cbcb9..8ef02b2228957 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -23,87 +23,46 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.io.StreamInputProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class OneInputStreamTask extends StreamTask> { - private static final Logger LOG = LoggerFactory.getLogger(OneInputStreamTask.class); - private StreamInputProcessor inputProcessor; - - @Override - public void registerInputOutput() { - try { - super.registerInputOutput(); - - TypeSerializer inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader()); - int numberOfInputs = configuration.getNumberOfInputs(); - if (numberOfInputs > 0) { - InputGate[] inputGates = getEnvironment().getAllInputGates(); - inputProcessor = new StreamInputProcessor(inputGates, inSerializer, - getCheckpointBarrierListener(), - configuration.getCheckpointMode(), - getEnvironment().getIOManager(), - getExecutionConfig().areTimestampsEnabled()); - - // make sure that stream tasks report their I/O statistics - AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); - AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter(); - inputProcessor.setReporter(reporter); - } - } - catch (Exception e) { - throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e); - } - } + private volatile boolean running = true; + @Override - public void invoke() throws Exception { - boolean operatorOpen = false; - - if (LOG.isDebugEnabled()) { - LOG.debug("Task {} invoked", getName()); - } - - try { - openOperator(); - operatorOpen = true; + public void init() throws Exception { + TypeSerializer inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader()); + int numberOfInputs = configuration.getNumberOfInputs(); - while (inputProcessor.processInput(streamOperator)) { - // nothing to do, just keep processing - } + if (numberOfInputs > 0) { + InputGate[] inputGates = getEnvironment().getAllInputGates(); + inputProcessor = new StreamInputProcessor(inputGates, inSerializer, + getCheckpointBarrierListener(), + configuration.getCheckpointMode(), + getEnvironment().getIOManager(), + getExecutionConfig().areTimestampsEnabled()); - closeOperator(); - operatorOpen = false; + // make sure that stream tasks report their I/O statistics + AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); + AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter(); + inputProcessor.setReporter(reporter); + } + } - if (LOG.isDebugEnabled()) { - LOG.debug("Task {} invocation finished", getName()); - } + @Override + protected void run() throws Exception { + while (running && inputProcessor.processInput(streamOperator)); + } - } - catch (Exception e) { - LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e); - - if (operatorOpen) { - try { - closeOperator(); - } catch (Throwable t) { - LOG.warn("Exception while closing operator.", t); - } - } - - throw e; - } - finally { - this.isRunning = false; - // Cleanup - inputProcessor.clearBuffers(); - inputProcessor.cleanup(); - outputHandler.flushOutputs(); - clearBuffers(); - } + @Override + protected void cleanup() throws Exception { + inputProcessor.cleanup(); + } + @Override + protected void cancelTask() { + running = false; } } diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java index c8fa9e3bcd5fa..ce659fc733574 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java @@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; -import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.api.watermark.Watermark; @@ -40,7 +39,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.runtime.io.RecordWriterFactory; +import org.apache.flink.streaming.runtime.io.StreamRecordWriter; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -218,37 +217,59 @@ private RecordWriterOutput createStreamOutput(StreamEdge edge, Integer ou TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut1(vertex.userClassLoader); - @SuppressWarnings("unchecked") StreamPartitioner outputPartitioner = (StreamPartitioner) edge.getPartitioner(); ResultPartitionWriter bufferWriter = vertex.getEnvironment().getWriter(outputIndex); - RecordWriter>> output = - RecordWriterFactory.createRecordWriter(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout()); + StreamRecordWriter>> output = + new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout()); output.setReporter(reporter); + + RecordWriterOutput streamOutput = + new RecordWriterOutput(output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled()); - @SuppressWarnings("unchecked") - RecordWriterOutput streamOutput = new RecordWriterOutput(output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled()); - - if (LOG.isTraceEnabled()) { - LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass() + if (LOG.isDebugEnabled()) { + LOG.debug("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass() .getSimpleName(), outputIndex, vertex.getClass().getSimpleName()); } return streamOutput; } - public void flushOutputs() throws IOException, InterruptedException { + /** + * + * This method should be called before finishing the record emission, to make sure any data + * that is still buffered will be sent. It also ensures that all data sending related + * exceptions are recognized. + * + * @throws IOException Thrown, if the buffered data cannot be pushed into the output streams. 
+ */ + public void flushOutputs() throws IOException { for (RecordWriterOutput streamOutput : getOutputs()) { - streamOutput.close(); + streamOutput.flush(); } } - public void clearWriters() { - for (RecordWriterOutput output : outputMap.values()) { - output.clearBuffers(); + /** + * This method releases all resources of the record writer output. It stops the output + * flushing thread (if there is one) and releases all buffers currently held by the output + * serializers. + * + * This method should never fail. + */ + public void releaseOutputs() { + try { + for (RecordWriterOutput streamOutput : getOutputs()) { + streamOutput.close(); + } + } + finally { + // make sure that we release the buffers in any case + for (RecordWriterOutput output : getOutputs()) { + output.clearBuffers(); + } } } @@ -265,11 +286,9 @@ public void collect(StreamRecord record) { try { operator.getRuntimeContext().setNextInput(record); operator.processElement(record); - } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Could not forward element to operator.", e); - } - throw new RuntimeException(e); + } + catch (Exception e) { + throw new RuntimeException("Could not forward element to next operator", e); } } @@ -279,10 +298,7 @@ public void emitWatermark(Watermark mark) { operator.processWatermark(mark); } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Could not forward element to operator: {}", e); - } - throw new RuntimeException(e); + throw new RuntimeException("Could not forward watermark to next operator", e); } } @@ -292,10 +308,7 @@ public void close() { operator.close(); } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Could not forward close call to operator.", e); - } - throw new RuntimeException(e); + throw new RuntimeException("Could not close() call to next operator", e); } } } @@ -316,10 +329,7 @@ public void collect(StreamRecord record) { operator.processElement(serializer.copy(record)); } catch (Exception e) { - if (LOG.isErrorEnabled()) { - LOG.error("Could not forward element to operator.", e); - } - throw new RuntimeException(e); + throw new RuntimeException("Could not forward element to next operator", e); } } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 4b25577d26d96..e704e322b9d31 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -22,8 +22,6 @@ import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Task for executing streaming sources. 
@@ -39,61 +37,40 @@ */ public class SourceStreamTask extends StreamTask> { - private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTask.class); - @Override - public void invoke() throws Exception { - final SourceOutput> output = new SourceOutput>(outputHandler.getOutput(), checkpointLock); - - boolean operatorOpen = false; - - if (LOG.isDebugEnabled()) { - LOG.debug("Task {} invoked", getName()); - } - - try { - openOperator(); - operatorOpen = true; - - streamOperator.run(checkpointLock, output); - - closeOperator(); - operatorOpen = false; - - if (LOG.isDebugEnabled()) { - LOG.debug("Task {} invocation finished", getName()); - } - - } - catch (Exception e) { - LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e); - - if (operatorOpen) { - try { - closeOperator(); - } - catch (Throwable t) { - LOG.warn("Exception while closing operator.", t); - } - } - throw e; - } - finally { - this.isRunning = false; - // Cleanup - outputHandler.flushOutputs(); - clearBuffers(); - } + protected void init() { + // does not hold any resources, so no initialization needed + } + @Override + protected void cleanup() { + // does not hold any resources, so no cleanup needed } + @Override - public void cancel() { - super.cancel(); + protected void run() throws Exception { + final Object checkpointLock = getCheckpointLock(); + + final SourceOutput> output = + new SourceOutput>(outputHandler.getOutput(), checkpointLock); + + streamOperator.run(checkpointLock, output); + } + + @Override + protected void cancelTask() throws Exception { streamOperator.cancel(); } + // ------------------------------------------------------------------------ + + // TODO: + // does this help with anything? The losk should be already held by the source function that + // emits. If that one does not hold the lock, then this does not help either. 
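// Aside for readers (illustrative sketch, not code from this patch): the TODO above concerns
// the checkpoint lock contract. A source function is expected to hold the lock itself while
// emitting, so that emission cannot interleave with state snapshots; the synchronization in
// SourceOutput below is redundant for sources that follow that contract. Expected usage,
// with fetchNext() as a hypothetical helper:

public class LockRespectingSource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String next = fetchNext(); // hypothetical helper producing the next record
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next); // emit while holding the checkpoint lock
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String fetchNext() {
        return "record"; // stand-in so the sketch compiles
    }
}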
+ private static class SourceOutput implements Output { + private final Output output; private final Object lockObject; @@ -114,7 +91,6 @@ public void collect(T record) { synchronized (lockObject) { output.collect(record); } - } @Override diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index 2911f44963b3c..2ad2d2da2f1ae 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -18,13 +18,11 @@ package org.apache.flink.streaming.runtime.tasks; import java.util.Collection; -import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.runtime.accumulators.AccumulatorRegistry; +import org.apache.flink.api.common.JobID; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -36,76 +34,91 @@ public class StreamIterationHead extends OneInputStreamTask { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class); + private volatile boolean running = true; - @SuppressWarnings("rawtypes") - private BlockingQueue dataChannel; - private long iterationWaitTime; - private boolean shouldWait; - - @SuppressWarnings("rawtypes") - public StreamIterationHead() { - dataChannel = new ArrayBlockingQueue(1); - } - + // ------------------------------------------------------------------------ + @Override - public void registerInputOutput() { - super.registerInputOutput(); - - final AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); - Map> accumulatorMap = registry.getUserMap(); - - outputHandler = new OutputHandler(this, accumulatorMap, outputHandler.reporter); - - String iterationId = configuration.getIterationId(); - iterationWaitTime = configuration.getIterationWaitTime(); - shouldWait = iterationWaitTime > 0; - - try { - BlockingQueueBroker.instance().handIn(iterationId+"-" - +getEnvironment().getIndexInSubtaskGroup(), dataChannel); - } catch (Exception e) { - throw new RuntimeException(e); + protected void run() throws Exception { + + final String iterationId = configuration.getIterationId(); + if (iterationId == null || iterationId.length() == 0) { + throw new Exception("Missing iteration ID in the task configuration"); } + + final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId , + getEnvironment().getIndexInSubtaskGroup()); + + final long iterationWaitTime = configuration.getIterationWaitTime(); + final boolean shouldWait = iterationWaitTime > 0; - } + final BlockingQueue> dataChannel = new ArrayBlockingQueue>(1); - @SuppressWarnings("unchecked") - @Override - public void invoke() throws Exception { - if (LOG.isDebugEnabled()) { - LOG.debug("Iteration source {} invoked", getName()); - } - - Collection> outputs = outputHandler.getOutputs(); + // offer the queue for the tail + BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel); + LOG.info("Iteration head 
{} added feedback queue under {}", getName(), brokerID); + // do the work try { - StreamRecord nextRecord; - - while (true) { - if (shouldWait) { - nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS); - } else { - nextRecord = dataChannel.take(); + @SuppressWarnings("unchecked") + Collection> outputs = + (Collection>) (Collection) outputHandler.getOutputs(); + + while (running) { + StreamRecord nextRecord = shouldWait ? + dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) : + dataChannel.take(); + + if (nextRecord != null) { + for (RecordWriterOutput output : outputs) { + output.collect(nextRecord); + } } - if (nextRecord == null) { + else { + // done break; } - for (RecordWriterOutput output : outputs) { - ((RecordWriterOutput) output).collect(nextRecord); - } } - - } - catch (Exception e) { - LOG.error("Iteration Head " + getEnvironment().getTaskNameWithSubtasks() + " failed", e); - - throw e; } finally { - // Cleanup - isRunning = false; - outputHandler.flushOutputs(); - clearBuffers(); + // make sure that we remove the queue from the broker, to prevent a resource leak + BlockingQueueBroker.INSTANCE.remove(brokerID); + LOG.info("Iteration head {} removed feedback queue under {}", getName(), brokerID); } } + + @Override + protected void cancelTask() { + running = false; + } + + // ------------------------------------------------------------------------ + + @Override + public void init() { + // does not hold any resources, no initialization necessary + } + + @Override + protected void cleanup() throws Exception { + // does not hold any resources, no cleanup necessary + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * Creates the identification string with which head and tail task find the shared blocking + * queue for the back channel. The identification string is unique per parallel head/tail pair + * per iteration per job. + * + * @param jid The job ID. + * @param iterationID The id of the iteration in the job. + * @param subtaskIndex The parallel subtask number + * @return The identification string. 
+ */ + public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) { + return jid + "-" + iterationID + "-" + subtaskIndex; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index 9fbc3a73d3611..fdce52d0ff1bd 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -25,7 +25,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.io.BlockingQueueBroker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,57 +32,61 @@ public class StreamIterationTail extends OneInputStreamTask { private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class); - private String iterationId; - - @SuppressWarnings("rawtypes") - private BlockingQueue dataChannel; - private long iterationWaitTime; - private boolean shouldWait; + @Override + public void init() throws Exception { + super.init(); + + final String iterationId = configuration.getIterationId(); + if (iterationId == null || iterationId.length() == 0) { + throw new Exception("Missing iteration ID in the task configuration"); + } - public StreamIterationTail() { - } + final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId, + getEnvironment().getIndexInSubtaskGroup()); - @Override - public void registerInputOutput() { - super.registerInputOutput(); + final long iterationWaitTime = configuration.getIterationWaitTime(); - try { - iterationId = configuration.getIterationId(); - iterationWaitTime = configuration.getIterationWaitTime(); - shouldWait = iterationWaitTime > 0; - dataChannel = BlockingQueueBroker.instance().get(iterationId+"-" - +getEnvironment().getIndexInSubtaskGroup()); - } catch (Exception e) { - throw new StreamTaskException(String.format( - "Cannot register inputs of StreamIterationSink %s", iterationId), e); - } - this.streamOperator = new RecordPusher(); + LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID); + + @SuppressWarnings("unchecked") + BlockingQueue> dataChannel = + (BlockingQueue>) BlockingQueueBroker.INSTANCE.get(brokerID); + + LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID); + + this.streamOperator = new RecordPusher<>(dataChannel, iterationWaitTime); } - class RecordPusher extends AbstractStreamOperator implements OneInputStreamOperator { + private static class RecordPusher extends AbstractStreamOperator implements OneInputStreamOperator { + private static final long serialVersionUID = 1L; + @SuppressWarnings("NonSerializableFieldInSerializableClass") + private final BlockingQueue> dataChannel; + + private final long iterationWaitTime; + + private final boolean shouldWait; + + RecordPusher(BlockingQueue> dataChannel, long iterationWaitTime) { + this.dataChannel = dataChannel; + this.iterationWaitTime = iterationWaitTime; + this.shouldWait = iterationWaitTime > 0; + } + @Override public void processElement(StreamRecord record) throws 
Exception { - try { - if (shouldWait) { - dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS); - } else { - dataChannel.put(record); - } - } catch (InterruptedException e) { - if (LOG.isErrorEnabled()) { - LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId, - StringUtils.stringifyException(e)); - } - throw e; + if (shouldWait) { + dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS); + } + else { + dataChannel.put(record); } } @Override - public void processWatermark(Watermark mark) throws Exception { + public void processWatermark(Watermark mark) { // ignore } } - } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 88813d082990b..8a5f741ea7620 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.runtime.tasks; -import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -25,13 +24,13 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.functors.NotNullPredicate; + import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; -import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator; @@ -47,52 +46,96 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.state.OperatorStateHandle; import org.apache.flink.streaming.api.state.WrapperStateHandle; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * + *
+ * <pre>
+ *  -- registerInputOutput()
+ *         |
+ *         +----> Create basic utils (config, etc) and load operators
+ *         +----> operator specific init()
+ *
+ *  -- restoreState()
+ *
+ *  -- invoke()
+ *        |
+ *        +----> open operators()
+ *        +----> run()
+ *        +----> close operators()
+ *        +----> common cleanup
+ *        +----> operator specific cleanup()
+ * </pre>
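+ *
+ * A minimal sketch of how a concrete task plugs into these hooks (hypothetical
+ * example for illustration only; names such as processNextRecord() are not real API):
+ * {@code
+ * class MyStreamTask extends StreamTask<String, OneInputStreamOperator<String, String>> {
+ *
+ *     private volatile boolean running = true;
+ *
+ *     @Override
+ *     protected void init() {
+ *         // acquire input readers / output writers; called from registerInputOutput()
+ *     }
+ *
+ *     @Override
+ *     protected void run() throws Exception {
+ *         while (running && processNextRecord()) {
+ *             // keep processing until the input ends or the task is canceled
+ *         }
+ *     }
+ *
+ *     @Override
+ *     protected void cleanup() {
+ *         // release input-side resources; output resources are released by the base class
+ *     }
+ *
+ *     @Override
+ *     protected void cancelTask() {
+ *         running = false; // makes run() return; invoke() then performs the cleanup
+ *     }
+ * }
+ * }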
+ * + * @param + * @param + */ public abstract class StreamTask> extends AbstractInvokable implements OperatorStateCarrier>, CheckpointedOperator, CheckpointNotificationOperator { private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); - protected final Object checkpointLock = new Object(); + + private final Object checkpointLock = new Object(); + + private final EventListener checkpointBarrierListener; + + protected final List contexts; + protected StreamingRuntimeContext headContext; + protected StreamConfig configuration; + protected ClassLoader userClassLoader; + protected OutputHandler outputHandler; protected O streamOperator; protected boolean hasChainedOperators; - // needs to be initialized to true, so that early cancel() before invoke() behaves correctly - protected volatile boolean isRunning = true; - - protected List contexts; - - protected StreamingRuntimeContext headContext; - - protected ClassLoader userClassLoader; + /** Flag to mark the task "in operation", in which case check + * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */ + private volatile boolean isRunning; + + // ------------------------------------------------------------------------ - private EventListener checkpointBarrierListener; - public StreamTask() { - streamOperator = null; checkpointBarrierListener = new CheckpointBarrierListener(); contexts = new ArrayList(); } + // ------------------------------------------------------------------------ + // Life cycle methods for specific implementations + // ------------------------------------------------------------------------ + + protected abstract void init() throws Exception; + + protected abstract void run() throws Exception; + + protected abstract void cleanup() throws Exception; + + protected abstract void cancelTask() throws Exception; + + // ------------------------------------------------------------------------ + // Core work methods of the Stream Task + // ------------------------------------------------------------------------ + @Override - public void registerInputOutput() { - this.userClassLoader = getUserCodeClassLoader(); - this.configuration = new StreamConfig(getTaskConfiguration()); + public final void registerInputOutput() throws Exception { + LOG.debug("Begin initialization for {}", getName()); + + userClassLoader = getUserCodeClassLoader(); + configuration = new StreamConfig(getTaskConfiguration()); streamOperator = configuration.getStreamOperator(userClassLoader); // Create and register Accumulators - Environment env = getEnvironment(); - AccumulatorRegistry accumulatorRegistry = env.getAccumulatorRegistry(); + AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry(); Map> accumulatorMap = accumulatorRegistry.getUserMap(); AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter(); @@ -108,69 +151,76 @@ public void registerInputOutput() { } hasChainedOperators = outputHandler.getChainedOperators().size() != 1; + + // operator specific initialization + init(); + + LOG.debug("Finish initialization for {}", getName()); } + + @Override + public final void invoke() throws Exception { + LOG.debug("Invoking {}", getName()); + + boolean operatorOpen = false; + try { + openAllOperators(); + operatorOpen = true; + + // let the task do its work + isRunning = true; + run(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Finished task {}", getName()); + } - public String getName() { - return getEnvironment().getTaskName(); - } - - public 
StreamingRuntimeContext createRuntimeContext(StreamConfig conf, Map> accumulatorMap) { - Environment env = getEnvironment(); - String operatorName = conf.getStreamOperator(userClassLoader).getClass().getSimpleName(); - - KeySelector statePartitioner = conf.getStatePartitioner(userClassLoader); - - return new StreamingRuntimeContext(operatorName, env, getUserCodeClassLoader(), - getExecutionConfig(), statePartitioner, getStateHandleProvider(), accumulatorMap); - } - - private StateHandleProvider getStateHandleProvider() { - - StateHandleProvider provider = configuration - .getStateHandleProvider(userClassLoader); - - // If the user did not specify a provider in the program we try to get it from the config - if (provider == null) { - String backendName = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND, - ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase(); - - StateBackend backend; - + // this is part of the main logic, so if this fails, the task is considered failed + closeAllOperators(); + operatorOpen = false; + + // make sure all data is flushed + outputHandler.flushOutputs(); + } + finally { + this.isRunning = false; + try { - backend = StateBackend.valueOf(backendName); - } catch (Exception e) { - throw new RuntimeException(backendName + " is not a valid state backend.\nSupported backends: jobmanager, filesystem."); + if (operatorOpen) { + // we came here due to a failure + closeAllOperators(); + } } - - switch (backend) { - case JOBMANAGER: - LOG.info("State backend for state checkpoints is set to jobmanager."); - return new LocalStateHandle.LocalStateHandleProvider(); - case FILESYSTEM: - String checkpointDir = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null); - if (checkpointDir != null) { - LOG.info("State backend for state checkpoints is set to filesystem with directory: " - + checkpointDir); - return FileStateHandle.createProvider(checkpointDir); - } else { - throw new RuntimeException( - "For filesystem checkpointing, a checkpoint directory needs to be specified.\nFor example: \"state.backend.dir: hdfs://checkpoints\""); - } - default: - throw new RuntimeException("Backend " + backend + " is not supported yet."); + catch (Throwable t) { + LOG.error("Error closing stream operators after an exception.", t); + } + finally { + // we must! perform this cleanup + + // release the output resources + if (outputHandler != null) { + outputHandler.releaseOutputs(); + } - } else { - LOG.info("Using user defined state backend for streaming checkpoitns."); - return provider; + // release this operator's resources + try { + cleanup(); + } + catch (Throwable t) { + LOG.error("Error during cleanup of stream task.", t); + } + } } } - - private enum StateBackend { - JOBMANAGER, FILESYSTEM + + @Override + public final void cancel() throws Exception { + isRunning = false; + cancelTask(); } - - protected void openOperator() throws Exception { + + private void openAllOperators() throws Exception { for (StreamOperator operator : outputHandler.getChainedOperators()) { if (operator != null) { operator.open(getTaskConfiguration()); @@ -178,7 +228,7 @@ protected void openOperator() throws Exception { } } - protected void closeOperator() throws Exception { + private void closeAllOperators() throws Exception { // We need to close them first to last, since upstream operators in the chain might emit // elements in their close methods. 
for (int i = outputHandler.getChainedOperators().size()-1; i >= 0; i--) { @@ -189,19 +239,20 @@ protected void closeOperator() throws Exception { } } - protected void clearBuffers() throws IOException { - if (outputHandler != null) { - outputHandler.clearWriters(); - } - } + // ------------------------------------------------------------------------ + // Access to properties and utilities + // ------------------------------------------------------------------------ - @Override - public void cancel() { - this.isRunning = false; + /** + * Gets the name of the task, in the form "taskname (2/5)". + * @return The name of the task. + */ + public String getName() { + return getEnvironment().getTaskNameWithSubtasks(); } - public EventListener getCheckpointBarrierListener() { - return this.checkpointBarrierListener; + public Object getCheckpointLock() { + return checkpointLock; } // ------------------------------------------------------------------------ @@ -212,8 +263,9 @@ public EventListener getCheckpointBarrierListener() { @Override public void setInitialState(StateHandle stateHandle) throws Exception { - // We retrieve end restore the states for the chained oeprators. - List, Map>> chainedStates = (List, Map>>) stateHandle.getState(); + // We retrieve and restore the states for the chained operators. + List, Map>> chainedStates = + (List, Map>>) stateHandle.getState(); // We restore all stateful operators for (int i = 0; i < chainedStates.size(); i++) { @@ -224,7 +276,6 @@ public void setInitialState(StateHandle stateHandle) throws Except ((StatefulStreamOperator) chainedOperator).restoreInitialState(state); } } - } @Override @@ -235,10 +286,9 @@ public void triggerCheckpoint(long checkpointId, long timestamp) throws Exceptio synchronized (checkpointLock) { if (isRunning) { try { - - - // We wrap the states of the chained operators in a list, marking non-stateful oeprators with null - List, Map>> chainedStates = new ArrayList, Map>>(); + // We wrap the states of the chained operators in a list, marking non-stateful operators with null + List, Map>> chainedStates + = new ArrayList, Map>>(); // A wrapper handle is created for the List of state handles WrapperStateHandle stateHandle; @@ -278,35 +328,89 @@ public void triggerCheckpoint(long checkpointId, long timestamp) throws Exceptio } } } - } - - @SuppressWarnings("rawtypes") + @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { synchronized (checkpointLock) { - for (StreamOperator chainedOperator : outputHandler.getChainedOperators()) { if (chainedOperator instanceof StatefulStreamOperator) { - ((StatefulStreamOperator) chainedOperator).notifyCheckpointComplete(checkpointId); + ((StatefulStreamOperator) chainedOperator).notifyCheckpointComplete(checkpointId); } } + } + } + + // ------------------------------------------------------------------------ + // State backend + // ------------------------------------------------------------------------ + + private StateHandleProvider getStateHandleProvider() { + StateHandleProvider provider = configuration.getStateHandleProvider(userClassLoader); + + // If the user did not specify a provider in the program we try to get it from the config + if (provider == null) { + String backendName = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND, + ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase(); + + StateBackend backend; + + try { + backend = StateBackend.valueOf(backendName); + } catch (Exception e) { + throw new RuntimeException(backendName + " is not a valid 
state backend.\nSupported backends: jobmanager, filesystem."); + } + + switch (backend) { + case JOBMANAGER: + LOG.info("State backend for state checkpoints is set to jobmanager."); + return new LocalStateHandle.LocalStateHandleProvider(); + case FILESYSTEM: + String checkpointDir = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null); + if (checkpointDir != null) { + LOG.info("State backend for state checkpoints is set to filesystem with directory: " + + checkpointDir); + return FileStateHandle.createProvider(checkpointDir); + } else { + throw new RuntimeException( + "For filesystem checkpointing, a checkpoint directory needs to be specified.\nFor example: \"state.backend.dir: hdfs://checkpoints\""); + } + default: + throw new RuntimeException("Backend " + backend + " is not supported yet."); + } + } else { + LOG.info("Using user defined state backend for streaming checkpoints."); + return provider; } } + private enum StateBackend { + JOBMANAGER, FILESYSTEM + } // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ + public StreamingRuntimeContext createRuntimeContext(StreamConfig conf, Map> accumulatorMap) { + KeySelector statePartitioner = conf.getStatePartitioner(userClassLoader); + + return new StreamingRuntimeContext(getEnvironment(), getExecutionConfig(), + statePartitioner, getStateHandleProvider(), accumulatorMap); + } + @Override public String toString() { - return getEnvironment().getTaskNameWithSubtasks(); + return getName(); } // ------------------------------------------------------------------------ + public EventListener getCheckpointBarrierListener() { + return this.checkpointBarrierListener; + } + private class CheckpointBarrierListener implements EventListener { @Override diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java index 8c354bee4dcb4..2ca286260a935 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java @@ -45,28 +45,32 @@ * Implementation of the {@link RuntimeContext}, created by runtime stream UDF * operators. 
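 *
 * For example, a rich UDF can acquire named, checkpointable state through this
 * context (hypothetical usage; assumes a non-partitioned Integer state with
 * default value 0, and that OperatorState exposes value()/update()):
 * {@code
 * StreamingRuntimeContext ctx = (StreamingRuntimeContext) getRuntimeContext();
 * OperatorState<Integer> counter = ctx.getOperatorState("counter", 0, false);
 * counter.update(counter.value() + 1);
 * }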
*/ -@SuppressWarnings("rawtypes") public class StreamingRuntimeContext extends RuntimeUDFContext { private final Environment env; - private final Map states; - private final List partitionedStates; + private final Map> states; + private final List> partitionedStates; private final KeySelector statePartitioner; private final StateHandleProvider provider; - private final ClassLoader cl; - + + @SuppressWarnings("unchecked") - public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig, KeySelector statePartitioner, - StateHandleProvider provider, Map> accumulatorMap) { - super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader, - executionConfig, env.getDistributedCacheEntries(), accumulatorMap); + public StreamingRuntimeContext( + Environment env, + ExecutionConfig executionConfig, + KeySelector statePartitioner, + StateHandleProvider provider, + Map> accumulatorMap) { + + super(env.getTaskName(), env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), + env.getUserClassLoader(), executionConfig, + env.getDistributedCacheEntries(), accumulatorMap); + this.env = env; this.statePartitioner = statePartitioner; - this.states = new HashMap(); - this.partitionedStates = new LinkedList(); + this.states = new HashMap<>(); + this.partitionedStates = new LinkedList<>(); this.provider = (StateHandleProvider) provider; - this.cl = userCodeClassLoader; } /** @@ -121,14 +125,14 @@ public OperatorState getOperatorState(String name, S public StreamOperatorState getState(String name, boolean partitioned) { // Try fetching state from the map - StreamOperatorState state = states.get(name); + StreamOperatorState state = states.get(name); if (state == null) { // If not found, create empty state and add to the map state = createRawState(partitioned); states.put(name, state); // We keep a reference to all partitioned states for registering input if (state instanceof PartitionedStreamOperatorState) { - partitionedStates.add((PartitionedStreamOperatorState) state); + partitionedStates.add((PartitionedStreamOperatorState) state); } } return state; @@ -139,11 +143,11 @@ public OperatorState getOperatorState(String name, S * * @return An empty operator state. */ - @SuppressWarnings("unchecked") - public StreamOperatorState createRawState(boolean partitioned) { + @SuppressWarnings({"rawtypes", "unchecked"}) + public StreamOperatorState createRawState(boolean partitioned) { if (partitioned) { if (statePartitioner != null) { - return new PartitionedStreamOperatorState(provider, statePartitioner, cl); + return new PartitionedStreamOperatorState(provider, statePartitioner, getUserCodeClassLoader()); } else { throw new RuntimeException( "Partitioned state can only be used with KeyedDataStreams."); @@ -158,7 +162,7 @@ public StreamOperatorState createRawState(boolean partitioned) { * * @return All the states for the underlying operator. */ - public Map getOperatorStates() { + public Map> getOperatorStates() { return states; } @@ -169,7 +173,7 @@ public Map getOperatorStates() { * @param nextRecord * Next input of the operator. 
*/ - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public void setNextInput(StreamRecord nextRecord) { if (statePartitioner != null) { for (PartitionedStreamOperatorState state : partitionedStates) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 8cf5a405a885e..5d0497d91ca5a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -17,7 +17,6 @@ package org.apache.flink.streaming.runtime.tasks; -import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -27,114 +26,65 @@ import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TwoInputStreamTask extends StreamTask> { - private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class); - private StreamTwoInputProcessor inputProcessor; + + private volatile boolean running = true; @Override - public void registerInputOutput() { - try { - super.registerInputOutput(); + public void init() throws Exception { + TypeSerializer inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader); + TypeSerializer inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader); - TypeSerializer inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader); - TypeSerializer inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader); + int numberOfInputs = configuration.getNumberOfInputs(); - int numberOfInputs = configuration.getNumberOfInputs(); + ArrayList inputList1 = new ArrayList(); + ArrayList inputList2 = new ArrayList(); - ArrayList inputList1 = new ArrayList(); - ArrayList inputList2 = new ArrayList(); + List inEdges = configuration.getInPhysicalEdges(userClassLoader); - List inEdges = configuration.getInPhysicalEdges(userClassLoader); - - for (int i = 0; i < numberOfInputs; i++) { - int inputType = inEdges.get(i).getTypeNumber(); - InputGate reader = getEnvironment().getInputGate(i); - switch (inputType) { - case 1: - inputList1.add(reader); - break; - case 2: - inputList2.add(reader); - break; - default: - throw new RuntimeException("Invalid input type number: " + inputType); - } + for (int i = 0; i < numberOfInputs; i++) { + int inputType = inEdges.get(i).getTypeNumber(); + InputGate reader = getEnvironment().getInputGate(i); + switch (inputType) { + case 1: + inputList1.add(reader); + break; + case 2: + inputList2.add(reader); + break; + default: + throw new RuntimeException("Invalid input type number: " + inputType); } - - this.inputProcessor = new StreamTwoInputProcessor(inputList1, inputList2, - inputDeserializer1, inputDeserializer2, - getCheckpointBarrierListener(), - configuration.getCheckpointMode(), - getEnvironment().getIOManager(), - getExecutionConfig().areTimestampsEnabled()); - - // make sure that stream tasks report their I/O statistics - AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); - AccumulatorRegistry.Reporter reporter = 
registry.getReadWriteReporter(); - this.inputProcessor.setReporter(reporter); - } - catch (Exception e) { - throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e); } + + this.inputProcessor = new StreamTwoInputProcessor(inputList1, inputList2, + inputDeserializer1, inputDeserializer2, + getCheckpointBarrierListener(), + configuration.getCheckpointMode(), + getEnvironment().getIOManager(), + getExecutionConfig().areTimestampsEnabled()); + + // make sure that stream tasks report their I/O statistics + AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry(); + AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter(); + this.inputProcessor.setReporter(reporter); } @Override - public void invoke() throws Exception { - boolean operatorOpen = false; - - if (LOG.isDebugEnabled()) { - LOG.debug("Task {} invoked", getName()); - } - - try { - - openOperator(); - operatorOpen = true; - - while (inputProcessor.processInput(streamOperator)) { - // do nothing, just keep processing - } - - closeOperator(); - operatorOpen = false; - - if (LOG.isDebugEnabled()) { - LOG.debug("Task {} invocation finished", getName()); - } - - } - catch (Exception e) { - LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e); - - if (operatorOpen) { - try { - closeOperator(); - } - catch (Throwable t) { - LOG.warn("Exception while closing operator.", t); - } - } - - throw e; - } - finally { - this.isRunning = false; - // Cleanup - outputHandler.flushOutputs(); - clearBuffers(); - } - + protected void run() throws Exception { + while (running && inputProcessor.processInput(streamOperator)); } @Override - public void clearBuffers() throws IOException { - super.clearBuffers(); - inputProcessor.clearBuffers(); + protected void cleanup() throws Exception { inputProcessor.cleanup(); } + + @Override + protected void cancelTask() { + running = false; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java index 6ca38b7ffe442..60db798ddcbf1 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java @@ -141,9 +141,12 @@ private StreamMap createOperatorWithContext(List output KeySelector partitioner, byte[] serializedState) throws Exception { final List outputList = output; - StreamingRuntimeContext context = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, - new MockInputSplitProvider(), 1024), null, new ExecutionConfig(), partitioner, - new LocalStateHandleProvider(), new HashMap>()); + StreamingRuntimeContext context = new StreamingRuntimeContext( + new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024), + new ExecutionConfig(), + partitioner, + new LocalStateHandleProvider(), + new HashMap>()); StreamMap op = new StreamMap(new StatefulMapper()); @@ -217,14 +220,15 @@ public void open(Configuration conf) throws IOException { } } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings("unchecked") @Override public void close() throws Exception { - Map states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates(); + Map> states = 
((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates(); PartitionedStreamOperatorState groupCounter = (PartitionedStreamOperatorState) states.get("groupCounter"); for (Entry count : groupCounter.getPartitionedState().entrySet()) { Integer key = (Integer) count.getKey(); Integer expected = key < 3 ? 2 : 1; + assertEquals(new MutableInt(expected), count.getValue()); } } @@ -257,11 +261,12 @@ public void open(Configuration conf) throws IOException { groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true); } - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings("unchecked") @Override public void close() throws Exception { - Map states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates(); - PartitionedStreamOperatorState groupCounter = (PartitionedStreamOperatorState) states.get("groupCounter"); + Map> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates(); + PartitionedStreamOperatorState groupCounter = + (PartitionedStreamOperatorState) states.get("groupCounter"); for (Entry count : groupCounter.getPartitionedState().entrySet()) { Integer key = (Integer) count.getKey(); Integer expected = key < 3 ? 2 : 1; diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java index f07e3a5960108..7cc19588447bb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java @@ -85,12 +85,7 @@ public void testPropagateAsyncFlushError() { } finally { if (testWriter != null) { - try { - testWriter.close(); - } - catch (IOException e) { - // ignore in tests - } + testWriter.close(); } } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 296324a7d6866..4c6957b488288 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -168,7 +168,6 @@ public void testWatermarkForwarding() throws Exception { * This test verifies that checkpoint barriers are correctly forwarded. */ @Test - @SuppressWarnings("unchecked") public void testCheckpointBarriers() throws Exception { final OneInputStreamTask mapTask = new OneInputStreamTask(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); @@ -226,7 +225,6 @@ public void testCheckpointBarriers() throws Exception { * then all inputs receive barriers from a later checkpoint. 
*/ @Test - @SuppressWarnings("unchecked") public void testOvertakingCheckpointBarriers() throws Exception { final OneInputStreamTask mapTask = new OneInputStreamTask(); final OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); @@ -282,7 +280,6 @@ public void testOvertakingCheckpointBarriers() throws Exception { testHarness.waitForInputProcessing(); - testHarness.endInput(); testHarness.waitForTaskCompletion(); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index 4f07fdb234dd5..7fb8ba35ddd6e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java @@ -62,7 +62,6 @@ public class OneInputStreamTaskTestHarness extends StreamTaskTestHarnes */ public OneInputStreamTaskTestHarness(OneInputStreamTask task, int numInputGates, - int numInputChannelsPerGate, TypeInformation inputType, TypeInformation outputType) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 0f372cbdd90a8..987d0bc791cc9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -83,7 +83,7 @@ public class StreamTaskTestHarness { private ConcurrentLinkedQueue outputList; - protected Thread taskThread; + protected TaskThread taskThread; // These don't get initialized, the one-input/two-input specific test harnesses // must initialize these if they want to simulate input. 
We have them here so that all the @@ -161,32 +161,19 @@ public void invoke() throws Exception { task.registerInputOutput(); - taskThread = new Thread(new Runnable() { - @Override - public void run() { - - - - try { - task.invoke(); - shutdownIOManager(); - shutdownMemoryManager(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - } - }); - + taskThread = new TaskThread(task); taskThread.start(); } - public void waitForTaskCompletion() throws InterruptedException { + public void waitForTaskCompletion() throws Exception { if (taskThread == null) { throw new IllegalStateException("Task thread was not started."); } taskThread.join(); + if (taskThread.getError() != null) { + throw new Exception("error in task", taskThread.getError()); + } } /** @@ -300,5 +287,36 @@ public void endInput() { inputGates[i].endInput(); } } + + // ------------------------------------------------------------------------ + + private class TaskThread extends Thread { + + private final AbstractInvokable task; + + private volatile Throwable error; + + + TaskThread(AbstractInvokable task) { + super("Task Thread"); + this.task = task; + } + + @Override + public void run() { + try { + task.invoke(); + shutdownIOManager(); + shutdownMemoryManager(); + } + catch (Throwable t) { + this.error = t; + } + } + + public Throwable getError() { + return error; + } + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java index 45ae88f5a0a5b..f9b0b090ce93e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java @@ -57,9 +57,11 @@ public Output> getOutput() { public static List createAndExecute(OneInputStreamOperator operator, List inputs) { MockContext mockContext = new MockContext(inputs); - StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", - new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, - new ExecutionConfig(), null, null, new HashMap>()); + StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext( + new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024), + new ExecutionConfig(), + null, null, + new HashMap>()); operator.setup(mockContext.output, runtimeContext); try { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 6652fde09cced..f404d01378fc5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -60,9 +60,7 @@ public OneInputStreamOperatorTestHarness(OneInputStreamOperator operato executionConfig = new ExecutionConfig(); StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext( - "MockTwoInputTask", - new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), - getClass().getClassLoader(), + new MockEnvironment("MockTwoInputTask", 3 
* 1024 * 1024, new MockInputSplitProvider(), 1024), executionConfig, null, new LocalStateHandle.LocalStateHandleProvider(), diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java index 2d7f6b5776b5e..711dd41c4f97a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java @@ -42,8 +42,13 @@ public class SourceFunctionUtil { public static List runSourceFunction(SourceFunction sourceFunction) throws Exception { List outputs = new ArrayList(); if (sourceFunction instanceof RichFunction) { - RuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null, - new ExecutionConfig(), null, new LocalStateHandle.LocalStateHandleProvider(), new HashMap>()); + RuntimeContext runtimeContext = new StreamingRuntimeContext( + new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024), + new ExecutionConfig(), + null, + new LocalStateHandle.LocalStateHandleProvider(), + new HashMap>()); + ((RichFunction) sourceFunction).setRuntimeContext(runtimeContext); ((RichFunction) sourceFunction).open(new Configuration()); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java index a9ebd0bcee0e0..09db1f4f305b2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java @@ -66,7 +66,7 @@ public class StreamingMultipleProgramsTestBase extends TestBaseUtils { // ------------------------------------------------------------------------ public StreamingMultipleProgramsTestBase() { - TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4); + TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, DEFAULT_PARALLELISM); clusterEnv.setAsContext(); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index 1e8b5c62dcc39..21e2e1e6ffd59 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.streaming.util; import org.apache.flink.api.common.ExecutionConfig; @@ -33,7 +34,6 @@ import java.io.Serializable; import java.util.HashMap; -import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -60,9 +60,7 @@ public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator o executionConfig = new ExecutionConfig(); StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext( - "MockTwoInputTask", - new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), - getClass().getClassLoader(), + new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024), new ExecutionConfig(), null, new LocalStateHandle.LocalStateHandleProvider(), From 1023b6076b15e8cb9d07e61b818aab45f0767b78 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Sun, 16 Aug 2015 16:52:16 +0200 Subject: [PATCH 2/2] [tests] Reinforce StateCheckpoinedITCase to make sure actual checkpointing has happened before a failure. --- .../checkpointing/StateCheckpoinedITCase.java | 62 ++++++++++++------- .../StreamFaultToleranceTestBase.java | 20 +----- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java index 072086b09224b..2c2f2b48166f9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java @@ -22,30 +22,24 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.OperatorState; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.Collector; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; import java.io.IOException; -import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * A simple test that runs a streaming topology with checkpointing enabled. @@ -53,6 +47,10 @@ * The test triggers a failure after a while and verifies that, after completion, the * state defined with either the {@link OperatorState} or the {@link Checkpointed} * interface reflects the "exactly once" semantics. + * + * The test throttles the input until at least two checkpoints are completed, to make sure that + * the recovery does not fall back to "square one" (which would naturally lead to correct + * results without testing the checkpointing). 
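+ *
+ * Such a throttling filter can be sketched as follows (hypothetical, simplified
+ * variant of the functions defined further down in this file):
+ * {@code
+ * class ThrottlingFilter extends RichFilterFunction<String> implements CheckpointNotifier {
+ *
+ *     private volatile boolean checkpointCompleted;
+ *
+ *     public boolean filter(String value) throws Exception {
+ *         while (!checkpointCompleted) {
+ *             Thread.sleep(10); // hold back the stream until a checkpoint completes
+ *         }
+ *         return true;
+ *     }
+ *
+ *     public void notifyCheckpointComplete(long checkpointId) {
+ *         checkpointCompleted = true;
+ *     }
+ * }
+ * }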
*/ @SuppressWarnings("serial") public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase { @@ -63,17 +61,24 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase { * Runs the following program: * * <pre>
-	 *     [ (source)->(filter)->(map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+	 *     [ (source)->(filter) ] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
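 	 *
 	 * The filter is chained to the source in the first vertex; the two map
 	 * functions are chained together in the second vertex.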
*/ @Override public void testProgram(StreamExecutionEnvironment env) { assertTrue("Broken test setup", NUM_STRINGS % 40 == 0); + final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM); + final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM); + + final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; + DataStream stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)); stream - // -------------- first vertex, chained to the source ---------------- + // first vertex, chained to the source + // this filter throttles the flow until at least one checkpoint + // is complete, to make sure this program does not run without any completed checkpoint .filter(new StringRichFilterFunction()) // -------------- second vertex - one-to-one connected ---------------- @@ -83,12 +88,16 @@ public void testProgram(StreamExecutionEnvironment env) { // -------------- third vertex - reducer and the sink ---------------- .partitionByHash("prefix") - .flatMap(new OnceFailingAggregator(NUM_STRINGS)) + .flatMap(new OnceFailingAggregator(failurePos)) .addSink(new ValidatingSink()); } @Override public void postSubmit() { + + assertTrue("Test inconclusive: failure occurred before first checkpoint", + OnceFailingAggregator.wasCheckpointedBeforeFailure); + long filterSum = 0; for (long l : StringRichFilterFunction.counts) { filterSum += l; @@ -189,14 +198,15 @@ public void restoreState(Integer state) { } } - private static class StringRichFilterFunction extends RichFilterFunction implements Checkpointed { + private static class StringRichFilterFunction extends RichFilterFunction + implements Checkpointed { static final long[] counts = new long[PARALLELISM]; - + private long count; - + @Override - public boolean filter(String value) { + public boolean filter(String value) throws Exception { count++; return value.length() < 100; // should be always true } @@ -271,35 +281,34 @@ public void close() throws IOException { } private static class OnceFailingAggregator extends RichFlatMapFunction - implements Checkpointed> { + implements Checkpointed>, CheckpointNotifier { + static boolean wasCheckpointedBeforeFailure = false; + private static volatile boolean hasFailed = false; private final HashMap aggregationMap = new HashMap(); - private final long numElements; - private long failurePos; private long count; + private boolean wasCheckpointed; + - OnceFailingAggregator(long numElements) { - this.numElements = numElements; + OnceFailingAggregator(long failurePos) { + this.failurePos = failurePos; } @Override public void open(Configuration parameters) { - long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - - failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; count = 0; } @Override public void flatMap(PrefixCount value, Collector out) throws Exception { count++; - if (!hasFailed && count >= failurePos) { + if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) { + wasCheckpointedBeforeFailure = wasCheckpointed; hasFailed = true; throw new Exception("Test Failure"); } @@ -324,6 +333,11 @@ public HashMap snapshotState(long checkpointId, long checkp public void restoreState(HashMap state) { aggregationMap.putAll(state); } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + this.wasCheckpointed = true; + } } private static 
class ValidatingSink extends RichSinkFunction diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java index 29933156d7d03..8920cf2fa2077 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java @@ -18,38 +18,22 @@ package org.apache.flink.test.checkpointing; - -import org.apache.flink.api.common.functions.RichFilterFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.test.util.ForkableFlinkMiniCluster; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; import java.io.Serializable; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - /** * Test base for fault tolerant streaming programs */ -@SuppressWarnings("serial") public abstract class StreamFaultToleranceTestBase { protected static final int NUM_TASK_MANAGERS = 2; @@ -127,6 +111,7 @@ public void runCheckpointedProgram() { // Frequently used utilities // -------------------------------------------------------------------------------------------- + @SuppressWarnings("serial") public static class PrefixCount implements Serializable { public String prefix; @@ -146,5 +131,4 @@ public String toString() { return prefix + " / " + value; } } - }
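
A minimal sketch of a concrete test built on StreamFaultToleranceTestBase
(hypothetical class and function names, assuming the testProgram()/postSubmit()
hooks shown above):

    public class MyExactlyOnceITCase extends StreamFaultToleranceTestBase {

        @Override
        public void testProgram(StreamExecutionEnvironment env) {
            // called by runCheckpointedProgram(), which enables checkpointing
            // on the mini cluster and submits the resulting topology
            env.addSource(new MyOnceFailingSource())
                .map(new MyStatefulMapper())
                .addSink(new MyValidatingSink());
        }

        @Override
        public void postSubmit() {
            // runs after the job has finished; assert exactly-once invariants here
        }
    }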