Skip to content

Commit

Permalink
[FLINK-2462] [streaming] Major cleanup of operator structure for exce…
Browse files Browse the repository at this point in the history
…ption handling and code simplification

  - The exceptions are no longer logged by the operators themselves.
    Operators perform only cleanup in reaction to exceptions.
    Exceptions are reported only to the root Task object, which knows whether this is the first
    failure-causing exception (root cause), or is a subsequent exception, or whether the task was
    actually canceled already. In the latter case, exceptions are ignored, because many
    cancellations lead to meaningless exceptions.

  - more exceptions in signatures, less wrapping where not needed

  - Core resource acquisition/release logic is in one streaming task, reducing code duplication

  - Guaranteed cleanup of output buffer and input buffer resources (formerly missed when other exceptions were encountered)

  - Fix mixup in instantiation of source contexts in the stream source task

  - Auto watermark generators correctly shut down their interval scheduler

  - Improve use of generics, got rid of many raw types

This closes #1017
  • Loading branch information
StephanEwen committed Aug 15, 2015
1 parent c2b1eb7 commit a2dacc9
Show file tree
Hide file tree
Showing 35 changed files with 687 additions and 712 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,13 @@ public void flush() throws IOException {
}

public void clearBuffers() {
if (serializers != null) {
for (RecordSerializer<?> s : serializers) {
synchronized (s) {
Buffer b = s.getCurrentBuffer();
s.clear();

if (b != null) {
b.recycle();
}
for (RecordSerializer<?> s : serializers) {
synchronized (s) {
Buffer b = s.getCurrentBuffer();
s.clear();

if (b != null) {
b.recycle();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class AbstractInvokable {
/**
* Must be overwritten by the concrete task to instantiate the required record reader and record writer.
*/
public abstract void registerInputOutput();
public abstract void registerInputOutput() throws Exception;

/**
* Must be overwritten by the concrete task. This method is called by the task manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,18 @@

package org.apache.flink.runtime.jobgraph.tasks;


/**
* This interface needs to be implemented by runtime tasks that want to be able to receive
* notifications about completed checkpoints.
*/
public interface CheckpointNotificationOperator {


/**
* Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
* the notification from all participating tasks.
*
* @param checkpointId The ID of the checkpoint that is complete.
* @throws Exception The notification method may forward its exceptions.
*/
void notifyCheckpointComplete(long checkpointId) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
* and as a setup method on the TaskManager.
*/
@Override
public void registerInputOutput() {
public void registerInputOutput() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(formatLogString("Start registering input and output."));
}
Expand All @@ -239,26 +239,13 @@ public void registerInputOutput() {
final Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver();
this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);

// initialize the readers. this is necessary for nephele to create the input gates
// however, this does not trigger any local processing.
try {
initInputReaders();
initBroadcastInputReaders();
} catch (Exception e) {
throw new RuntimeException("Initializing the input streams failed in Task " + getEnvironment().getTaskName() +
(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
}
// initialize the readers.
// this does not yet trigger any stream consuming or processing.
initInputReaders();
initBroadcastInputReaders();

// initialize the writers. this is necessary for nephele to create the output gates.
// because in the presence of chained tasks, the tasks writers depend on the last task in the chain,
// we need to initialize the chained tasks as well. the chained tasks are only set up, but no work
// (such as setting up a sorter, etc.) starts
try {
initOutputs();
} catch (Exception e) {
throw new RuntimeException("Initializing the output handlers failed" +
(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
}
// initialize the writers.
initOutputs();

if (LOG.isDebugEnabled()) {
LOG.debug(formatLogString("Finished registering input and output."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@

public class MockEnvironment implements Environment {

private final String taskName;

private final MemoryManager memManager;

private final IOManager ioManager;
Expand All @@ -85,7 +87,8 @@ public class MockEnvironment implements Environment {

private final int bufferSize;

public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
this.taskName = taskName;
this.jobConfiguration = new Configuration();
this.taskConfiguration = new Configuration();
this.inputs = new LinkedList<InputGate>();
Expand Down Expand Up @@ -214,12 +217,12 @@ public InputSplitProvider getInputSplitProvider() {

@Override
public String getTaskName() {
return null;
return taskName;
}

@Override
public String getTaskNameWithSubtasks() {
return null;
return taskName + "(0/1)";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ public abstract class TaskTestBase {
public void initEnvironment(long memorySize, int bufferSize) {
this.memorySize = memorySize;
this.inputSplitProvider = new MockInputSplitProvider();
this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize);
this.mockEnv = new MockEnvironment("mock task", this.memorySize, this.inputSplitProvider, bufferSize);
}

public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
final IteratorWrappingTestSingleInputGate<Record> reader = addInput(input, groupId, true);

return reader;
return addInput(input, groupId, true);
}

public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
Expand Down Expand Up @@ -89,19 +87,32 @@ public Configuration getConfiguration() {
return this.mockEnv.getTaskConfiguration();
}

public void registerTask(AbstractInvokable task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends RichFunction> stubClass) {
public void registerTask(AbstractInvokable task,
@SuppressWarnings("rawtypes") Class<? extends PactDriver> driver,
Class<? extends RichFunction> stubClass) {

final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
config.setDriver(driver);
config.setStubWrapper(new UserCodeClassWrapper<RichFunction>(stubClass));

task.setEnvironment(this.mockEnv);

task.registerInputOutput();
try {
task.registerInputOutput();
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

public void registerTask(AbstractInvokable task) {
task.setEnvironment(this.mockEnv);
task.registerInputOutput();
try {
task.registerInputOutput();
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat> stubClass, String outPath) {
Expand All @@ -118,7 +129,12 @@ public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat o

outTask.setEnvironment(this.mockEnv);

outTask.registerInputOutput();
try {
outTask.registerInputOutput();
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

public void registerFileInputTask(AbstractInvokable inTask,
Expand All @@ -142,7 +158,12 @@ public void registerFileInputTask(AbstractInvokable inTask,

inTask.setEnvironment(this.mockEnv);

inTask.registerInputOutput();
try {
inTask.registerInputOutput();
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

public MemoryManager getMemoryManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class StreamEdge implements Serializable {
* A list of output names that the target vertex listens to (if there is
* output selection).
*/
final private List<String> selectedNames;
private final List<String> selectedNames;
private StreamPartitioner<?> outputPartitioner;

public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
Expand Down Expand Up @@ -108,11 +108,7 @@ public boolean equals(Object o) {

StreamEdge that = (StreamEdge) o;

if (!edgeId.equals(that.edgeId)) {
return false;
}

return true;
return edgeId.equals(that.edgeId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, Op
public Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp)
throws Exception {
// Get all the states for the operator
Map<String, StreamOperatorState> operatorStates = runtimeContext.getOperatorStates();
Map<String, StreamOperatorState<?, ?>> operatorStates = runtimeContext.getOperatorStates();

Map<String, OperatorStateHandle> operatorStateSnapshots;
if (operatorStates.isEmpty()) {
Expand All @@ -108,7 +108,7 @@ public Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> getSt
// Checkpoint the states and store the handles in a map
Map<String, OperatorStateHandle> snapshots = new HashMap<String, OperatorStateHandle>();

for (Entry<String, StreamOperatorState> state : operatorStates.entrySet()) {
for (Entry<String, StreamOperatorState<?, ?>> state : operatorStates.entrySet()) {
boolean isPartitioned = state.getValue() instanceof PartitionedStreamOperatorState;
snapshots.put(state.getKey(),
new OperatorStateHandle(state.getValue().snapshotState(checkpointId, timestamp),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@
* of this interface that can be used to emit elements and other messages, such as barriers
* and watermarks, from an operator.
*
* @param <T> The type of the elments that can be emitted.
* @param <T> The type of the elements that can be emitted.
*/
public interface Output<T> extends Collector<T> {

/**
* Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
* operators.
*
* <p>
* A watermark specifies that no element with a timestamp older or equal to the watermark
* timestamp will be emitted in the future.
* <p>A watermark specifies that no element with a timestamp older or equal to the watermark
* timestamp will be emitted in the future.</p>
*/
void emitWatermark(Watermark mark);
}

0 comments on commit a2dacc9

Please sign in to comment.