diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index 279aed0d0e9aa..69e967d6d3f89 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -25,6 +25,7 @@ import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.Configuration; @@ -332,14 +333,10 @@ public Map getComponentConfiguration() { public static StreamTask createMockStreamTask(ExecutionConfig execConfig) { Environment env = mock(Environment.class); - when(env.getTaskName()).thenReturn("Mock Task"); - when(env.getTaskNameWithSubtasks()).thenReturn("Mock Task (1/1)"); - when(env.getIndexInSubtaskGroup()).thenReturn(0); - when(env.getNumberOfSubtasks()).thenReturn(1); + when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader()); StreamTask mockTask = mock(StreamTask.class); - when(mockTask.getName()).thenReturn("Mock Task (1/1)"); when(mockTask.getCheckpointLock()).thenReturn(new Object()); when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration())); when(mockTask.getEnvironment()).thenReturn(env); diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java index 82cf90970f6f9..0cee8ae8bd57a 100644 --- 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/DbStateBackend.java @@ -181,7 +181,7 @@ public LazyDbKvState createKvState(String stateId, String stateName TypeSerializer keySerializer, TypeSerializer valueSerializer, V defaultValue) throws IOException { return new LazyDbKvState( stateId + "_" + env.getJobID().toShortString(), - env.getIndexInSubtaskGroup() == 0, + env.getTaskInfo().getIndexOfThisSubtask() == 0, getConnections(), getConfiguration(), keySerializer, diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java index 3d7abff1c9ffd..aceb3c02a7150 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/state/LazyDbKvState.java @@ -376,7 +376,7 @@ public Void call() throws Exception { }, stateBackend.getConfiguration().getMaxNumberOfSqlRetries(), stateBackend.getConfiguration().getSleepBetweenSqlRetries()); - boolean cleanup = stateBackend.getEnvironment().getIndexInSubtaskGroup() == 0; + boolean cleanup = stateBackend.getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0; // Restore the KvState LazyDbKvState restored = new LazyDbKvState(kvStateId, cleanup, diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java new file mode 100644 index 0000000000000..d7cfb9577d4cc --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Encapsulates task-specific information: name, index of subtask, parallelism and attempt number. 
+ */ +public class TaskInfo { + + private final String taskName; + private final String taskNameWithSubtasks; + private final int indexOfSubtask; + private final int numberOfParallelSubtasks; + private final int attemptNumber; + + public TaskInfo(String taskName, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) { + checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number."); + checkArgument(numberOfParallelSubtasks >= 1, "Parallelism must be a positive number."); + checkArgument(indexOfSubtask < numberOfParallelSubtasks, "Task index must be less than parallelism."); + checkArgument(attemptNumber >= 0, "Attempt number must be a non-negative number."); + this.taskName = checkNotNull(taskName, "Task Name must not be null."); + this.indexOfSubtask = indexOfSubtask; + this.numberOfParallelSubtasks = numberOfParallelSubtasks; + this.attemptNumber = attemptNumber; + this.taskNameWithSubtasks = taskName + " (" + (indexOfSubtask + 1) + '/' + numberOfParallelSubtasks + ')'; + } + + /** + * Returns the name of the task. + * + * @return The name of the task. + */ + public String getTaskName() { + return this.taskName; + } + + /** + * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to + * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}). + * + * @return The index of the parallel subtask. + */ + public int getIndexOfThisSubtask() { + return this.indexOfSubtask; + } + + /** + * Gets the parallelism with which the parallel task runs. + * + * @return The parallelism with which the parallel task runs. + */ + public int getNumberOfParallelSubtasks() { + return this.numberOfParallelSubtasks; + } + + /** + * Gets the attempt number of this parallel subtask. First attempt is numbered 0. + * The attempt number corresponds to the number of times this task has been restarted (after + * failure/cancellation) since the job was initially started. 
+ * + * @return Attempt number of the subtask. + */ + public int getAttemptNumber() { + return this.attemptNumber; + } + + /** + * Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)", + * where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be + * {@link #getNumberOfParallelSubtasks()}. + * + * @return The name of the task, with subtask indicator. + */ + public String getTaskNameWithSubtasks() { + return this.taskNameWithSubtasks; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java index 7f767c37f60f5..da79a795a07ff 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java @@ -64,6 +64,22 @@ public interface RuntimeContext { */ int getIndexOfThisSubtask(); + /** + * Gets the attempt number of this parallel subtask. First attempt is numbered 0. + * + * @return Attempt number of the subtask. + */ + int getAttemptNumber(); + + /** + * Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)", + * where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be + * {@link #getNumberOfParallelSubtasks()}. + * + * @return The name of the task, with subtask indicator. + */ + String getTaskNameWithSubtasks(); + /** * Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing * job. 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java index 34ec0b508c5e8..8f1b6b1429b92 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.accumulators.DoubleCounter; @@ -42,11 +43,7 @@ */ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { - private final String name; - - private final int numParallelSubtasks; - - private final int subtaskIndex; + private final TaskInfo taskInfo; private final ClassLoader userCodeClassLoader; @@ -56,15 +53,12 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext { private final DistributedCache distributedCache; - public AbstractRuntimeUDFContext(String name, - int numParallelSubtasks, int subtaskIndex, + public AbstractRuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, Map> accumulators, Map> cpTasks) { - this.name = name; - this.numParallelSubtasks = numParallelSubtasks; - this.subtaskIndex = subtaskIndex; + this.taskInfo = Preconditions.checkNotNull(taskInfo); this.userCodeClassLoader = userCodeClassLoader; this.executionConfig = executionConfig; this.distributedCache = new DistributedCache(Preconditions.checkNotNull(cpTasks)); @@ -78,17 +72,27 @@ public ExecutionConfig getExecutionConfig() { @Override public String getTaskName() { - return this.name; + return taskInfo.getTaskName(); } 
@Override public int getNumberOfParallelSubtasks() { - return this.numParallelSubtasks; + return taskInfo.getNumberOfParallelSubtasks(); } @Override public int getIndexOfThisSubtask() { - return this.subtaskIndex; + return taskInfo.getIndexOfThisSubtask(); + } + + @Override + public int getAttemptNumber() { + return taskInfo.getAttemptNumber(); + } + + @Override + public String getTaskNameWithSubtasks() { + return taskInfo.getTaskNameWithSubtasks(); } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java index c582768370c3b..5558c21c0b353 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/RuntimeUDFContext.java @@ -24,6 +24,7 @@ import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.RuntimeContext; @@ -38,9 +39,9 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext { private final HashMap> uninitializedBroadcastVars = new HashMap>(); - public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig, Map> cpTasks, Map> accumulators) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks); + public RuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, + Map> cpTasks, Map> accumulators) { + super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks); } @Override diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java index b6d8128c647eb..719414ffcfb88 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java @@ -36,6 +36,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.common.aggregators.Aggregator; @@ -179,10 +180,11 @@ private void executeDataSink(GenericDataSinkBase sink, int superStep) th GenericDataSinkBase typedSink = (GenericDataSinkBase) sink; // build the runtime context and compute broadcast variables, if necessary + TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0); RuntimeUDFContext ctx; if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(typedSink.getName(), 1, 0, getClass().getClassLoader(), executionConfig, cachedFiles, accumulators) : - new IterationRuntimeUDFContext(typedSink.getName(), 1, 0, classLoader, executionConfig, cachedFiles, accumulators); + ctx = superStep == 0 ? 
new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) : + new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators); } else { ctx = null; } @@ -195,10 +197,11 @@ private List executeDataSource(GenericDataSourceBase source, in @SuppressWarnings("unchecked") GenericDataSourceBase typedSource = (GenericDataSourceBase) source; // build the runtime context and compute broadcast variables, if necessary + TaskInfo taskInfo = new TaskInfo(typedSource.getName(), 0, 1, 0); RuntimeUDFContext ctx; if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(source.getName(), 1, 0, getClass().getClassLoader(), executionConfig, cachedFiles, accumulators) : - new IterationRuntimeUDFContext(source.getName(), 1, 0, classLoader, executionConfig, cachedFiles, accumulators); + ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) : + new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators); } else { ctx = null; } @@ -218,12 +221,11 @@ private List executeUnaryOperator(SingleInputOperator op SingleInputOperator typedOp = (SingleInputOperator) operator; // build the runtime context and compute broadcast variables, if necessary + TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0); RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass() - .getClassLoader(), executionConfig, cachedFiles, accumulators) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, - executionConfig, cachedFiles, accumulators); + ctx = superStep == 0 ? 
new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) : + new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators); for (Map.Entry> bcInputs : operator.getBroadcastInputs().entrySet()) { List bcData = execute(bcInputs.getValue()); @@ -259,12 +261,11 @@ private List executeBinaryOperator(DualInputOperator typedOp = (DualInputOperator) operator; // build the runtime context and compute broadcast variables, if necessary + TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0); RuntimeUDFContext ctx; if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) { - ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader, - executionConfig, cachedFiles, accumulators) : - new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader, - executionConfig, cachedFiles, accumulators); + ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) : + new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators); for (Map.Entry> bcInputs : operator.getBroadcastInputs().entrySet()) { List bcData = execute(bcInputs.getValue()); @@ -519,10 +520,9 @@ else if (op instanceof GenericDataSourceBase) { private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext { - public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader, - ExecutionConfig executionConfig, Map> cpTasks, Map> accumulators) { - super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig, cpTasks, accumulators); + public IterationRuntimeUDFContext(TaskInfo taskInfo, ClassLoader classloader, ExecutionConfig executionConfig, + Map> cpTasks, Map> accumulators) { + super(taskInfo, classloader, executionConfig, cpTasks, accumulators); } @Override diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java index 5e8f891b3b7eb..858bc49403b15 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.core.fs.Path; @@ -34,11 +35,13 @@ public class RuntimeUDFContextTest { - + + private final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0); + @Test public void testBroadcastVariableNotFound() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(),new HashMap>()); + RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(),new HashMap>()); try { ctx.getBroadcastVariable("some name"); @@ -68,7 +71,7 @@ public void testBroadcastVariableNotFound() { @Test public void testBroadcastVariableSimple() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); + RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4)); ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0)); @@ -102,7 +105,7 @@ public void testBroadcastVariableSimple() { @Test public void testBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = 
new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); + RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -127,7 +130,7 @@ public void testBroadcastVariableWithInitializer() { @Test public void testResetBroadcastVariableWithInitializer() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); + RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); @@ -150,7 +153,7 @@ public void testResetBroadcastVariableWithInitializer() { @Test public void testBroadcastVariableWithInitializerAndMismatch() { try { - RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); + RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>()); ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4)); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java index 2a9cef345bc01..7ea007174c1c3 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import 
org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.core.fs.Path; @@ -38,7 +39,8 @@ public class RichInputFormatTest { @Test public void testCheckRuntimeContextAccess() { final SerializedInputFormat inputFormat = new SerializedInputFormat(); - inputFormat.setRuntimeContext(new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>())); + final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0); + inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>())); Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java index 10d03ab21e811..273f4f54e7389 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java @@ -20,9 +20,11 @@ package org.apache.flink.api.common.io; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.core.fs.Path; @@ -38,7 +40,8 @@ public class RichOutputFormatTest { @Test public void testCheckRuntimeContextAccess() { final SerializedOutputFormat inputFormat = new SerializedOutputFormat(); - inputFormat.setRuntimeContext(new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>())); + final TaskInfo taskInfo = new 
TaskInfo("test name", 1, 3, 0); + inputFormat.setRuntimeContext(new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), new HashMap>())); Assert.assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1); Assert.assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java index 9a4630545eb7d..5ca4c4cfe2d85 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.util.TestIOData; @@ -90,15 +91,16 @@ public void testDataSourceWithRuntimeContext() { ExecutionConfig executionConfig = new ExecutionConfig(); final HashMap> accumulatorMap = new HashMap>(); final HashMap> cpTasks = new HashMap<>(); + final TaskInfo taskInfo = new TaskInfo("test_sink", 0, 1, 0); executionConfig.disableObjectReuse(); in.reset(); - sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext("test_sink", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); assertEquals(out.output, asList(TestIOData.RICH_NAMES)); executionConfig.enableObjectReuse(); out.clear(); in.reset(); - sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext("test_sink", 1, 0, 
null, executionConfig, cpTasks, accumulatorMap), executionConfig); + sink.executeOnCollections(asList(TestIOData.NAMES), new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); assertEquals(out.output, asList(TestIOData.RICH_NAMES)); } catch(Exception e){ e.printStackTrace(); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java index 50b8d805599d0..bda2fb657dd76 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; import org.apache.flink.api.common.operators.util.TestIOData; @@ -76,13 +77,15 @@ public void testDataSourceWithRuntimeContext() { final HashMap> accumulatorMap = new HashMap>(); final HashMap> cpTasks = new HashMap<>(); + final TaskInfo taskInfo = new TaskInfo("test_source", 0, 1, 0); + ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext("test_source", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); in.reset(); executionConfig.enableObjectReuse(); - List resultRegular = source.executeOnCollections(new RuntimeUDFContext("test_source", 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List resultRegular = 
source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); assertEquals(asList(TestIOData.RICH_NAMES), resultMutableSafe); assertEquals(asList(TestIOData.RICH_NAMES), resultRegular); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java index 745cf09c0d586..cda324526fe0c 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; @@ -74,9 +75,10 @@ private void testExecuteOnCollection(FlatMapFunction udf, List result = getTestFlatMapOperator(udf) - .executeOnCollections(input, new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); + .executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); Assert.assertEquals(input.size(), result.size()); Assert.assertEquals(input, result); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java index 04505d0052ba9..d119fe22e6a7b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java +++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.*; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.RichFlatJoinFunction; @@ -118,14 +119,15 @@ public void join(String first, String second, Collector out) throws Exc try { + final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0); final HashMap> accumulatorMap = new HashMap>(); final HashMap> cpTasks = new HashMap<>(); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); executionConfig.enableObjectReuse(); - List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); assertEquals(expected, resultSafe); assertEquals(expected, resultRegular); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java index 0dbe1b607214e..6059ab1e438d5 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java +++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; @@ -108,11 +109,12 @@ public void close() throws Exception { List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); final HashMap> accumulatorMap = new HashMap>(); final HashMap> cpTasks = new HashMap<>(); + final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); executionConfig.enableObjectReuse(); - List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java index 47f30de889900..71486a51bf1f6 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java +++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.core.fs.Path; import org.apache.flink.util.Collector; @@ -80,11 +81,13 @@ public void close() throws Exception { List input = new ArrayList(asList("1", "2", "3", "4", "5", "6")); + final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0); + ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); + List resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); + List resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); assertEquals(asList(1, 2, 3, 4, 5, 6), resultMutableSafe); assertEquals(asList(1, 2, 3, 4, 5, 6), resultRegular); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java index 025fcfbfb9a7a..8ce83f3415665 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java @@ -19,6 +19,7 @@ 
package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.RichCoGroupFunction; @@ -74,7 +75,8 @@ public void testExecuteOnCollection() { ExecutionConfig executionConfig = new ExecutionConfig(); final HashMap> accumulators = new HashMap>(); final HashMap> cpTasks = new HashMap<>(); - final RuntimeContext ctx = new RuntimeUDFContext("Test UDF", 4, 0, null, executionConfig, cpTasks, accumulators); + final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 4, 0); + final RuntimeContext ctx = new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulators); { SumCoGroup udf1 = new SumCoGroup(); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java index 9e1684da9f075..a5632812a8b7f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; @@ -163,11 +164,13 @@ public void close() throws Exception { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); + final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0); + ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List> resultMutableSafe 
= op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java index 8bc29d4489db9..a6b3debf83be4 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.*; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.util.RuntimeUDFContext; @@ -103,11 +104,12 @@ public void join(Tuple3 first, Tuple2 )); try { + final TaskInfo taskInfo = new TaskInfo("op", 0, 1, 0); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List> resultSafe = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); + List> resultSafe = 
base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext("op", 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); + List> resultRegular = base.executeOnCollections(inputData1, inputData2, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); assertEquals(expected, new HashSet<>(resultSafe)); assertEquals(expected, new HashSet<>(resultRegular)); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java index 29faf035c322f..c04916d0477ab 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.common.operators.base; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; @@ -140,11 +141,13 @@ public void close() throws Exception { Integer>("foo", 3), new Tuple2("bar", 2), new Tuple2("bar", 4))); + final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0); + ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); - List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); + List> resultMutableSafe = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, 
null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); executionConfig.enableObjectReuse(); - List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskName, 1, 0, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); + List> resultRegular = op.executeOnCollections(input, new RuntimeUDFContext(taskInfo, null, executionConfig, new HashMap>(), new HashMap>()), executionConfig); Set> resultSetMutableSafe = new HashSet>(resultMutableSafe); Set> resultSetRegular = new HashSet>(resultRegular); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java index eb7e31103ebb5..8d3d6090cb123 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java @@ -84,10 +84,8 @@ public void materializeVariable(MutableReader reader, TypeSerializerFactory referenceHolder, bool if (!references.remove(referenceHolder)) { if (errorIfNoReference) { throw new IllegalStateException( - String.format("The task %s (%d/%d) did not hold a reference to the broadcast variable %s.", - referenceHolder.getEnvironment().getTaskName(), - referenceHolder.getEnvironment().getIndexInSubtaskGroup() + 1, - referenceHolder.getEnvironment().getNumberOfSubtasks(), + String.format("The task %s did not hold a reference to the broadcast variable %s.", + referenceHolder.getEnvironment().getTaskInfo().getTaskNameWithSubtasks(), key.toString())); } else { return false; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index e6a1583088d26..77fe56a3eb4f6 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.deployment; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -59,6 +60,9 @@ public final class TaskDeploymentDescriptor implements Serializable { /** The number of sub tasks. */ private final int numberOfSubtasks; + /** Attempt number the task */ + private final int attemptNumber; + /** The configuration of the job the task belongs to. */ private final Configuration jobConfiguration; @@ -91,7 +95,7 @@ public final class TaskDeploymentDescriptor implements Serializable { */ public TaskDeploymentDescriptor( JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, String taskName, - int indexInSubtaskGroup, int numberOfSubtasks, Configuration jobConfiguration, + int indexInSubtaskGroup, int numberOfSubtasks, int attemptNumber, Configuration jobConfiguration, Configuration taskConfiguration, String invokableClassName, List producedPartitions, List inputGates, @@ -101,6 +105,7 @@ public TaskDeploymentDescriptor( checkArgument(indexInSubtaskGroup >= 0); checkArgument(numberOfSubtasks > indexInSubtaskGroup); checkArgument(targetSlotNumber >= 0); + checkArgument(attemptNumber >= 0); this.jobID = checkNotNull(jobID); this.vertexID = checkNotNull(vertexID); @@ -108,6 +113,7 @@ public TaskDeploymentDescriptor( this.taskName = checkNotNull(taskName); this.indexInSubtaskGroup = indexInSubtaskGroup; this.numberOfSubtasks = numberOfSubtasks; + this.attemptNumber = attemptNumber; this.jobConfiguration = checkNotNull(jobConfiguration); this.taskConfiguration = checkNotNull(taskConfiguration); this.invokableClassName = checkNotNull(invokableClassName); @@ -122,14 
+128,14 @@ public TaskDeploymentDescriptor( public TaskDeploymentDescriptor( JobID jobID, JobVertexID vertexID, ExecutionAttemptID executionId, String taskName, - int indexInSubtaskGroup, int numberOfSubtasks, Configuration jobConfiguration, + int indexInSubtaskGroup, int numberOfSubtasks, int attemptNumber, Configuration jobConfiguration, Configuration taskConfiguration, String invokableClassName, List producedPartitions, List inputGates, List requiredJarFiles, List requiredClasspaths, int targetSlotNumber) { - this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, + this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber, jobConfiguration, taskConfiguration, invokableClassName, producedPartitions, inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1); } @@ -175,6 +181,20 @@ public int getNumberOfSubtasks() { return numberOfSubtasks; } + /** + * Returns the attempt number of the subtask + */ + public int getAttemptNumber() { + return attemptNumber; + } + + /** + * Returns the {@link TaskInfo} object for the subtask + */ + public TaskInfo getTaskInfo() { + return new TaskInfo(taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber); + } + /** * Gets the number of the slot into which the task is to be deployed. 
* @@ -224,10 +244,10 @@ public List getRequiredClasspaths() { @Override public String toString() { return String.format("TaskDeploymentDescriptor [job id: %s, job vertex id: %s, " + - "execution id: %s, task name: %s (%d/%d), invokable: %s, " + + "execution id: %s, task name: %s (%d/%d), attempt: %d, invokable: %s, " + "produced partitions: %s, input gates: %s]", jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks, - invokableClassName, collectionToString(producedPartitions), + attemptNumber, invokableClassName, collectionToString(producedPartitions), collectionToString(inputGates)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java index 87d89eb35bde8..73321516ba055 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.execution; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; @@ -87,19 +88,11 @@ public interface Environment { Configuration getJobConfiguration(); /** - * Returns the current number of subtasks the respective task is split into. + * Returns the {@link TaskInfo} object associated with this subtask * - * @return the current number of subtasks the respective task is split into + * @return TaskInfo for this subtask */ - int getNumberOfSubtasks(); - - /** - * Returns the index of this subtask in the subtask group. The index - * is between 0 and {@link #getNumberOfSubtasks()} - 1. - * - * @return the index of this subtask in the subtask group - */ - int getIndexInSubtaskGroup(); + TaskInfo getTaskInfo(); /** * Returns the input split provider assigned to this environment. 
@@ -123,23 +116,6 @@ public interface Environment { */ MemoryManager getMemoryManager(); - /** - * Returns the name of the task running in this environment. - * - * @return the name of the task running in this environment - */ - String getTaskName(); - - /** - * Returns the name of the task running in this environment, appended - * with the subtask indicator, such as "MyTask (3/6)", where - * 3 would be ({@link #getIndexInSubtaskGroup()} + 1), and 6 would be - * {@link #getNumberOfSubtasks()}. - * - * @return The name of the task running in this environment, with subtask indicator. - */ - String getTaskNameWithSubtasks(); - /** * Returns the user code class loader */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index ce17525fe7e23..eb2e68c8ebad3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -46,7 +46,6 @@ import org.apache.flink.runtime.messages.Messages; import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.ExceptionUtils; @@ -361,8 +360,8 @@ public void deployToSlot(final SimpleSlot slot) throws JobException { LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(), attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname())); } - - final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState, recoveryTimestamp); + + final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState, recoveryTimestamp, 
attemptNumber); // register this execution at the execution graph, to receive call backs vertex.getExecutionGraph().registerExecution(this); @@ -378,9 +377,7 @@ public void deployToSlot(final SimpleSlot slot) throws JobException { public void onComplete(Throwable failure, Object success) throws Throwable { if (failure != null) { if (failure instanceof TimeoutException) { - String taskname = Task.getTaskNameWithSubtaskAndID(deployment.getTaskName(), - deployment.getIndexInSubtaskGroup(), deployment.getNumberOfSubtasks(), - attemptId); + String taskname = deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')'; markFailed(new Exception( "Cannot deploy task " + taskname + " - TaskManager (" + instance diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index c1f68b06ab04f..2be0b3e5b74cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -613,7 +613,8 @@ TaskDeploymentDescriptor createDeploymentDescriptor( ExecutionAttemptID executionId, SimpleSlot targetSlot, SerializedValue> operatorState, - long recoveryTimestamp) { + long recoveryTimestamp, + int attemptNumber) { // Produced intermediate results List producedPartitions = new ArrayList(resultPartitions.size()); @@ -645,7 +646,7 @@ TaskDeploymentDescriptor createDeploymentDescriptor( List classpaths = getExecutionGraph().getRequiredClasspaths(); return new TaskDeploymentDescriptor(getJobId(), getJobvertexId(), executionId, getTaskName(), - subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(), + subTaskIndex, getTotalNumberOfParallelSubtasks(), attemptNumber, getExecutionGraph().getJobConfiguration(), jobVertex.getJobVertex().getConfiguration(), 
jobVertex.getJobVertex().getInvokableClassName(), producedPartitions, consumedPartitions, jarFiles, classpaths, targetSlot.getRoot().getSlotNumber(), operatorState, recoveryTimestamp); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 6e4b5648f0b02..6a919bc145fb1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -343,7 +343,7 @@ public void registerTask(Task task) throws IOException { public void unregisterTask(Task task) { LOG.debug("Unregister task {} from network environment (state: {}).", - task.getTaskNameWithSubtasks(), task.getExecutionState()); + task.getTaskInfo().getTaskNameWithSubtasks(), task.getExecutionState()); final ExecutionAttemptID executionId = task.getExecutionId(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java index 215111bf8526c..434f7d4e4ed96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.iterative.task; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.operators.BatchTask; @@ -168,11 +169,10 @@ protected void closeLocalStrategiesAndCaches() { } @Override - public DistributedRuntimeUDFContext createRuntimeContext(String taskName) { + public DistributedRuntimeUDFContext createRuntimeContext() { Environment env = 
getEnvironment(); - return new IterativeRuntimeUdfContext(taskName, env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), - env.getDistributedCacheEntries(), this.accumulatorMap); + return new IterativeRuntimeUdfContext(env.getTaskInfo(), getUserCodeClassLoader(), + getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap); } // -------------------------------------------------------------------------------------------- @@ -195,7 +195,7 @@ public String brokerKey() { if (brokerKey == null) { int iterationId = config.getIterationId(); brokerKey = getEnvironment().getJobID().toString() + '#' + iterationId + '#' + - getEnvironment().getIndexInSubtaskGroup(); + getEnvironment().getTaskInfo().getIndexOfThisSubtask(); } return brokerKey; } @@ -212,7 +212,7 @@ private void reinstantiateDriver() throws Exception { this.driver.setup(this); } catch (Throwable t) { - throw new Exception("The pact driver setup for '" + this.getEnvironment().getTaskName() + + throw new Exception("The pact driver setup for '" + this.getEnvironment().getTaskInfo().getTaskName() + "' , caused an error: " + t.getMessage(), t); } } @@ -361,10 +361,9 @@ private TypeSerializer getOutputSerializer() { private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext { - public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig, Map> cpTasks, - Map> accumulatorMap) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap); + public IterativeRuntimeUdfContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, + Map> cpTasks, Map> accumulatorMap) { + super(taskInfo, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap); } @Override diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java index c6268f4471600..66778c90014ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java @@ -231,7 +231,7 @@ private SuperstepBarrier initSuperstepBarrier() { @Override public void run() throws Exception { final String brokerKey = brokerKey(); - final int workerIndex = getEnvironment().getIndexInSubtaskGroup(); + final int workerIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask(); final boolean objectSolutionSet = config.isSolutionSetUnmanaged(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java index a85e662394abb..82e12b4ee3332 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java @@ -204,7 +204,7 @@ private void sendToAllWorkers(TaskEvent event) throws IOException, InterruptedEx } private String formatLogString(String message) { - return BatchTask.constructLogString(message, getEnvironment().getTaskName(), this); + return BatchTask.constructLogString(message, getEnvironment().getTaskInfo().getTaskName(), this); } // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index 4923b3b6b131a..f616f57d20886 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -96,7 +96,7 @@ public ClassLoader getUserCodeClassLoader() { * @return the current number of subtasks the respective task is split into */ public int getCurrentNumberOfSubtasks() { - return this.environment.getNumberOfSubtasks(); + return this.environment.getTaskInfo().getNumberOfParallelSubtasks(); } /** @@ -105,7 +105,7 @@ public int getCurrentNumberOfSubtasks() { * @return the index of this subtask in the subtask group */ public int getIndexInSubtaskGroup() { - return this.environment.getIndexInSubtaskGroup(); + return this.environment.getTaskInfo().getIndexOfThisSubtask(); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index c570458dbe6f7..6367b51ecc4ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -265,7 +265,7 @@ public void invoke() throws Exception { Environment env = getEnvironment(); - this.runtimeUdfContext = createRuntimeContext(env.getTaskName()); + this.runtimeUdfContext = createRuntimeContext(); // whatever happens in this scope, make sure that the local strategies are cleaned up! 
// note that the initialization of the local strategies is in the try-finally block as well, @@ -400,7 +400,7 @@ protected void initialize() throws Exception { this.driver.setup(this); } catch (Throwable t) { - throw new Exception("The driver setup for '" + this.getEnvironment().getTaskName() + + throw new Exception("The driver setup for '" + this.getEnvironment().getTaskInfo().getTaskName() + "' , caused an error: " + t.getMessage(), t); } @@ -461,7 +461,7 @@ protected void run() throws Exception { catch (Throwable t) { // if the preparation caused an error, clean up // errors during clean-up are swallowed, because we have already a root exception - throw new Exception("The data preparation for task '" + this.getEnvironment().getTaskName() + + throw new Exception("The data preparation for task '" + this.getEnvironment().getTaskInfo().getTaskName() + "' , caused an error: " + t.getMessage(), t); } @@ -1014,12 +1014,11 @@ protected void initOutputs() throws Exception { this.getExecutionConfig(), reporter, this.accumulatorMap); } - public DistributedRuntimeUDFContext createRuntimeContext(String taskName) { + public DistributedRuntimeUDFContext createRuntimeContext() { Environment env = getEnvironment(); - return new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), - env.getDistributedCacheEntries(), this.accumulatorMap); + return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(), + getExecutionConfig(), env.getDistributedCacheEntries(), this.accumulatorMap); } // -------------------------------------------------------------------------------------------- @@ -1063,7 +1062,7 @@ public AbstractInvokable getOwningNepheleTask() { @Override public String formatLogString(String message) { - return constructLogString(message, getEnvironment().getTaskName(), this); + return constructLogString(message, getEnvironment().getTaskInfo().getTaskName(), this); 
} @Override @@ -1150,8 +1149,8 @@ else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) { * @return The string for logging. */ public static String constructLogString(String message, String taskName, AbstractInvokable parent) { - return message + ": " + taskName + " (" + (parent.getEnvironment().getIndexInSubtaskGroup() + 1) + - '/' + parent.getEnvironment().getNumberOfSubtasks() + ')'; + return message + ": " + taskName + " (" + (parent.getEnvironment().getTaskInfo().getIndexOfThisSubtask() + 1) + + '/' + parent.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks() + ')'; } /** @@ -1172,7 +1171,7 @@ public static void logAndThrowException(Exception ex, AbstractInvokable parent) ex = cex.getWrappedException(); } while (ex instanceof ExceptionInChainedStubException); } else { - taskName = parent.getEnvironment().getTaskName(); + taskName = parent.getEnvironment().getTaskInfo().getTaskName(); } if (LOG.isErrorEnabled()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java index addceea9b2b2a..db675d86fbcb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java @@ -184,7 +184,7 @@ public void invoke() throws Exception LOG.debug(getLogString("Starting to produce output")); // open - format.open(this.getEnvironment().getIndexInSubtaskGroup(), this.getEnvironment().getNumberOfSubtasks()); + format.open(this.getEnvironment().getTaskInfo().getIndexOfThisSubtask(), this.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks()); if (objectReuseEnabled) { IT record = serializer.createInstance(); @@ -385,14 +385,13 @@ private void initInputReaders() throws Exception { * @return The string ready for logging. 
*/ private String getLogString(String message) { - return BatchTask.constructLogString(message, this.getEnvironment().getTaskName(), this); + return BatchTask.constructLogString(message, this.getEnvironment().getTaskInfo().getTaskName(), this); } public DistributedRuntimeUDFContext createRuntimeContext() { Environment env = getEnvironment(); - return new DistributedRuntimeUDFContext(env.getTaskName(), env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), - env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap()); + return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(), + getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 801e1a15de6f8..915f66b2892a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -297,7 +297,7 @@ private void initOutputs(ClassLoader cl) throws Exception { * @return The string ready for logging. 
*/ private String getLogString(String message) { - return getLogString(message, this.getEnvironment().getTaskName()); + return getLogString(message, this.getEnvironment().getTaskInfo().getTaskName()); } /** @@ -365,8 +365,7 @@ public void remove() { public DistributedRuntimeUDFContext createRuntimeContext() { Environment env = getEnvironment(); - return new DistributedRuntimeUDFContext(env.getTaskName(), env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(), - env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap()); + return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(), + getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java index 6edeb8434978b..2fb52cde980e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java @@ -64,11 +64,10 @@ public void setup(TaskConfig config, String taskName, Collector outputCollec Environment env = parent.getEnvironment(); if (parent instanceof BatchTask) { - this.udfContext = ((BatchTask) parent).createRuntimeContext(taskName); + this.udfContext = ((BatchTask) parent).createRuntimeContext(); } else { - this.udfContext = new DistributedRuntimeUDFContext(taskName, env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), userCodeClassLoader, parent.getExecutionConfig(), - env.getDistributedCacheEntries(), accumulatorMap + this.udfContext = new DistributedRuntimeUDFContext(env.getTaskInfo(), userCodeClassLoader, + parent.getExecutionConfig(), env.getDistributedCacheEntries(), accumulatorMap ); } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java index f74989ec28bfa..b5ac4d788ad5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/DistributedRuntimeUDFContext.java @@ -24,6 +24,7 @@ import java.util.concurrent.Future; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; import org.apache.flink.api.common.functions.RuntimeContext; @@ -41,9 +42,9 @@ public class DistributedRuntimeUDFContext extends AbstractRuntimeUDFContext { private final HashMap> broadcastVars = new HashMap>(); - public DistributedRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, - ExecutionConfig executionConfig, Map> cpTasks, Map> accumulators) { - super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks); + public DistributedRuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, + Map> cpTasks, Map> accumulators) { + super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index 567f051571707..e2a041bd0a4a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -19,6 +19,7 @@ package 
org.apache.flink.runtime.taskmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; @@ -40,7 +41,6 @@ import java.util.concurrent.Future; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkArgument; /** * In implementation of the {@link Environment}. @@ -51,10 +51,7 @@ public class RuntimeEnvironment implements Environment { private final JobVertexID jobVertexId; private final ExecutionAttemptID executionId; - private final String taskName; - private final String taskNameWithSubtasks; - private final int subtaskIndex; - private final int parallelism; + private final TaskInfo taskInfo; private final Configuration jobConfiguration; private final Configuration taskConfiguration; @@ -83,10 +80,7 @@ public RuntimeEnvironment( JobID jobId, JobVertexID jobVertexId, ExecutionAttemptID executionId, - String taskName, - String taskNameWithSubtasks, - int subtaskIndex, - int parallelism, + TaskInfo taskInfo, Configuration jobConfiguration, Configuration taskConfiguration, ClassLoader userCodeClassLoader, @@ -100,16 +94,11 @@ public RuntimeEnvironment( InputGate[] inputGates, ActorGateway jobManager, TaskManagerRuntimeInfo taskManagerInfo) { - - checkArgument(parallelism > 0 && subtaskIndex >= 0 && subtaskIndex < parallelism); this.jobId = checkNotNull(jobId); this.jobVertexId = checkNotNull(jobVertexId); this.executionId = checkNotNull(executionId); - this.taskName = checkNotNull(taskName); - this.taskNameWithSubtasks = checkNotNull(taskNameWithSubtasks); - this.subtaskIndex = subtaskIndex; - this.parallelism = parallelism; + this.taskInfo = checkNotNull(taskInfo); this.jobConfiguration = checkNotNull(jobConfiguration); this.taskConfiguration = checkNotNull(taskConfiguration); this.userCodeClassLoader = 
checkNotNull(userCodeClassLoader); @@ -143,23 +132,8 @@ public ExecutionAttemptID getExecutionId() { } @Override - public String getTaskName() { - return taskName; - } - - @Override - public String getTaskNameWithSubtasks() { - return taskNameWithSubtasks; - } - - @Override - public int getNumberOfSubtasks() { - return parallelism; - } - - @Override - public int getIndexInSubtaskGroup() { - return subtaskIndex; + public TaskInfo getTaskInfo() { + return this.taskInfo; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index ae1c0cda2db1a..851f8efc069b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -71,7 +72,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; /** @@ -120,16 +120,9 @@ public class Task implements Runnable { /** The execution attempt of the parallel subtask */ private final ExecutionAttemptID executionId; - /** The index of the parallel subtask, in [0, numberOfSubtasks) */ - private final int subtaskIndex; + /** TaskInfo object for this task */ + private final TaskInfo taskInfo; - /** The number of parallel subtasks for the JobVertex/ExecutionJobVertex that this task belongs to */ - private final int parallelism; - - /** The name of the task */ - private final String taskName; - - /** The name of the task, including the subtask index and the parallelism */ 
private final String taskNameWithSubtask; /** The job-wide configuration object */ @@ -237,17 +230,11 @@ public Task(TaskDeploymentDescriptor tdd, FileCache fileCache, TaskManagerRuntimeInfo taskManagerConfig) { - checkArgument(tdd.getNumberOfSubtasks() > 0); - checkArgument(tdd.getIndexInSubtaskGroup() >= 0); - checkArgument(tdd.getIndexInSubtaskGroup() < tdd.getNumberOfSubtasks()); - + this.taskInfo = checkNotNull(tdd.getTaskInfo()); this.jobId = checkNotNull(tdd.getJobID()); this.vertexId = checkNotNull(tdd.getVertexID()); this.executionId = checkNotNull(tdd.getExecutionId()); - this.subtaskIndex = tdd.getIndexInSubtaskGroup(); - this.parallelism = tdd.getNumberOfSubtasks(); - this.taskName = checkNotNull(tdd.getTaskName()); - this.taskNameWithSubtask = getTaskNameWithSubtask(taskName, subtaskIndex, parallelism); + this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks(); this.jobConfiguration = checkNotNull(tdd.getJobConfiguration()); this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration()); this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles()); @@ -274,8 +261,7 @@ public Task(TaskDeploymentDescriptor tdd, // create the reader and writer structures - final String taskNameWithSubtasksAndId = - Task.getTaskNameWithSubtaskAndID(taskName, subtaskIndex, parallelism, executionId); + final String taskNameWithSubtaskAndId = taskNameWithSubtask + " (" + executionId + ')'; List partitions = tdd.getProducedPartitions(); List consumedPartitions = tdd.getInputGates(); @@ -289,7 +275,7 @@ public Task(TaskDeploymentDescriptor tdd, ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId); this.producedPartitions[i] = new ResultPartition( - taskNameWithSubtasksAndId, + taskNameWithSubtaskAndId, jobId, partitionId, desc.getPartitionType(), @@ -308,7 +294,7 @@ public Task(TaskDeploymentDescriptor tdd, for (int i = 0; i < this.inputGates.length; i++) { SingleInputGate gate = SingleInputGate.create( - 
taskNameWithSubtasksAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment); + taskNameWithSubtaskAndId, jobId, executionId, consumedPartitions.get(i), networkEnvironment); this.inputGates[i] = gate; inputGatesById.put(gate.getConsumedResultId(), gate); @@ -336,20 +322,8 @@ public ExecutionAttemptID getExecutionId() { return executionId; } - public int getIndexInSubtaskGroup() { - return subtaskIndex; - } - - public int getNumberOfSubtasks() { - return parallelism; - } - - public String getTaskName() { - return taskName; - } - - public String getTaskNameWithSubtasks() { - return taskNameWithSubtask; + public TaskInfo getTaskInfo() { + return taskInfo; } public Configuration getJobConfiguration() { @@ -515,8 +489,7 @@ else if (current == ExecutionState.CANCELING) { TaskInputSplitProvider splitProvider = new TaskInputSplitProvider(jobManager, jobId, vertexId, executionId, userCodeClassLoader, actorAskTimeout); - Environment env = new RuntimeEnvironment(jobId, vertexId, executionId, - taskName, taskNameWithSubtask, subtaskIndex, parallelism, + Environment env = new RuntimeEnvironment(jobId, vertexId, executionId, taskInfo, jobConfiguration, taskConfiguration, userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager, accumulatorRegistry, @@ -1050,19 +1023,7 @@ private void cancelInvokable() { @Override public String toString() { - return getTaskNameWithSubtasks() + " [" + executionState + ']'; - } - - // ------------------------------------------------------------------------ - // Task Names - // ------------------------------------------------------------------------ - - public static String getTaskNameWithSubtask(String name, int subtask, int numSubtasks) { - return name + " (" + (subtask+1) + '/' + numSubtasks + ')'; - } - - public static String getTaskNameWithSubtaskAndID(String name, int subtask, int numSubtasks, ExecutionAttemptID id) { - return name + " (" + (subtask+1) + '/' + numSubtasks + ") (" + id + ')'; + return 
taskNameWithSubtask + " [" + executionState + ']'; } /** diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index f8f52045c6695..58547ad22bef7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -889,7 +889,7 @@ class TaskManager( fileCache, runtimeInfo) - log.info(s"Received task ${task.getTaskNameWithSubtasks}") + log.info(s"Received task ${task.getTaskInfo.getTaskNameWithSubtasks}") val execId = tdd.getExecutionId // add the task to the map @@ -939,7 +939,7 @@ class TaskManager( catch { case t: Throwable => log.error(s"Could not update input data location for task " + - s"${task.getTaskName}. Trying to fail task.", t) + s"${task.getTaskInfo.getTaskName}. Trying to fail task.", t) try { task.failExternally(t) @@ -1003,7 +1003,7 @@ class TaskManager( } log.info(s"Unregistering task and sending final execution state " + - s"${task.getExecutionState} to JobManager for task ${task.getTaskName} " + + s"${task.getExecutionState} to JobManager for task ${task.getTaskInfo.getTaskName} " + s"(${task.getExecutionId})") val accumulators = { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index 2dd1caf21cf66..36ac3519afd04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -46,6 +46,7 @@ public void testSerialization() { final String taskName = "task name"; final int indexInSubtaskGroup = 0; final int currentNumberOfSubtasks = 1; + final int attemptNumber = 0; final Configuration 
jobConfiguration = new Configuration(); final Configuration taskConfiguration = new Configuration(); final Class invokableClass = BatchTask.class; @@ -55,11 +56,11 @@ public void testSerialization() { final List requiredClasspaths = new ArrayList(0); final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(jobID, vertexID, execId, taskName, - indexInSubtaskGroup, currentNumberOfSubtasks, jobConfiguration, taskConfiguration, + indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber, jobConfiguration, taskConfiguration, invokableClass.getName(), producedResults, inputGates, requiredJars, requiredClasspaths, 47); final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); - + assertFalse(orig.getJobID() == copy.getJobID()); assertFalse(orig.getVertexID() == copy.getVertexID()); assertFalse(orig.getTaskName() == copy.getTaskName()); @@ -71,6 +72,7 @@ public void testSerialization() { assertEquals(orig.getTaskName(), copy.getTaskName()); assertEquals(orig.getIndexInSubtaskGroup(), copy.getIndexInSubtaskGroup()); assertEquals(orig.getNumberOfSubtasks(), copy.getNumberOfSubtasks()); + assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber()); assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions()); assertEquals(orig.getInputGates(), copy.getInputGates()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index 2783b6cc0abfa..35d14a9e0cb36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -133,7 +133,7 @@ public void registerInputOutput() { @Override public void invoke() throws Exception { final IntValue subtaskIndex = new IntValue( - 
getEnvironment().getIndexInSubtaskGroup()); + getEnvironment().getTaskInfo().getIndexOfThisSubtask()); try { for (int i = 0; i < numberOfTimesToSend; i++) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java index 3bcac56872f79..1fa5d4b5496af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java @@ -148,7 +148,7 @@ public void registerInputOutput() { @Override public void invoke() throws Exception { final IntValue subtaskIndex = new IntValue( - getEnvironment().getIndexInSubtaskGroup()); + getEnvironment().getTaskInfo().getIndexOfThisSubtask()); // Produce the first intermediate result and then the second in a serial fashion. for (RecordWriter writer : writers) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 71bec4a41b8c5..ff6593f84798b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -22,6 +22,7 @@ import java.util.concurrent.Future; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.accumulators.AccumulatorRegistry; @@ -39,16 +40,12 @@ public class DummyEnvironment implements Environment { - private final String taskName; - private final int numSubTasks; - private final int subTaskIndex; + private final TaskInfo taskInfo; 
private final JobID jobId = new JobID(); private final JobVertexID jobVertexId = new JobVertexID(); public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) { - this.taskName = taskName; - this.numSubTasks = numSubTasks; - this.subTaskIndex = subTaskIndex; + this.taskInfo = new TaskInfo(taskName, subTaskIndex, numSubTasks, 0); } @Override @@ -82,13 +79,8 @@ public Configuration getJobConfiguration() { } @Override - public int getNumberOfSubtasks() { - return numSubTasks; - } - - @Override - public int getIndexInSubtaskGroup() { - return subTaskIndex; + public TaskInfo getTaskInfo() { + return taskInfo; } @Override @@ -106,16 +98,6 @@ public MemoryManager getMemoryManager() { return null; } - @Override - public String getTaskName() { - return taskName; - } - - @Override - public String getTaskNameWithSubtasks() { - return taskName; - } - @Override public ClassLoader getUserClassLoader() { return null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 972aeda18f290..fa97210c98347 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators.testutils; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.core.fs.Path; @@ -62,7 +63,7 @@ public class MockEnvironment implements Environment { - private final String taskName; + private final TaskInfo taskInfo; private final MemoryManager memManager; @@ -87,7 +88,7 @@ public class MockEnvironment implements Environment { private final int bufferSize; public MockEnvironment(String taskName, long memorySize, 
MockInputSplitProvider inputSplitProvider, int bufferSize) { - this.taskName = taskName; + this.taskInfo = new TaskInfo(taskName, 0, 1, 0); this.jobConfiguration = new Configuration(); this.taskConfiguration = new Configuration(); this.inputs = new LinkedList(); @@ -199,29 +200,14 @@ public TaskManagerRuntimeInfo getTaskManagerInfo() { return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration())); } - @Override - public int getNumberOfSubtasks() { - return 1; - } - - @Override - public int getIndexInSubtaskGroup() { - return 0; - } - @Override public InputSplitProvider getInputSplitProvider() { return this.inputSplitProvider; } @Override - public String getTaskName() { - return taskName; - } - - @Override - public String getTaskNameWithSubtasks() { - return taskName + "(0/1)"; + public TaskInfo getTaskInfo() { + return taskInfo; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java index d1298b15c5be6..05bc8fa4da81b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java @@ -80,7 +80,7 @@ public void testSetupAndSerialization() { // supreme! 
} - backend.initializeForJob(new DummyEnvironment("test", 0, 0)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0)); assertNotNull(backend.getCheckpointDirectory()); File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); @@ -107,7 +107,7 @@ public void testSerializableState() { try { FsStateBackend backend = CommonTestUtils.createCopySerializable( new FsStateBackend(tempDir.toURI(), 40)); - backend.initializeForJob(new DummyEnvironment("test", 0, 0)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0)); File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); @@ -147,7 +147,7 @@ public void testStateOutputStream() { FsStateBackend backend = CommonTestUtils.createCopySerializable( new FsStateBackend(tempDir.toURI(), 15)); - backend.initializeForJob(new DummyEnvironment("test", 0, 0)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0)); File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); @@ -228,7 +228,7 @@ public void testKeyValueState() { File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); try { FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); - backend.initializeForJob(new DummyEnvironment("test", 0, 0)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0)); File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); @@ -317,7 +317,7 @@ public void testRestoreWithWrongSerializers() { File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); try { FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); - backend.initializeForJob(new DummyEnvironment("test", 0, 0)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0)); File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath()); @@ 
-389,7 +389,7 @@ public void testCopyDefaultValue() { File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); try { FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir))); - backend.initializeForJob(new DummyEnvironment("test", 0, 0)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0)); KvState kv = backend.createKvState("a_0", "a", IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 85f8be58651b4..a27a4f0851390 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -148,7 +148,7 @@ private static Task createTask() { TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( new JobID(), new JobVertexID(), new ExecutionAttemptID(), - "Test Task", 0, 1, + "Test Task", 0, 1, 0, new Configuration(), new Configuration(), CheckpointsInOrderInvokable.class.getName(), Collections.emptyList(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 131a2e7895298..db32c158cd725 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -159,7 +159,7 @@ protected void run() { final JobVertexID vid = new JobVertexID(); final ExecutionAttemptID eid = new ExecutionAttemptID(); - final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, vid, eid, "TestTask", 2, 7, + final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, 
vid, eid, "TestTask", 2, 7, 0, new Configuration(), new Configuration(), TestInvokableCorrect.class.getName(), Collections.emptyList(), Collections.emptyList(), @@ -264,13 +264,13 @@ public void testJobSubmissionAndCanceling() { final ExecutionAttemptID eid1 = new ExecutionAttemptID(); final ExecutionAttemptID eid2 = new ExecutionAttemptID(); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid1, vid1, eid1, "TestTask1", 1, 5, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid2, vid2, eid2, "TestTask2", 2, 7, 0, new Configuration(), new Configuration(), TestInvokableBlockingCancelable.class.getName(), Collections.emptyList(), Collections.emptyList(), @@ -402,13 +402,13 @@ public void testGateChannelEdgeMismatch() { final ExecutionAttemptID eid1 = new ExecutionAttemptID(); final ExecutionAttemptID eid2 = new ExecutionAttemptID(); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), Collections.emptyList(), Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.emptyList(), Collections.emptyList(), @@ 
-506,12 +506,12 @@ public void testRunJobWithForwardChannel() { } ); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), irpdd, Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.Receiver.class.getName(), Collections.emptyList(), Collections.singletonList(ircdd), @@ -650,12 +650,12 @@ public void testCancellingDependentAndStateUpdateFails() { } ); - final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, + final TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(jid, vid1, eid1, "Sender", 0, 1, 0, new Configuration(), new Configuration(), Tasks.Sender.class.getName(), irpdd, Collections.emptyList(), new ArrayList(), Collections.emptyList(), 0); - final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, + final TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(jid, vid2, eid2, "Receiver", 2, 7, 0, new Configuration(), new Configuration(), Tasks.BlockingReceiver.class.getName(), Collections.emptyList(), Collections.singletonList(ircdd), @@ -793,7 +793,7 @@ public void testRemotePartitionNotFound() throws Exception { new InputGateDeploymentDescriptor(resultId, 0, icdd); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - jid, vid, eid, "Receiver", 0, 1, + jid, vid, eid, "Receiver", 0, 1, 0, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.emptyList(), @@ -886,7 +886,7 @@ public void 
testLocalPartitionNotFound() throws Exception { new InputGateDeploymentDescriptor(resultId, 0, icdd); final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - jid, vid, eid, "Receiver", 0, 1, + jid, vid, eid, "Receiver", 0, 1, 0, new Configuration(), new Configuration(), Tasks.AgnosticReceiver.class.getName(), Collections.emptyList(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index fb933f87eb9bc..5197a45c061e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -734,7 +734,7 @@ private Task createTask(Class invokable, private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class invokable) { return new TaskDeploymentDescriptor( new JobID(), new JobVertexID(), new ExecutionAttemptID(), - "Test Task", 0, 1, + "Test Task", 0, 1, 0, new Configuration(), new Configuration(), invokable.getName(), Collections.emptyList(), diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index f6f7cc685da0f..18de9c7199a80 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -121,7 +121,7 @@ object Tasks { import FailingOnceReceiver.failed override def invoke(): Unit = { - if(!failed && getEnvironment.getIndexInSubtaskGroup == 0){ + if(!failed && getEnvironment.getTaskInfo.getIndexOfThisSubtask == 0){ failed = true throw new Exception("Test exception.") }else{ diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java index 
ce8298b1d5e9e..49dfc21b45f4b 100644 --- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java +++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java @@ -128,7 +128,7 @@ public void testSetupAndSerialization() { // supreme! } - backend.initializeForJob(new DummyEnvironment("test", 0, 0)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0)); assertNotNull(backend.getCheckpointDirectory()); Path checkpointDir = backend.getCheckpointDirectory(); @@ -151,7 +151,7 @@ public void testSerializableState() { try { FsStateBackend backend = CommonTestUtils.createCopySerializable( new FsStateBackend(randomHdfsFileUri(), 40)); - backend.initializeForJob(new DummyEnvironment("test", 0, 0)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0)); Path checkpointDir = backend.getCheckpointDirectory(); @@ -185,7 +185,7 @@ public void testStateOutputStream() { try { FsStateBackend backend = CommonTestUtils.createCopySerializable( new FsStateBackend(randomHdfsFileUri(), 15)); - backend.initializeForJob(new DummyEnvironment("test", 0, 0)); + backend.initializeForJob(new DummyEnvironment("test", 1, 0)); Path checkpointDir = backend.getCheckpointDirectory(); diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java index 287129df173ad..14d9cde6cd875 100644 --- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java +++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java @@ -19,6 +19,7 @@ package org.apache.flink.tez.runtime; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.Function; import org.apache.flink.core.fs.Path; @@ 
-67,9 +68,13 @@ public void initialize() throws Exception { TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader()); taskConfig.setTaskName(getContext().getTaskVertexName()); - RuntimeUDFContext runtimeUdfContext = new RuntimeUDFContext(getContext().getTaskVertexName(), - getContext().getVertexParallelism(), - getContext().getTaskIndex(), + RuntimeUDFContext runtimeUdfContext = new RuntimeUDFContext( + new TaskInfo( + getContext().getTaskVertexName(), + getContext().getTaskIndex(), + getContext().getVertexParallelism(), + getContext().getTaskAttemptNumber() + ), getClass().getClassLoader(), new ExecutionConfig(), new HashMap>(), diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index 035737e74bc50..b8afe3a83a377 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -80,6 +80,11 @@ public int getIndexOfThisSubtask() { return indexOfThisSubtask; } + @Override + public int getAttemptNumber() { + return 0; + } + @Override public ExecutionConfig getExecutionConfig() { throw new UnsupportedOperationException(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index 29ff1f94d3052..46f2fef100958 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -60,9 +60,7 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext { public StreamingRuntimeContext(AbstractStreamOperator operator, Environment env, Map> accumulators) { - super(env.getTaskName(), - env.getNumberOfSubtasks(), - env.getIndexInSubtaskGroup(), + super(env.getTaskInfo(), env.getUserClassLoader(), operator.getExecutionConfig(), accumulators, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java index 2125df1590ce3..c9d4f1c13cfef 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java @@ -46,7 +46,7 @@ protected void run() throws Exception { } final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId , - getEnvironment().getIndexInSubtaskGroup()); + getEnvironment().getTaskInfo().getIndexOfThisSubtask()); final long iterationWaitTime = getConfiguration().getIterationWaitTime(); final boolean shouldWait = iterationWaitTime > 0; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java index 9bb5311aab3cf..0d0f21272c1b0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java @@ -42,7 +42,7 @@ public void init() throws Exception { } final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId, - 
getEnvironment().getIndexInSubtaskGroup()); + getEnvironment().getTaskInfo().getIndexOfThisSubtask()); final long iterationWaitTime = getConfiguration().getIterationWaitTime(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index a1c76b17ccd00..b67a98eeafecd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -374,7 +374,7 @@ protected void finalize() throws Throwable { * @return The name of the task. */ public String getName() { - return getEnvironment().getTaskNameWithSubtasks(); + return getEnvironment().getTaskInfo().getTaskNameWithSubtasks(); } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 63cbd6a281275..ed0f04a8b9989 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.state.OperatorState; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -885,8 +886,7 @@ public void apply(Integer key, when(task.getExecutionConfig()).thenReturn(new ExecutionConfig()); Environment
env = mock(Environment.class); - when(env.getIndexInSubtaskGroup()).thenReturn(0); - when(env.getNumberOfSubtasks()).thenReturn(1); + when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader()); when(task.getEnvironment()).thenReturn(env); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 55cd9fe1144ba..b3e59e51c9ba6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichReduceFunction; @@ -1012,8 +1013,7 @@ public Tuple2 reduce(Tuple2 value1, Tuple2(); @@ -211,29 +215,14 @@ public Configuration getJobConfiguration() { return this.jobConfiguration; } - @Override - public int getNumberOfSubtasks() { - return 1; - } - - @Override - public int getIndexInSubtaskGroup() { - return 0; - } - @Override public InputSplitProvider getInputSplitProvider() { return this.inputSplitProvider; } @Override - public String getTaskName() { - return ""; - } - - @Override - public String getTaskNameWithSubtasks() { - return ""; + public TaskInfo getTaskInfo() { + return this.taskInfo; } @Override diff 
--git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java index fef6c9f4d4036..65036662b9abc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.test.util.JavaProgramTestBase; @@ -35,27 +36,35 @@ */ public class TaskFailureITCase extends JavaProgramTestBase { + private static final String EXCEPTION_STRING = "This is an expected Test Exception"; + @Override protected void testProgram() throws Exception { //test failing version try { - executeTask(new FailingTestMapper()); + executeTask(new FailingTestMapper(), 1); } catch (RuntimeException e) { //expected for collection execution if (!isCollectionExecution()) { Assert.fail(); } + // for collection execution, no restarts. So, exception should be appended with 0. + Assert.assertEquals(EXCEPTION_STRING + ":0", e.getMessage()); } catch (JobExecutionException e) { //expected for cluster execution if (isCollectionExecution()) { Assert.fail(); } + // for cluster execution, one restart. So, exception should be appended with 1. 
+ Assert.assertEquals(EXCEPTION_STRING + ":1", e.getCause().getMessage()); } //test correct version - executeTask(new TestMapper()); + executeTask(new TestMapper(), 0); } - private void executeTask(MapFunction mapper) throws Exception { + private void executeTask(MapFunction mapper, int retries) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().setNumberOfExecutionRetries(retries); + env.getConfig().setExecutionRetryDelay(0); List result = env.generateSequence(1, 9) .map(mapper) .collect(); @@ -78,12 +87,12 @@ public Long map(Long value) throws Exception { /** * failing map function */ - public static class FailingTestMapper implements MapFunction { + public static class FailingTestMapper extends RichMapFunction { private static final long serialVersionUID = 1L; @Override public Long map(Long value) throws Exception { - throw new RuntimeException("This is an expected Test Exception"); + throw new RuntimeException(EXCEPTION_STRING + ":" + getRuntimeContext().getAttemptNumber()); } } }