Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -332,14 +333,10 @@ public Map<String, Object> getComponentConfiguration() {

public static StreamTask<?, ?> createMockStreamTask(ExecutionConfig execConfig) {
Environment env = mock(Environment.class);
when(env.getTaskName()).thenReturn("Mock Task");
when(env.getTaskNameWithSubtasks()).thenReturn("Mock Task (1/1)");
when(env.getIndexInSubtaskGroup()).thenReturn(0);
when(env.getNumberOfSubtasks()).thenReturn(1);
when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 0, 1, 0));
when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());

StreamTask<?, ?> mockTask = mock(StreamTask.class);
when(mockTask.getName()).thenReturn("Mock Task (1/1)");
when(mockTask.getCheckpointLock()).thenReturn(new Object());
when(mockTask.getConfiguration()).thenReturn(new StreamConfig(new Configuration()));
when(mockTask.getEnvironment()).thenReturn(env);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public <K, V> LazyDbKvState<K, V> createKvState(String stateId, String stateName
TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue) throws IOException {
return new LazyDbKvState<K, V>(
stateId + "_" + env.getJobID().toShortString(),
env.getIndexInSubtaskGroup() == 0,
env.getTaskInfo().getIndexOfThisSubtask() == 0,
getConnections(),
getConfiguration(),
keySerializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public Void call() throws Exception {
}, stateBackend.getConfiguration().getMaxNumberOfSqlRetries(),
stateBackend.getConfiguration().getSleepBetweenSqlRetries());

boolean cleanup = stateBackend.getEnvironment().getIndexInSubtaskGroup() == 0;
boolean cleanup = stateBackend.getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0;

// Restore the KvState
LazyDbKvState<K, V> restored = new LazyDbKvState<K, V>(kvStateId, cleanup,
Expand Down
96 changes: 96 additions & 0 deletions flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Encapsulates task-specific information: name, index of subtask, parallelism and attempt number.
*/
public class TaskInfo {

	private final String taskName;
	private final String taskNameWithSubtasks;
	private final int indexOfSubtask;
	private final int numberOfParallelSubtasks;
	private final int attemptNumber;

	/**
	 * Creates the description of a single parallel subtask. The display name returned by
	 * {@link #getTaskNameWithSubtasks()} is computed once here and cached.
	 *
	 * @param taskName The name of the task, must not be null.
	 * @param indexOfSubtask The zero-based index of this subtask, must be less than the parallelism.
	 * @param numberOfParallelSubtasks The parallelism of the task, must be at least 1.
	 * @param attemptNumber The attempt number of this subtask, must not be negative.
	 *
	 * @throws IllegalArgumentException If one of the numeric arguments is out of range.
	 * @throws NullPointerException If the task name is null.
	 */
	public TaskInfo(String taskName, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
		// Validate everything up front, in a fixed order, before any field is assigned.
		checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
		checkArgument(numberOfParallelSubtasks >= 1, "Parallelism must be a positive number.");
		checkArgument(indexOfSubtask < numberOfParallelSubtasks, "Task index must be less than parallelism.");
		checkArgument(attemptNumber >= 0, "Attempt number must be a non-negative number.");
		checkNotNull(taskName, "Task Name must not be null.");

		this.taskName = taskName;
		this.indexOfSubtask = indexOfSubtask;
		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
		this.attemptNumber = attemptNumber;
		this.taskNameWithSubtasks = describeSubtask(taskName, indexOfSubtask, numberOfParallelSubtasks);
	}

	/**
	 * Builds the display name of a subtask, for example "MyTask (3/6)". The subtask is
	 * shown one-based, i.e. as (index + 1).
	 */
	private static String describeSubtask(String name, int index, int parallelism) {
		final StringBuilder label = new StringBuilder();
		label.append(name).append(" (");
		label.append(index + 1).append('/').append(parallelism);
		label.append(')');
		return label.toString();
	}

	/**
	 * Returns the name of the task.
	 *
	 * @return The name of the task.
	 */
	public String getTaskName() {
		return taskName;
	}

	/**
	 * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
	 * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
	 *
	 * @return The index of the parallel subtask.
	 */
	public int getIndexOfThisSubtask() {
		return indexOfSubtask;
	}

	/**
	 * Gets the parallelism with which the parallel task runs.
	 *
	 * @return The parallelism with which the parallel task runs.
	 */
	public int getNumberOfParallelSubtasks() {
		return numberOfParallelSubtasks;
	}

	/**
	 * Gets the attempt number of this parallel subtask. The first attempt is numbered 0.
	 * The attempt number corresponds to the number of times this task has been restarted
	 * (after failure/cancellation) since the job was initially started.
	 *
	 * @return Attempt number of the subtask.
	 */
	public int getAttemptNumber() {
		return attemptNumber;
	}

	/**
	 * Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
	 * where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
	 * {@link #getNumberOfParallelSubtasks()}.
	 *
	 * @return The name of the task, with subtask indicator.
	 */
	public String getTaskNameWithSubtasks() {
		return taskNameWithSubtasks;
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ public interface RuntimeContext {
*/
int getIndexOfThisSubtask();

/**
* Gets the attempt number of this parallel subtask. First attempt is numbered 0.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add here something like: "The attempt number corresponds to how many times the task has been restarted (after a failure or cancellation) since the job was initially started."

*
* @return Attempt number of the subtask.
*/
int getAttemptNumber();

/**
* Returns the name of the task, appended with the subtask indicator, such as "MyTask (3/6)",
* where 3 would be ({@link #getIndexOfThisSubtask()} + 1), and 6 would be
* {@link #getNumberOfParallelSubtasks()}.
*
* @return The name of the task, with subtask indicator.
*/
String getTaskNameWithSubtasks();

/**
* Returns the {@link org.apache.flink.api.common.ExecutionConfig} for the currently executing
* job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.DoubleCounter;
Expand All @@ -42,11 +43,7 @@
*/
public abstract class AbstractRuntimeUDFContext implements RuntimeContext {

private final String name;

private final int numParallelSubtasks;

private final int subtaskIndex;
private final TaskInfo taskInfo;

private final ClassLoader userCodeClassLoader;

Expand All @@ -56,15 +53,12 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {

private final DistributedCache distributedCache;

public AbstractRuntimeUDFContext(String name,
int numParallelSubtasks, int subtaskIndex,
public AbstractRuntimeUDFContext(TaskInfo taskInfo,
ClassLoader userCodeClassLoader,
ExecutionConfig executionConfig,
Map<String, Accumulator<?,?>> accumulators,
Map<String, Future<Path>> cpTasks) {
this.name = name;
this.numParallelSubtasks = numParallelSubtasks;
this.subtaskIndex = subtaskIndex;
this.taskInfo = Preconditions.checkNotNull(taskInfo);
this.userCodeClassLoader = userCodeClassLoader;
this.executionConfig = executionConfig;
this.distributedCache = new DistributedCache(Preconditions.checkNotNull(cpTasks));
Expand All @@ -78,17 +72,27 @@ public ExecutionConfig getExecutionConfig() {

@Override
public String getTaskName() {
return this.name;
return taskInfo.getTaskName();
}

@Override
public int getNumberOfParallelSubtasks() {
return this.numParallelSubtasks;
return taskInfo.getNumberOfParallelSubtasks();
}

@Override
public int getIndexOfThisSubtask() {
return this.subtaskIndex;
return taskInfo.getIndexOfThisSubtask();
}

/**
 * Reports the attempt number held by this context's {@link TaskInfo}.
 *
 * @return Attempt number of the subtask.
 */
@Override
public int getAttemptNumber() {
	return this.taskInfo.getAttemptNumber();
}

/**
 * Reports the task name with subtask indicator, such as "MyTask (3/6)", as held by
 * this context's {@link TaskInfo}.
 *
 * @return The name of the task, with subtask indicator.
 */
@Override
public String getTaskNameWithSubtasks() {
	return this.taskInfo.getTaskNameWithSubtasks();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.Future;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RuntimeContext;
Expand All @@ -38,9 +39,9 @@ public class RuntimeUDFContext extends AbstractRuntimeUDFContext {

private final HashMap<String, List<?>> uninitializedBroadcastVars = new HashMap<String, List<?>>();

public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, accumulators, cpTasks);
public RuntimeUDFContext(TaskInfo taskInfo, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
super(taskInfo, userCodeClassLoader, executionConfig, accumulators, cpTasks);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.aggregators.Aggregator;
Expand Down Expand Up @@ -179,10 +180,11 @@ private <IN> void executeDataSink(GenericDataSinkBase<?> sink, int superStep) th
GenericDataSinkBase<IN> typedSink = (GenericDataSinkBase<IN>) sink;

// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0);
RuntimeUDFContext ctx;
if (RichOutputFormat.class.isAssignableFrom(typedSink.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(typedSink.getName(), 1, 0, getClass().getClassLoader(), executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(typedSink.getName(), 1, 0, classLoader, executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);
} else {
ctx = null;
}
Expand All @@ -195,10 +197,11 @@ private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> source, in
@SuppressWarnings("unchecked")
GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source;
// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedSource.getName(), 0, 1, 0);
RuntimeUDFContext ctx;
if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(source.getName(), 1, 0, getClass().getClassLoader(), executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(source.getName(), 1, 0, classLoader, executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);
} else {
ctx = null;
}
Expand All @@ -218,12 +221,11 @@ private <IN, OUT> List<OUT> executeUnaryOperator(SingleInputOperator<?, ?, ?> op
SingleInputOperator<IN, OUT, ?> typedOp = (SingleInputOperator<IN, OUT, ?>) operator;

// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
RuntimeUDFContext ctx;
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, getClass()
.getClassLoader(), executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader,
executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);

for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
Expand Down Expand Up @@ -259,12 +261,11 @@ private <IN1, IN2, OUT> List<OUT> executeBinaryOperator(DualInputOperator<?, ?,
DualInputOperator<IN1, IN2, OUT, ?> typedOp = (DualInputOperator<IN1, IN2, OUT, ?>) operator;

// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
RuntimeUDFContext ctx;
if (RichFunction.class.isAssignableFrom(typedOp.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(operator.getName(), 1, 0, classLoader,
executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(operator.getName(), 1, 0, classLoader,
executionConfig, cachedFiles, accumulators);
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators);

for (Map.Entry<String, Operator<?>> bcInputs : operator.getBroadcastInputs().entrySet()) {
List<?> bcData = execute(bcInputs.getValue());
Expand Down Expand Up @@ -519,10 +520,9 @@ else if (op instanceof GenericDataSourceBase) {

private class IterationRuntimeUDFContext extends RuntimeUDFContext implements IterationRuntimeContext {

public IterationRuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader classloader,
ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks, Map<String,
Accumulator<?,?>> accumulators) {
super(name, numParallelSubtasks, subtaskIndex, classloader, executionConfig, cpTasks, accumulators);
public IterationRuntimeUDFContext(TaskInfo taskInfo, ClassLoader classloader, ExecutionConfig executionConfig,
Map<String, Future<Path>> cpTasks, Map<String, Accumulator<?,?>> accumulators) {
super(taskInfo, classloader, executionConfig, cpTasks, accumulators);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@
import java.util.concurrent.Future;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.core.fs.Path;
import org.junit.Test;


public class RuntimeUDFContextTest {


private final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0);

@Test
public void testBroadcastVariableNotFound() {
try {
RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(),new HashMap<String, Accumulator<?, ?>>());
RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(),new HashMap<String, Accumulator<?, ?>>());

try {
ctx.getBroadcastVariable("some name");
Expand Down Expand Up @@ -68,7 +71,7 @@ public void testBroadcastVariableNotFound() {
@Test
public void testBroadcastVariableSimple() {
try {
RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());

ctx.setBroadcastVariable("name1", Arrays.asList(1, 2, 3, 4));
ctx.setBroadcastVariable("name2", Arrays.asList(1.0, 2.0, 3.0, 4.0));
Expand Down Expand Up @@ -102,7 +105,7 @@ public void testBroadcastVariableSimple() {
@Test
public void testBroadcastVariableWithInitializer() {
try {
RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());

ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));

Expand All @@ -127,7 +130,7 @@ public void testBroadcastVariableWithInitializer() {
@Test
public void testResetBroadcastVariableWithInitializer() {
try {
RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());

ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));

Expand All @@ -150,7 +153,7 @@ public void testResetBroadcastVariableWithInitializer() {
@Test
public void testBroadcastVariableWithInitializerAndMismatch() {
try {
RuntimeUDFContext ctx = new RuntimeUDFContext("test name", 3, 1, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());
RuntimeUDFContext ctx = new RuntimeUDFContext(taskInfo, getClass().getClassLoader(), new ExecutionConfig(), new HashMap<String, Future<Path>>(), new HashMap<String, Accumulator<?, ?>>());

ctx.setBroadcastVariable("name", Arrays.asList(1, 2, 3, 4));

Expand Down
Loading