[FLINK-2488] [FLINK-2524] [FLINK-3124] Expose Attempt Number in RuntimeContext and add TaskInfo to hold all task related parameters.

This closes #1386
sachingoel0101 authored and StephanEwen committed Dec 7, 2015
1 parent fe5e2e1 commit e8734b2
Showing 60 changed files with 389 additions and 346 deletions.
@@ -25,6 +25,7 @@
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
@@ -332,14 +333,10 @@ public Map<String, Object> getComponentConfiguration() {

public static StreamTask<?, ?> createMockStreamTask(ExecutionConfig execConfig) {
Environment env = mock(Environment.class);
when(env.getTaskName()).thenReturn("Mock Task");
when(env.getTaskNameWithSubtasks()).thenReturn("Mock Task (1/1)");
when(env.getIndexInSubtaskGroup()).thenReturn(0);
when(env.getNumberOfSubtasks()).thenReturn(1);
when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 0, 1, 0));
when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());

StreamTask<?, ?> mockTask = mock(StreamTask.class);
when(mockTask.getName()).thenReturn("Mock Task (1/1)");
when(mockTask.getCheckpointLock()).thenReturn(new Object());
when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
when(mockTask.getEnvironment()).thenReturn(env);
@@ -181,7 +181,7 @@ public <K, V> LazyDbKvState<K, V> createKvState(String stateId, String stateName
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
return new LazyDbKvState<K, V>(
stateId + "_" + env.getJobID().toShortString(),
env.getIndexInSubtaskGroup() == 0,
env.getTaskInfo().getIndexOfThisSubtask() == 0,
getConnections(),
getConfiguration(),
keySerializer,
@@ -376,7 +376,7 @@ public Void call() throws Exception {
}, stateBackend.getConfiguration().getMaxNumberOfSqlRetries(),
stateBackend.getConfiguration().getSleepBetweenSqlRetries());

boolean cleanup = stateBackend.getEnvironment().getIndexInSubtaskGroup() == 0;
boolean cleanup = stateBackend.getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0;

// Restore the KvState
LazyDbKvState<K, V> restored = new LazyDbKvState<K, V>(kvStateId, cleanup,
96 changes: 96 additions & 0 deletions flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
*/
public class TaskInfo {

private final String taskName;
private final String taskNameWithSubtasks;
private final int indexOfSubtask;
private final int numberOfParallelSubtasks;
private final int attemptNumber;

public TaskInfo(String taskName, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
checkArgument(numberOfParallelSubtasks >= 1, "Parallelism must be a positive number.");
checkArgument(indexOfSubtask < numberOfParallelSubtasks, "Task index must be less than parallelism.");
checkArgument(attemptNumber >= 0, "Attempt number must be a non-negative number.");
this.taskName = checkNotNull(taskName, "Task Name must not be null.");
this.indexOfSubtask = indexOfSubtask;
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.attemptNumber = attemptNumber;
this.taskNameWithSubtasks = taskName + " (" + (indexOfSubtask + 1) + '/' + numberOfParallelSubtasks + ')';
}

/**
* Returns the name of the task
*
* @return The name of the task
*/
public String getTaskName() {
return this.taskName;
}

/**
* Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
* parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
*
* @return The index of the parallel subtask.
*/
public int getIndexOfThisSubtask() {
return this.indexOfSubtask;
}

/**
* Gets the parallelism with which the parallel task runs.
*
* @return The parallelism with which the parallel task runs.
*/
public int getNumberOfParallelSubtasks() {
return this.numberOfParallelSubtasks;
}

/**
* Gets the attempt number of this parallel subtask. First attempt is numbered 0.
* The attempt number corresponds to the number of times this task has been restarted (after
* failure/cancellation) since the job was initially started.
*
* @return Attempt number of the subtask.
*/
public int getAttemptNumber() {
return this.attemptNumber;
}

/**
* Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
* where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
* {@link #getNumberOfParallelSubtasks()}.
*
* @return The name of the task, with subtask indicator.
*/
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
}
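
As a quick illustration (not part of this commit), the new class could be exercised as follows; the class name TaskInfoExample and the concrete values are only examples:

import org.apache.flink.api.common.TaskInfo;

// Illustrative sketch: shows the naming and validation contract of the new TaskInfo class.
public class TaskInfoExample {

	public static void main(String[] args) {
		// subtask index 2 out of a parallelism of 6, first execution attempt
		TaskInfo info = new TaskInfo("MyTask", 2, 6, 0);

		System.out.println(info.getTaskNameWithSubtasks());     // prints "MyTask (3/6)": the index is 0-based, the display is 1-based
		System.out.println(info.getIndexOfThisSubtask());       // prints 2
		System.out.println(info.getNumberOfParallelSubtasks()); // prints 6
		System.out.println(info.getAttemptNumber());            // prints 0

		// The constructor validates its arguments; for example, an index equal to the
		// parallelism violates the "index < parallelism" precondition and would throw
		// an IllegalArgumentException:
		// new TaskInfo("MyTask", 6, 6, 0);
	}
}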
@@ -64,6 +64,22 @@ public interface RuntimeContext {
*/
int getIndexOfThisSubtask();

/**
* Gets the attempt number of this parallel subtask. First attempt is numbered 0.
*
* @return Attempt number of the subtask.
*/
int getAttemptNumber();

/**
* Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
* where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
* {@link #getNumberOfParallelSubtasks()}.
*
* @return The name of the task, with subtask indicator.
*/
String getTaskNameWithSubtasks();

/**
* Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
* job.
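For context, a minimal sketch (not part of this commit) of how a user-defined function might read the newly exposed values through its RuntimeContext; the class AttemptLoggingMapper is hypothetical:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical user function: logs the attempt number and the subtask name when the task opens.
public class AttemptLoggingMapper extends RichMapFunction<String, String> {

	@Override
	public void open(Configuration parameters) throws Exception {
		int attempt = getRuntimeContext().getAttemptNumber();        // 0 on the first execution, incremented on restarts
		String name = getRuntimeContext().getTaskNameWithSubtasks(); // e.g. "MyMapper (3/6)"
		System.out.println("Opening " + name + ", attempt " + attempt);
	}

	@Override
	public String map(String value) {
		return value;
	}
}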
@@ -25,6 +25,7 @@

import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -42,11 +43,7 @@
*/
public abstract class AbstractRuntimeUDFContext implements RuntimeContext {

private final String name;

private final int numParallelSubtasks;

private final int subtaskIndex;
private final TaskInfo taskInfo;

private final ClassLoader userCodeClassLoader;

@@ -56,15 +53,12 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {

private final DistributedCache distributedCache;

public AbstractRuntimeUDFContext(String name,
int numParallelSubtasks, int subtaskIndex,
public AbstractRuntimeUDFContext(TaskInfo taskInfo,
ClassLoader userCodeClassLoader,
ExecutionConfig executionConfig,
Map<String, Accumulator<?,?>> accumulators,
Map<String, Future<Path>> cpTasks) {
this.name = name;
this.numParallelSubtasks = numParallelSubtasks;
this.subtaskIndex = subtaskIndex;
this.taskInfo = Preconditions.checkNotNull(taskInfo);
this.userCodeClassLoader = userCodeClassLoader;
this.executionConfig = executionConfig;
this.distributedCache = new DistributedCache(Preconditions.checkNotNull(cpTasks));
@@ -78,17 +72,27 @@ public ExecutionConfig getExecutionConfig() {

@Override
public String getTaskName() {
return this.name;
return taskInfo.getTaskName();
}

@Override
public int getNumberOfParallelSubtasks() {
return this.numParallelSubtasks;
return taskInfo.getNumberOfParallelSubtasks();
}

@Override
public int getIndexOfThisSubtask() {
return this.subtaskIndex;
return taskInfo.getIndexOfThisSubtask();
}

@Override
public int getAttemptNumber() {
return taskInfo.getAttemptNumber();
}

@Override
public String getTaskNameWithSubtasks() {
return taskInfo.getTaskNameWithSubtasks();
}

@Override
@@ -24,6 +24,7 @@
import java.util.concurrent.Future;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -38,9 +39,9 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {

private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();

public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
public RuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks);
}

@Override
@@ -36,6 +36,7 @@
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.aggregators.Aggregator;
@@ -179,10 +180,11 @@ private <IN> void executeDataSink(GenericDataSinkBase<?> sink, int superStep) th
GenericDataSinkBase<IN> typedSink = (GenericDataSinkBase<IN>) sink;

// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0);
RuntimeUDFContext ctx;
if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(typedSink.getName(), 1, 0, getClass().getClassLoader(), executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(typedSink.getName(), 1, 0, classLoader, executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);
} else {
ctx = null;
}
@@ -195,10 +197,11 @@ private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> source, in
@SuppressWarnings("unchecked")
GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source;
// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedSource.getName(), 0, 1, 0);
RuntimeUDFContext ctx;
if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(source.getName(), 1, 0, getClass().getClassLoader(), executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(source.getName(), 1, 0, classLoader, executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);
} else {
ctx = null;
}
@@ -218,12 +221,11 @@ private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> op
SingleInputOperator<IN, OUT, ?> typedOp = (SingleInputOperator<IN, OUT, ?>) operator;

// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
RuntimeUDFContext ctx;
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass()
.getClassLoader(), executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader,
executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);

for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
@@ -259,12 +261,11 @@ private <IN1, IN2, OUT> List<OUT> executeBinaryOperator(DualInputOperator<?, ?,
DualInputOperator<IN1, IN2, OUT, ?> typedOp = (DualInputOperator<IN1, IN2, OUT, ?>) operator;

// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
RuntimeUDFContext ctx;
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader,
executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader,
executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);

for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
@@ -519,10 +520,9 @@ else if (op instanceof GenericDataSourceBase) {

private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {

public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader,
ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String,
Accumulator<?,?>> accumulators) {
super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig, cpTasks, accumulators);
public IterationRuntimeUDFContext(TaskInfo taskInfo, ClassLoader classloader, ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
super(taskInfo, classloader, executionConfig, cpTasks, accumulators);
}

@Override
@@ -27,18 +27,21 @@
import java.util.concurrent.Future;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.core.fs.Path;
import org.junit.Test;


public class RuntimeUDFContextTest {


private final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0);

@Test
public void testBroadcastVariableNotFound() {
try {
RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(),new HashMap<String, Accumulator<?, ?>>());
RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(),new HashMap<String, Accumulator<?, ?>>());

try {
ctx.getBroadcastVariable("some name");
@@ -68,7 +71,7 @@ public void testBroadcastVariableNotFound() {
@Test
public void testBroadcastVariableSimple() {
try {
RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());

ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
@@ -102,7 +105,7 @@ public void testBroadcastVariableSimple() {
@Test
public void testBroadcastVariableWithInitializer() {
try {
RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());

ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));

@@ -127,7 +130,7 @@ public void testBroadcastVariableWithInitializer() {
@Test
public void testResetBroadcastVariableWithInitializer() {
try {
RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());

ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));

@@ -150,7 +153,7 @@ public void testResetBroadcastVariableWithInitializer() {
@Test
public void testBroadcastVariableWithInitializerAndMismatch() {
try {
RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());

ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));

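A possible additional test (not part of this commit) that exercises the attempt number and the composed subtask name through the refactored constructor; the method name testTaskInfoAccessors is hypothetical, and the usual static JUnit Assert imports are assumed:

@Test
public void testTaskInfoAccessors() {
	try {
		// subtask index 1 of 3 parallel subtasks, second attempt (attempt numbering starts at 0)
		TaskInfo localTaskInfo = new TaskInfo("test name", 1, 3, 1);
		RuntimeUDFContext ctx = new RuntimeUDFContext(localTaskInfo, getClass().getClassLoader(),
				new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());

		assertEquals("test name", ctx.getTaskName());
		assertEquals("test name (2/3)", ctx.getTaskNameWithSubtasks());
		assertEquals(1, ctx.getIndexOfThisSubtask());
		assertEquals(3, ctx.getNumberOfParallelSubtasks());
		assertEquals(1, ctx.getAttemptNumber());
	}
	catch (Exception e) {
		e.printStackTrace();
		fail(e.getMessage());
	}
}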
