From 3756cf732dcb61d61c59cb962188f01d84504308 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Wed, 9 May 2018 11:12:18 +0200 Subject: [PATCH 1/4] [hotfix][tests] Introduce MockEnvironmentBuilder to deduplicate MockEnvironment constructors --- .../kafka/FlinkKafkaConsumerBaseTest.java | 13 +- .../kinesis/testutils/TestRuntimeContext.java | 13 +- .../operators/testutils/MockEnvironment.java | 102 +------------- .../testutils/MockEnvironmentBuilder.java | 125 ++++++++++++++++++ .../operators/testutils/TaskTestBase.java | 8 +- .../source/InputFormatSourceFunctionTest.java | 13 +- .../StreamOperatorSnapshotRestoreTest.java | 29 ++-- .../async/AsyncWaitOperatorTest.java | 13 +- .../operators/StreamOperatorChainingTest.java | 14 +- .../runtime/tasks/StreamTaskTest.java | 35 +++-- .../AbstractStreamOperatorTestHarness.java | 21 ++- .../streaming/util/SourceFunctionUtil.java | 15 ++- .../PojoSerializerUpgradeTest.java | 25 ++-- 13 files changed, 224 insertions(+), 202 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index b226ff1360a60..46050153a1d0c 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -33,10 +33,9 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; -import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; @@ -834,12 +833,10 @@ private MockRuntimeContext( super( new MockStreamOperator(), - new MockEnvironment( - "mockTask", - 4 * MemoryManager.DEFAULT_PAGE_SIZE, - null, - 16, - new TestTaskStateManager()), + new MockEnvironmentBuilder() + .setTaskName("mockTask") + .setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .build(), Collections.emptyMap()); this.isCheckpointingEnabled = isCheckpointingEnabled; diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java index ce0bd97543344..740d2f2ed6066 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java @@ -21,8 +21,7 @@ import 
org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -45,12 +44,10 @@ public TestRuntimeContext( super( new TestStreamOperator(), - new MockEnvironment( - "mockTask", - 4 * MemoryManager.DEFAULT_PAGE_SIZE, - null, - 16, - new TestTaskStateManager()), + new MockEnvironmentBuilder() + .setTaskName("mockTask") + .setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .build(), Collections.emptyMap()); this.isCheckpointingEnabled = isCheckpointingEnabled; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index ce19a5e938f71..4bf94e93463dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -45,7 +45,6 @@ import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.TaskStateManager; -import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.types.Record; @@ -109,106 +108,11 @@ public class MockEnvironment implements Environment, AutoCloseable { private Optional actualExternalFailureCause = Optional.empty(); - public MockEnvironment() { - this( - "mock-task", - 1024 * MemoryManager.DEFAULT_PAGE_SIZE, - null, - 16, - new TestTaskStateManager()); + public static MockEnvironmentBuilder builder() { + return new MockEnvironmentBuilder(); } - public MockEnvironment( - String taskName, - long memorySize, - MockInputSplitProvider inputSplitProvider, - int bufferSize, - TaskStateManager taskStateManager) { - this( - taskName, - memorySize, - inputSplitProvider, - bufferSize, - new Configuration(), - new ExecutionConfig(), - taskStateManager); - } - - public MockEnvironment( - String taskName, - long memorySize, - MockInputSplitProvider inputSplitProvider, - int bufferSize, Configuration taskConfiguration, - ExecutionConfig executionConfig, - TaskStateManager taskStateManager) { - this( - taskName, - memorySize, - inputSplitProvider, - bufferSize, - taskConfiguration, - executionConfig, - taskStateManager, - 1, - 1, - 0); - } - - public MockEnvironment( - String taskName, - long memorySize, - MockInputSplitProvider inputSplitProvider, - int bufferSize, - Configuration taskConfiguration, - ExecutionConfig executionConfig, - TaskStateManager taskStateManager, - int maxParallelism, - int parallelism, - int subtaskIndex) { - this( - taskName, - memorySize, - inputSplitProvider, - bufferSize, - taskConfiguration, - executionConfig, - taskStateManager, - maxParallelism, - parallelism, - subtaskIndex, - Thread.currentThread().getContextClassLoader()); - - } - - public MockEnvironment( - String taskName, - long memorySize, - MockInputSplitProvider inputSplitProvider, - int bufferSize, - Configuration 
taskConfiguration, - ExecutionConfig executionConfig, - TaskStateManager taskStateManager, - int maxParallelism, - int parallelism, - int subtaskIndex, - ClassLoader userCodeClassLoader) { - this( - new JobID(), - new JobVertexID(), - taskName, - memorySize, - inputSplitProvider, - bufferSize, - taskConfiguration, - executionConfig, - taskStateManager, - maxParallelism, - parallelism, - subtaskIndex, - userCodeClassLoader); - } - - public MockEnvironment( + protected MockEnvironment( JobID jobID, JobVertexID jobVertexID, String taskName, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java new file mode 100644 index 0000000000000..dfb10d4293ad8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.testutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.state.TaskStateManager; +import org.apache.flink.runtime.state.TestTaskStateManager; + +public class MockEnvironmentBuilder { + private String taskName = "mock-task"; + private long memorySize = 1024 * MemoryManager.DEFAULT_PAGE_SIZE; + private MockInputSplitProvider inputSplitProvider = null; + private int bufferSize = 16; + private TaskStateManager taskStateManager = new TestTaskStateManager(); + private Configuration taskConfiguration = new Configuration(); + private ExecutionConfig executionConfig = new ExecutionConfig(); + private int maxParallelism = 1; + private int parallelism = 1; + private int subtaskIndex = 0; + private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader(); + private JobID jobID = new JobID(); + private JobVertexID jobVertexID = new JobVertexID(); + + public MockEnvironmentBuilder setTaskName(String taskName) { + this.taskName = taskName; + return this; + } + + public MockEnvironmentBuilder setMemorySize(long memorySize) { + this.memorySize = memorySize; + return this; + } + + public MockEnvironmentBuilder setInputSplitProvider(MockInputSplitProvider inputSplitProvider) { + this.inputSplitProvider = inputSplitProvider; + return this; + } + + public MockEnvironmentBuilder setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + return this; + } + + public MockEnvironmentBuilder setTaskStateManager(TaskStateManager taskStateManager) { + 
this.taskStateManager = taskStateManager; + return this; + } + + public MockEnvironmentBuilder setTaskConfiguration(Configuration taskConfiguration) { + this.taskConfiguration = taskConfiguration; + return this; + } + + public MockEnvironmentBuilder setExecutionConfig(ExecutionConfig executionConfig) { + this.executionConfig = executionConfig; + return this; + } + + public MockEnvironmentBuilder setMaxParallelism(int maxParallelism) { + this.maxParallelism = maxParallelism; + return this; + } + + public MockEnvironmentBuilder setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public MockEnvironmentBuilder setSubtaskIndex(int subtaskIndex) { + this.subtaskIndex = subtaskIndex; + return this; + } + + public MockEnvironmentBuilder setUserCodeClassLoader(ClassLoader userCodeClassLoader) { + this.userCodeClassLoader = userCodeClassLoader; + return this; + } + + public MockEnvironmentBuilder setJobID(JobID jobID) { + this.jobID = jobID; + return this; + } + + public MockEnvironmentBuilder setJobVertexID(JobVertexID jobVertexID) { + this.jobVertexID = jobVertexID; + return this; + } + + public MockEnvironment build() { + return new MockEnvironment( + jobID, + jobVertexID, + taskName, + memorySize, + inputSplitProvider, + bufferSize, + taskConfiguration, + executionConfig, + taskStateManager, + maxParallelism, + parallelism, + subtaskIndex, + userCodeClassLoader); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java index a40992c18003e..16485caf2a34f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java @@ -54,8 +54,12 @@ public abstract class TaskTestBase extends TestLogger { public void initEnvironment(long memorySize, int bufferSize) { this.memorySize = memorySize; this.inputSplitProvider = new MockInputSplitProvider(); - TestTaskStateManager taskStateManager = new TestTaskStateManager(); - this.mockEnv = new MockEnvironment("mock task", this.memorySize, this.inputSplitProvider, bufferSize, taskStateManager); + this.mockEnv = new MockEnvironmentBuilder() + .setTaskName("mock task") + .setMemorySize(this.memorySize) + .setInputSplitProvider(this.inputSplitProvider) + .setBufferSize(bufferSize) + .build(); } public IteratorWrappingTestSingleInputGate addInput(MutableObjectIterator input, int groupId) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java index 18c8ac5e5fcf0..84a45d8b070b2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; @@ -66,12 +66,11 @@ private void testFormatLifecycle(final boolean midCancel) throws Exception { final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat(); final InputFormatSourceFunction reader = new InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class)); - try (MockEnvironment environment = new MockEnvironment( - "no", - 4 * MemoryManager.DEFAULT_PAGE_SIZE, - null, - 16, - new TestTaskStateManager())) { + try (MockEnvironment environment = + new MockEnvironmentBuilder() + .setTaskName("no") + .setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE) + .build()) { reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits, environment)); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index 6d011a3baa1c6..a38ffa6c49abd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; @@ -27,7 +26,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; @@ -36,6 +34,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider; @@ -148,20 +147,18 @@ private void testOperatorStatesSnapshotRestoreInternal(final int mode) throws Ex LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(mode != ONLY_JM_RECOVERY, directoryProvider); - MockEnvironment mockEnvironment = new MockEnvironment( - jobID, - jobVertexID, - "test", - 1024L * 1024L, - new MockInputSplitProvider(), - 1024 * 1024, - new Configuration(), - new ExecutionConfig(), - new TestTaskStateManager(localRecoveryConfig), - MAX_PARALLELISM, - 1, - subtaskIdx, - getClass().getClassLoader()); + MockEnvironment mockEnvironment = new MockEnvironmentBuilder() + .setJobID(jobID) + .setJobVertexID(jobVertexID) + .setTaskName("test") + .setMemorySize(1024L * 1024L) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1024 * 1024) + .setTaskStateManager(new TestTaskStateManager(localRecoveryConfig)) + .setMaxParallelism(MAX_PARALLELISM) + .setSubtaskIndex(subtaskIdx) + .setUserCodeClassLoader(getClass().getClassLoader()) + .build(); 
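As a standalone usage sketch of the MockEnvironmentBuilder introduced above (a hypothetical test helper, not taken from the patch: the class name, task name, and sizes are assumptions, and every setter left out keeps the default declared in MockEnvironmentBuilder):

import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;

class MockEnvironmentUsageSketch {
	static MockEnvironment createExampleEnvironment() {
		// new MockEnvironmentBuilder() is equivalent to MockEnvironment.builder();
		// unset fields fall back to the builder defaults (e.g. bufferSize = 16).
		return new MockEnvironmentBuilder()
			.setTaskName("example-task")
			.setMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
			.setBufferSize(1024)
			.build();
	}
}

The fluent builder replaces the several removed telescoping MockEnvironment constructors, so additional optional fields can be introduced later without touching every existing call site.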
KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness<>( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 35e2fbde0aa18..cdd77d3e92f1b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.datastream.AsyncDataStream; @@ -650,12 +651,12 @@ public void testAsyncTimeout() throws Exception { @Nonnull private MockEnvironment createMockEnvironment() { - return new MockEnvironment( - "foobarTask", - 1024 * 1024L, - new MockInputSplitProvider(), - 4 * 1024, - new TestTaskStateManager()); + return new MockEnvironmentBuilder() + .setTaskName("foobarTask") + .setMemorySize(1024 * 1024L) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(4 * 1024) + .build(); } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java index e980ab7e10bbe..fd6a95310fcec 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamOperatorChainingTest.java @@ -25,8 +25,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SplitStream; @@ -170,12 +170,12 @@ public void invoke(String value) throws Exception { } private MockEnvironment createMockEnvironment(String taskName) { - return new MockEnvironment( - taskName, - 3 * 1024 * 1024, - new MockInputSplitProvider(), - 1024, - new TestTaskStateManager()); + return new MockEnvironmentBuilder() + .setTaskName(taskName) + .setMemorySize(3 * 1024 * 1024) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1024) + .build(); } @Test diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 73a575e524955..34d23958911e4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -62,6 +62,7 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; @@ -328,7 +329,7 @@ public void testFailingCheckpointStreamOperator() throws Exception { TaskInfo mockTaskInfo = mock(TaskInfo.class); when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar"); when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0); - Environment mockEnvironment = new MockEnvironment(); + Environment mockEnvironment = new MockEnvironmentBuilder().build(); StreamTask streamTask = new EmptyStreamTask(mockEnvironment); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -398,7 +399,7 @@ public void testFailingAsyncCheckpointRunnable() throws Exception { final long checkpointId = 42L; final long timestamp = 1L; - MockEnvironment mockEnvironment = new MockEnvironment(); + MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build(); StreamTask streamTask = spy(new EmptyStreamTask(mockEnvironment)); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -501,12 +502,10 @@ public Object answer(InvocationOnMock invocation) throws Throwable { null, checkpointResponder); - MockEnvironment mockEnvironment = new MockEnvironment( - "mock-task", - 1024 * MemoryManager.DEFAULT_PAGE_SIZE, - null, - 16, - taskStateManager); + MockEnvironment mockEnvironment = new MockEnvironmentBuilder() + .setTaskName("mock-task") + .setTaskStateManager(taskStateManager) + .build(); StreamTask streamTask = new EmptyStreamTask(mockEnvironment); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); @@ -599,7 +598,7 @@ public void testAsyncCheckpointingConcurrentCloseBeforeAcknowledge() throws Exce final OneShotLatch createSubtask = new OneShotLatch(); final OneShotLatch completeSubtask = new OneShotLatch(); - Environment mockEnvironment = spy(new MockEnvironment()); + Environment mockEnvironment = spy(new MockEnvironmentBuilder().build()); whenNew(OperatorSnapshotFinalizer.class). withAnyArguments(). 
@@ -685,7 +684,7 @@ public void testEmptySubtaskStateLeadsToStatelessAcknowledgment() throws Excepti final long checkpointId = 42L; final long timestamp = 1L; - Environment mockEnvironment = spy(new MockEnvironment()); + Environment mockEnvironment = spy(new MockEnvironmentBuilder().build()); // latch blocks until the async checkpoint thread acknowledges final OneShotLatch checkpointCompletedLatch = new OneShotLatch(); @@ -765,14 +764,14 @@ public void testOperatorClosingBeforeStopRunning() throws Throwable { streamConfig.setStreamOperator(new BlockingCloseStreamOperator()); streamConfig.setOperatorID(new OperatorID()); - try (MockEnvironment mockEnvironment = new MockEnvironment( - "Test Task", - 32L * 1024L, - new MockInputSplitProvider(), - 1, - taskConfiguration, - new ExecutionConfig(), - new TestTaskStateManager())) { + try (MockEnvironment mockEnvironment = + new MockEnvironmentBuilder() + .setTaskName("Test Task") + .setMemorySize(32L * 1024L) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1) + .setTaskConfiguration(taskConfiguration) + .build()) { StreamTask streamTask = new NoOpStreamTask<>(mockEnvironment); final AtomicReference atomicThrowable = new AtomicReference<>(null); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index ed2da18d1b6a4..26ad3ab7df35a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; @@ -137,17 +138,15 @@ public AbstractStreamOperatorTestHarness( int subtaskIndex) throws Exception { this( operator, - new MockEnvironment( - "MockTask", - 3 * 1024 * 1024, - new MockInputSplitProvider(), - 1024, - new Configuration(), - new ExecutionConfig(), - new TestTaskStateManager(), - maxParallelism, - parallelism, - subtaskIndex), + new MockEnvironmentBuilder() + .setTaskName("MockTask") + .setMemorySize(3 * 1024 * 1024) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1024) + .setMaxParallelism(maxParallelism) + .setParallelism(parallelism) + .setSubtaskIndex(subtaskIndex) + .build(), true); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java index 3f54081a242d2..660a333716a7d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import 
org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -51,12 +51,13 @@ public static List runSourceFunction(SourceFunction< } private static List runRichSourceFunction(SourceFunction sourceFunction) throws Exception { - try (MockEnvironment environment = new MockEnvironment( - "MockTask", - 3 * 1024 * 1024, - new MockInputSplitProvider(), - 1024, - new TestTaskStateManager())) { + try (MockEnvironment environment = + new MockEnvironmentBuilder() + .setTaskName("MockTask") + .setMemorySize(3 * 1024 * 1024) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1024) + .build()) { AbstractStreamOperator operator = mock(AbstractStreamOperator.class); when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java index aaa96fbaaad86..fe5678282f103 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java @@ -37,12 +37,12 @@ import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendLoader; -import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamMap; @@ -351,18 +351,17 @@ private OperatorSubtaskState runOperator( OperatorSubtaskState operatorSubtaskState, Iterable input) throws Exception { - try (final MockEnvironment environment = new MockEnvironment( - "test task", - 32 * 1024, - new MockInputSplitProvider(), - 256, - taskConfiguration, - executionConfig, - new TestTaskStateManager(), - 16, - 1, - 0, - classLoader)) { + try (final MockEnvironment environment = + new MockEnvironmentBuilder() + .setTaskName("test task") + .setMemorySize(32 * 1024) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(256) + .setTaskConfiguration(taskConfiguration) + .setExecutionConfig(executionConfig) + .setMaxParallelism(16) + .setUserCodeClassLoader(classLoader) + .build()) { OneInputStreamOperatorTestHarness harness = null; try { From 5e930ca4369e5caaa300db004efef2248b5540a0 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Wed, 9 May 2018 11:29:18 +0200 Subject: [PATCH 2/4] [hotfix][tests] Reduce mockito usage in tests --- .../StreamingRuntimeContextTest.java | 10 +++--- .../async/AsyncWaitOperatorTest.java | 9 ++--- .../streaming/util/MockStreamConfig.java | 33 +++++++++++++++++++ 3 files 
changed, 42 insertions(+), 10 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 1072eec10973d..87667b2fe3104 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.DefaultKeyedStateStore; @@ -58,7 +59,6 @@ import java.util.Collections; import java.util.Map; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertFalse; @@ -373,10 +373,8 @@ public MapState answer(InvocationOnMock invocationOnMock) throw } private static Environment createMockEnvironment() { - Environment env = mock(Environment.class); - when(env.getUserClassLoader()).thenReturn(StreamingRuntimeContextTest.class.getClassLoader()); - when(env.getDistributedCacheEntries()).thenReturn(Collections.>emptyMap()); - when(env.getTaskInfo()).thenReturn(new TaskInfo("test task", 1, 0, 1, 1)); - return env; + return MockEnvironment.builder() + .setTaskName("test task") + .build(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index cdd77d3e92f1b..17d654e637add 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -58,6 +58,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.ExceptionUtils; @@ -680,8 +681,8 @@ public void testClosingWithBlockedEmitter() throws Exception { when(containingTask.getCheckpointLock()).thenReturn(lock); when(containingTask.getProcessingTimeService()).thenReturn(new TestProcessingTimeService()); - StreamConfig streamConfig = mock(StreamConfig.class); - doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class)); + StreamConfig streamConfig = new MockStreamConfig(); + streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE); final OneShotLatch closingLatch = new OneShotLatch(); final OneShotLatch outputLatch = new OneShotLatch(); @@ -783,8 +784,8 @@ public void testTimeoutCleanup() throws Exception { when(containingTask.getCheckpointLock()).thenReturn(lock); 
when(containingTask.getProcessingTimeService()).thenReturn(processingTimeService); - StreamConfig streamConfig = mock(StreamConfig.class); - doReturn(IntSerializer.INSTANCE).when(streamConfig).getTypeSerializerIn1(any(ClassLoader.class)); + StreamConfig streamConfig = new MockStreamConfig(); + streamConfig.setTypeSerializerIn1(IntSerializer.INSTANCE); Output> output = mock(Output.class); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java new file mode 100644 index 0000000000000..598da48db27df --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.streaming.api.graph.StreamConfig; + +/** + * Handy mock for {@link StreamConfig}. + */ +public class MockStreamConfig extends StreamConfig { + public MockStreamConfig() { + super(new Configuration()); + + setOperatorID(new OperatorID()); + } +} From 14e06394d384956250d0df1559176470f8c10628 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Tue, 8 May 2018 17:46:29 +0200 Subject: [PATCH 3/4] [FLINK-9316][streaming] Expose operator's unique ID in DataStream programs This allows to uniquely and stably across multiple job submissions identify operators. Previously two different operators that were executed by tasks that had the same name were indistinguishable. 
--- .../kafka/FlinkKafkaConsumerBaseTest.java | 6 ++ .../kinesis/testutils/TestRuntimeContext.java | 6 ++ flink-contrib/flink-storm/pom.xml | 8 ++ .../flink/storm/wrappers/BoltWrapperTest.java | 18 ++--- .../operators/StreamingRuntimeContext.java | 15 ++++ .../source/InputFormatSourceFunctionTest.java | 6 ++ .../operators/GetOperatorUniqueIDTest.java | 75 +++++++++++++++++++ .../StreamingRuntimeContextTest.java | 4 + 8 files changed, 129 insertions(+), 9 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 46050153a1d0c..c9b52415a3e96 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -32,6 +32,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -873,6 +874,11 @@ private static class MockStreamOperator extends AbstractStreamOperator public ExecutionConfig getExecutionConfig() { return new ExecutionConfig(); } + + @Override + public OperatorID getOperatorID() { + return new OperatorID(); + } } } diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java index 740d2f2ed6066..9a3ad728296ef 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/TestRuntimeContext.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -83,5 +84,10 @@ private static class TestStreamOperator extends AbstractStreamOperator public ExecutionConfig getExecutionConfig() { return new ExecutionConfig(); } + + @Override + public OperatorID getOperatorID() { + return new OperatorID(42, 44); + } } } diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml index 496aecd34ae3e..fb52a930fcc44 100644 --- a/flink-contrib/flink-storm/pom.xml +++ b/flink-contrib/flink-storm/pom.xml @@ -180,6 +180,14 @@ under the License. 
test + + org.apache.flink + flink-streaming-java_${scala.binary.version} + ${project.version} + test + test-jar + + diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index 430e4d8f11338..d405a45f6af48 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -32,12 +32,12 @@ import org.apache.flink.storm.util.SplitStreamType; import org.apache.flink.storm.util.StormConfig; import org.apache.flink.storm.util.TestDummyBolt; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.MockStreamConfig; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -159,7 +159,7 @@ private void testWrapper(final int numberOfAttributes) throws Exception { PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class)); wrapper.open(); wrapper.processElement(record); @@ -195,7 +195,7 @@ public void testMultipleOutputStreams() throws Exception { } final BoltWrapper wrapper = new BoltWrapper(bolt, null, raw); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), output); + wrapper.setup(createMockStreamTask(), new MockStreamConfig(), output); wrapper.open(); final SplitStreamType splitRecord = new SplitStreamType(); @@ -248,7 +248,7 @@ public void testOpen() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class)); wrapper.open(); verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class)); @@ -261,7 +261,7 @@ public void testOpen() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class)); wrapper.open(); verify(bolt).prepare(same(stormConfig), any(TopologyContext.class), any(OutputCollector.class)); @@ -278,7 +278,7 @@ public void testOpen() throws Exception { TestDummyBolt testBolt = new TestDummyBolt(); BoltWrapper wrapper = new BoltWrapper(testBolt); - wrapper.setup(createMockStreamTask(execConfig), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(execConfig), new MockStreamConfig(), mock(Output.class)); wrapper.open(); for (Entry entry : cfg.toMap().entrySet()) { @@ -305,7 +305,7 @@ public void testOpenSink() 
throws Exception { final IRichBolt bolt = mock(IRichBolt.class); BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class)); wrapper.open(); verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNotNull(OutputCollector.class)); @@ -322,7 +322,7 @@ public void testClose() throws Exception { final BoltWrapper wrapper = new BoltWrapper(bolt); - wrapper.setup(createMockStreamTask(), new StreamConfig(new Configuration()), mock(Output.class)); + wrapper.setup(createMockStreamTask(), new MockStreamConfig(), mock(Output.class)); wrapper.close(); wrapper.dispose(); @@ -379,7 +379,7 @@ public Map getComponentConfiguration() { final CloseableRegistry closeableRegistry = new CloseableRegistry(); StreamTask mockTask = mock(StreamTask.class); when(mockTask.getCheckpointLock()).thenReturn(new Object()); - when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration())); + when(mockTask.getConfiguration()).thenReturn(new MockStreamConfig()); when(mockTask.getEnvironment()).thenReturn(env); when(mockTask.getExecutionConfig()).thenReturn(execConfig); when(mockTask.getCancelables()).thenReturn(closeableRegistry); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 1f42ccffcdc0b..89c038fbad574 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -61,6 +61,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { private final StreamConfig streamConfig; + private final String operatorUniqueID; + public StreamingRuntimeContext(AbstractStreamOperator operator, Environment env, Map> accumulators) { super(env.getTaskInfo(), @@ -73,6 +75,7 @@ public StreamingRuntimeContext(AbstractStreamOperator operator, this.operator = operator; this.taskEnvironment = env; this.streamConfig = new StreamConfig(env.getTaskConfiguration()); + this.operatorUniqueID = operator.getOperatorID().toString(); } // ------------------------------------------------------------------------ @@ -90,6 +93,18 @@ public ProcessingTimeService getProcessingTimeService() { return operator.getProcessingTimeService(); } + /** + * Returned value is guaranteed to be unique between operators within the same job and to be + * stable and the same across job submissions. + * + *
<p>
This operation is currently only supported in Streaming (DataStream) contexts. + * + * @return String representation of the operator's unique id. + */ + public String getOperatorUniqueID() { + return operatorUniqueID; + } + // ------------------------------------------------------------------------ // broadcast variables // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java index 84a45d8b070b2..cad3df8bbafc3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunctionTest.java @@ -29,6 +29,7 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -308,6 +309,11 @@ private static class MockStreamOperator extends AbstractStreamOperator public ExecutionConfig getExecutionConfig() { return new ExecutionConfig(); } + + @Override + public OperatorID getOperatorID() { + return new OperatorID(); + } } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java new file mode 100644 index 0000000000000..9693e42c6f665 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/GetOperatorUniqueIDTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; + +/** + * Tests the uid translation to {@link org.apache.flink.runtime.jobgraph.OperatorID}. 
+ */ +@SuppressWarnings("serial") +public class GetOperatorUniqueIDTest extends TestLogger { + + /** + * If expected values ever change double check that the change is not braking the contract of + * {@link StreamingRuntimeContext#getOperatorUniqueID()} being stable between job submissions. + */ + @Test + public void testGetOperatorUniqueID() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + + env.fromElements(1, 2, 3) + .map(new VerifyOperatorIDMapFunction("6c4f323f22da8fb6e34f80c61be7a689")).uid("42") + .map(new VerifyOperatorIDMapFunction("3e129e83691e7737fbf876b47452acbc")).uid("44"); + + env.execute(); + } + + private static class VerifyOperatorIDMapFunction extends AbstractRichFunction implements MapFunction { + private static final long serialVersionUID = 6584823409744624276L; + + private final String expectedOperatorUniqueID; + + public VerifyOperatorIDMapFunction(String expectedOperatorUniqueID) { + this.expectedOperatorUniqueID = checkNotNull(expectedOperatorUniqueID); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + assertEquals(expectedOperatorUniqueID, ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID()); + } + + @Override + public Integer map(Integer value) throws Exception { + return value; + } + + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index 87667b2fe3104..e04ceddbe598c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -41,6 +41,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.query.KvStateRegistry; @@ -296,6 +297,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { }).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(StateDescriptor.class)); when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore); + when(operatorMock.getOperatorID()).thenReturn(new OperatorID()); return operatorMock; } @@ -333,6 +335,7 @@ public ListState answer(InvocationOnMock invocationOnMock) throws Throwa }).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(ListStateDescriptor.class)); when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore); + when(operatorMock.getOperatorID()).thenReturn(new OperatorID()); return operatorMock; } @@ -369,6 +372,7 @@ public MapState answer(InvocationOnMock invocationOnMock) throw }).when(keyedStateBackend).getPartitionedState(Matchers.any(), any(TypeSerializer.class), any(MapStateDescriptor.class)); when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore); + when(operatorMock.getOperatorID()).thenReturn(new OperatorID()); return operatorMock; } From ac4b326c2aa7e7c719924bd996a730fc3426c281 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Tue, 8 May 2018 17:49:31 +0200 Subject: [PATCH 
4/4] [FLINK-9295][kafka] Fix transactional.id collisions for FlinkKafkaProducer011 Previously if there were two completely independent FlinkKafkaProducer011 data sinks in the job graph, their transactional.id would collide with one another. Fix is to use operator's unique ID as well along task name and subtask id. This change is backward compatible for recovering from older savepoints, since transactional.ids generated by the old generator still will be used after restoring from state. --- .../kafka/FlinkKafkaProducer011.java | 2 +- .../kafka/FlinkKafkaProducer011ITCase.java | 4 +- .../Kafka011ProducerExactlyOnceITCase.java | 6 ++ .../kafka/KafkaProducerTestBase.java | 55 ++++++++++--------- .../AbstractStreamOperatorTestHarness.java | 31 +++++++++-- .../OneInputStreamOperatorTestHarness.java | 17 +++++- 6 files changed, 81 insertions(+), 34 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 0ae5e03bc6718..84973726113c1 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -837,7 +837,7 @@ public void initializeState(FunctionInitializationContext context) throws Except nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState( NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR); transactionalIdsGenerator = new TransactionalIdsGenerator( - getRuntimeContext().getTaskName(), + getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(), getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), kafkaProducersPoolSize, diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java index 36cb36214bb87..74c58ad289192 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -627,7 +628,8 @@ private OneInputStreamOperatorTestHarness createTestHarness( maxParallelism, parallelism, subtaskIndex, - IntSerializer.INSTANCE); + IntSerializer.INSTANCE, + new OperatorID(42, 44)); } private Properties createProperties() { diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java 
index 116723869f429..5038b7f58f415 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ProducerExactlyOnceITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 /**
  * IT cases for the {@link FlinkKafkaProducer011}.
@@ -48,4 +49,9 @@ public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
 		// that it doesn't work well with some weird network failures, or the NetworkFailureProxy is a broken design
 		// and this test should be reimplemented in completely different way...
 	}
+
+	@Test
+	public void testMultipleSinkOperators() throws Exception {
+		testExactlyOnce(false, 2);
+	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 5023a7eae719b..0807eb4659257 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -303,7 +303,7 @@ public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
 	 */
 	@Test
 	public void testExactlyOnceRegularSink() throws Exception {
-		testExactlyOnce(true);
+		testExactlyOnce(true, 1);
 	}
 
 	/**
@@ -311,20 +311,22 @@ public void testExactlyOnceCustomOperator() throws Exception {
-		testExactlyOnce(false);
+		testExactlyOnce(false, 1);
 	}
 
 	/**
 	 * This test sets KafkaProducer so that it will automatically flush the data and
 	 * and fails the broker to check whether flushed records since last checkpoint were not duplicated.
 	 */
-	protected void testExactlyOnce(boolean regularSink) throws Exception {
-		final String topic = regularSink ? "exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator";
+	protected void testExactlyOnce(boolean regularSink, int sinksCount) throws Exception {
"exactlyOnceTopicRegularSink" : "exactlyTopicCustomOperator") + sinksCount; final int partition = 0; final int numElements = 1000; final int failAfterElements = 333; - createTestTopic(topic, 1, 1); + for (int i = 0; i < sinksCount; i++) { + createTestTopic(topic + i, 1, 1); + } TypeInformationSerializationSchema schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig()); KeyedSerializationSchema keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema); @@ -346,32 +348,35 @@ protected void testExactlyOnce(boolean regularSink) throws Exception { .addSource(new IntegerSource(numElements)) .map(new FailingIdentityMapper(failAfterElements)); - FlinkKafkaPartitioner partitioner = new FlinkKafkaPartitioner() { - @Override - public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) { - return partition; + for (int i = 0; i < sinksCount; i++) { + FlinkKafkaPartitioner partitioner = new FlinkKafkaPartitioner() { + @Override + public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) { + return partition; + } + }; + + if (regularSink) { + StreamSink kafkaSink = kafkaServer.getProducerSink(topic + i, keyedSerializationSchema, properties, partitioner); + inputStream.addSink(kafkaSink.getUserFunction()); + } else { + kafkaServer.produceIntoKafka(inputStream, topic + i, keyedSerializationSchema, properties, partitioner); } - }; - if (regularSink) { - StreamSink kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, partitioner); - inputStream.addSink(kafkaSink.getUserFunction()); - } - else { - kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, partitioner); } FailingIdentityMapper.failedBefore = false; TestUtils.tryExecute(env, "Exactly once test"); - // assert that before failure we successfully snapshot/flushed all expected elements - assertExactlyOnceForTopic( - properties, - topic, - partition, - expectedElements, - KAFKA_READ_TIMEOUT); - - deleteTestTopic(topic); + for (int i = 0; i < sinksCount; i++) { + // assert that before failure we successfully snapshot/flushed all expected elements + assertExactlyOnceForTopic( + properties, + topic + i, + partition, + expectedElements, + KAFKA_READ_TIMEOUT); + deleteTestTopic(topic + i); + } } private List getIntegersSequence(int size) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 26ad3ab7df35a..0c4ecc01e8068 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -147,19 +147,42 @@ public AbstractStreamOperatorTestHarness( .setParallelism(parallelism) .setSubtaskIndex(subtaskIndex) .build(), - true); + true, + new OperatorID()); + } + + public AbstractStreamOperatorTestHarness( + StreamOperator operator, + int maxParallelism, + int parallelism, + int subtaskIndex, + OperatorID operatorID) throws Exception { + this( + operator, + new MockEnvironmentBuilder() + .setTaskName("MockTask") + .setMemorySize(3 * 1024 * 1024) + .setInputSplitProvider(new MockInputSplitProvider()) + .setBufferSize(1024) + .setMaxParallelism(maxParallelism) + .setParallelism(parallelism) + .setSubtaskIndex(subtaskIndex) 
+				.build(),
+			true,
+			operatorID);
 	}
 
 	public AbstractStreamOperatorTestHarness(
 			StreamOperator operator,
 			MockEnvironment env) throws Exception {
-		this(operator, env, false);
+		this(operator, env, false, new OperatorID());
 	}
 
 	private AbstractStreamOperatorTestHarness(
 			StreamOperator operator,
 			MockEnvironment env,
-			boolean environmentIsInternal) throws Exception {
+			boolean environmentIsInternal,
+			OperatorID operatorID) throws Exception {
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<>();
 		this.sideOutputLists = new HashMap<>();
@@ -167,7 +190,7 @@ private AbstractStreamOperatorTestHarness(
 		Configuration underlyingConfig = env.getTaskConfiguration();
 		this.config = new StreamConfig(underlyingConfig);
 		this.config.setCheckpointingEnabled(true);
-		this.config.setOperatorID(new OperatorID());
+		this.config.setOperatorID(operatorID);
 		this.executionConfig = env.getExecutionConfig();
 		this.closableRegistry = new CloseableRegistry();
 		this.checkpointLock = new Object();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 66d2f69531514..01551984ed103 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -54,8 +55,9 @@ public OneInputStreamOperatorTestHarness(
 			int maxParallelism,
 			int parallelism,
 			int subtaskIndex,
-			TypeSerializer typeSerializerIn) throws Exception {
-		this(operator, maxParallelism, parallelism, subtaskIndex);
+			TypeSerializer typeSerializerIn,
+			OperatorID operatorID) throws Exception {
+		this(operator, maxParallelism, parallelism, subtaskIndex, operatorID);
 		config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
 	}
 
@@ -78,7 +80,16 @@ public OneInputStreamOperatorTestHarness(
 			int maxParallelism,
 			int parallelism,
 			int subtaskIndex) throws Exception {
-		super(operator, maxParallelism, parallelism, subtaskIndex);
+		this(operator, maxParallelism, parallelism, subtaskIndex, new OperatorID());
+	}
+
+	public OneInputStreamOperatorTestHarness(
+			OneInputStreamOperator operator,
+			int maxParallelism,
+			int parallelism,
+			int subtaskIndex,
+			OperatorID operatorID) throws Exception {
+		super(operator, maxParallelism, parallelism, subtaskIndex, operatorID);
 		this.oneInputOperator = operator;
 	}
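Usage note for the new harness constructors (a sketch only; the operator choice and parameter values below are illustrative, not taken from the patch): pinning the OperatorID lets a test create a "restored" harness with the same operator identity as the one that produced the snapshot, which is exactly what the transactional.id generation now depends on.

    import org.apache.flink.runtime.jobgraph.OperatorID;
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
    import org.apache.flink.streaming.api.operators.StreamSink;
    import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

    class FixedOperatorIdHarnessSketch {
        static OneInputStreamOperatorTestHarness<Integer, Object> createHarness() throws Exception {
            return new OneInputStreamOperatorTestHarness<>(
                new StreamSink<>(new DiscardingSink<Integer>()), // any one-input operator works here
                10,                       // maxParallelism
                1,                        // parallelism
                0,                        // subtaskIndex
                new OperatorID(42, 44));  // fixed ID, stable across the "job submissions" of a test
        }
    }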