Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-2462] [streaming] Major cleanup of streaming task structure #1017

Closed
wants to merge 2 commits into from

Conversation

StephanEwen
Copy link
Contributor

This pull request addresses exception handling, code duplication, and missed resource cleanups in the streaming operators.

I mixed multiple issues in this pull request, which would have been better separated, but many were recognized in the rework, and it was tricky to pull the fixes apart.

Exception handling

The exceptions are no longer logged by the operators themselves. Operators perform only cleanup in reaction to exceptions.

Exceptions are reported only the the root Task object, which knows whether this is the first failure-causing exception (root cause), or is a subsequent exception, or whether the task was actually canceled already. In the later case, exceptions are ignored, because many cancellations lead to meaningless exceptions.

Added more exception in signatures, less exception wrapping where not needed

Unified setup / teardown structure in streaming tasks

Core resource acquisition/release logic is in StreamTask, reducing code duplication.
Subtasks (e.g., OneInputStreamTask, IterationTailStreamTask) implement slim methods for certain parts of the life cycle. The OneInputStreamTask becomes as simple as this

public void init() throws Exception {
    TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
    InputGate[] inputGates = getEnvironment().getAllInputGates();
    inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
            getCheckpointBarrierListener(), 
            configuration.getCheckpointMode(),
            getEnvironment().getIOManager(),
            getExecutionConfig().areTimestampsEnabled());

    // make sure that stream tasks report their I/O statistics
    AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
    AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
    inputProcessor.setReporter(reporter);
}

protected void run() throws Exception {
    while (running && inputProcessor.processInput(streamOperator));
}

protected void cleanup() throws Exception {
    inputProcessor.cleanup();
}

protected void cancelTask() {
    running = false;
}

Guaranteed cleanup of output buffer and input buffer resources (formerly missed when other exceptions where encountered).

Unified StreamRecordWriter and RecordWriter usage.

Cleanup in the StreamSource

Fix mixup in instantiation of source contexts in the stream source task

Auto watermark generators correctly shut down their interval scheduler

General

Improve use of generics, got rid of many raw types

StephanEwen added a commit to StephanEwen/flink that referenced this pull request Aug 15, 2015
…ption handling and code simplication

  - The exceptions are no longer logged by the operators themselves.
    Operators perform only cleanup in reaction to exceptions.
    Exceptions are reported only the the root Task object, which knows whether this is the first
    failure-causing exception (root cause), or is a subsequent exception, or whether the task was
    actually canceled already. In the later case, exceptions are ignored, because many
    cancellations lead to meaningless exceptions.

  - more exception in signatures, less wrapping where not needed

  - Core resource acquisition/release logic is in one streaming task, reducing code duplication

  - Guaranteed cleanup of output buffer and input buffer resources (formerly missed when other exceptions where encountered)

  - Fix mixup in instantiation of source contexts in the stream source task

  - Auto watermark generators correctly shut down their interval scheduler

  - Improve use of generics, got rid of many raw types

This closes apache#1017
StephanEwen added a commit to StephanEwen/flink that referenced this pull request Aug 15, 2015
…ption handling and code simplication

  - The exceptions are no longer logged by the operators themselves.
    Operators perform only cleanup in reaction to exceptions.
    Exceptions are reported only the the root Task object, which knows whether this is the first
    failure-causing exception (root cause), or is a subsequent exception, or whether the task was
    actually canceled already. In the later case, exceptions are ignored, because many
    cancellations lead to meaningless exceptions.

  - more exception in signatures, less wrapping where not needed

  - Core resource acquisition/release logic is in one streaming task, reducing code duplication

  - Guaranteed cleanup of output buffer and input buffer resources (formerly missed when other exceptions where encountered)

  - Fix mixup in instantiation of source contexts in the stream source task

  - Auto watermark generators correctly shut down their interval scheduler

  - Improve use of generics, got rid of many raw types

This closes apache#1017
@StephanEwen
Copy link
Contributor Author

Fixed the issues with the tests. Builds locally, waiting for Travis to confirm.

case 1:
inputList1.add(reader);
break;
case 2:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about use a enum to instead of 2, it will be easier to understand it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think that's necessary as it is just an index starting at 1. The possible values 1 and 2 are clearly related to which inputList the reader is added to.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was actually part of the original code - I did not modify it as part of this pull request.
As far as I see it, the StreamEdge code is part of the API, not the runtime. It may be adjusted as part of #988

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The StreamEdge (and StreamNode) stuff is both part of the API and the runtime, the separation is not very clear. This is not changed in #988 but would have to be addressed in a follow-up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to address that in a followup, as this creates conflicts with #988 otherwise.

StephanEwen added a commit to StephanEwen/flink that referenced this pull request Aug 16, 2015
…ption handling and code simplication

  - The exceptions are no longer logged by the operators themselves.
    Operators perform only cleanup in reaction to exceptions.
    Exceptions are reported only the the root Task object, which knows whether this is the first
    failure-causing exception (root cause), or is a subsequent exception, or whether the task was
    actually canceled already. In the later case, exceptions are ignored, because many
    cancellations lead to meaningless exceptions.

  - more exception in signatures, less wrapping where not needed

  - Core resource acquisition/release logic is in one streaming task, reducing code duplication

  - Guaranteed cleanup of output buffer and input buffer resources (formerly missed when other exceptions where encountered)

  - Fix mixup in instantiation of source contexts in the stream source task

  - Auto watermark generators correctly shut down their interval scheduler

  - Improve use of generics, got rid of many raw types

This closes apache#1017
…ption handling and code simplication

  - The exceptions are no longer logged by the operators themselves.
    Operators perform only cleanup in reaction to exceptions.
    Exceptions are reported only the the root Task object, which knows whether this is the first
    failure-causing exception (root cause), or is a subsequent exception, or whether the task was
    actually canceled already. In the later case, exceptions are ignored, because many
    cancellations lead to meaningless exceptions.

  - more exception in signatures, less wrapping where not needed

  - Core resource acquisition/release logic is in one streaming task, reducing code duplication

  - Guaranteed cleanup of output buffer and input buffer resources (formerly missed when other exceptions where encountered)

  - Fix mixup in instantiation of source contexts in the stream source task

  - Auto watermark generators correctly shut down their interval scheduler

  - Improve use of generics, got rid of many raw types

This closes apache#1017
* <p>
* A watermark specifies that no element with a timestamp older or equal to the watermark
* timestamp will be emitted in the future.
* <p>A watermark specifies that no element with a timestamp older or equal to the watermark
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change this? The oracle Javadoc literature (http://www.oracle.com/technetwork/articles/java/index-137868.html) and style guides (http://blog.joda.org/2012/11/javadoc-coding-standards.html) have it like it was. Javadoc is not HTML so tags like <li>, <p> and so on are not closed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I think I saw it differently in some of Sun's classes, and copied the style.

It seems the changes to not hurt (JavaDocs interpret the HTML properly), but I'll stick with the official style in the future. Thanks for pointing that out.

@aljoscha
Copy link
Contributor

This looks like a very nice continuation of the cleanup work. I'd suggest to merge it rather sooner than later.

@StephanEwen
Copy link
Contributor Author

Allright, if there are no further comments, I'll merge this...

@asfgit asfgit closed this in 92b1e47 Aug 17, 2015
@StephanEwen StephanEwen deleted the stream_cleanup branch August 31, 2015 15:59
nikste pushed a commit to nikste/flink that referenced this pull request Sep 29, 2015
…ption handling and code simplication

  - The exceptions are no longer logged by the operators themselves.
    Operators perform only cleanup in reaction to exceptions.
    Exceptions are reported only the the root Task object, which knows whether this is the first
    failure-causing exception (root cause), or is a subsequent exception, or whether the task was
    actually canceled already. In the later case, exceptions are ignored, because many
    cancellations lead to meaningless exceptions.

  - more exception in signatures, less wrapping where not needed

  - Core resource acquisition/release logic is in one streaming task, reducing code duplication

  - Guaranteed cleanup of output buffer and input buffer resources (formerly missed when other exceptions where encountered)

  - Fix mixup in instantiation of source contexts in the stream source task

  - Auto watermark generators correctly shut down their interval scheduler

  - Improve use of generics, got rid of many raw types

This closes apache#1017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants