- 
                Notifications
    
You must be signed in to change notification settings  - Fork 13.7k
 
[FLINK-15099][runtime] (FLIP-27) Add Operator Coordinators and Events #10483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| 
           Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit f7d3793 (Sun Dec 08 04:13:43 UTC 2019) Warnings: 
 Mention the bot in a comment to re-run the automated checks. Review Progress
 Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands: 
  | 
    
          
CI report:
 Bot commandsThe @flinkbot bot supports the following commands:
  | 
    
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@StephanEwen Thanks for the patch. The existing code looks good overall. I only have some minor comments. Some of the missing java docs and parameter / variable renaming could be done in a follow-up patch given the release 1.10 code freeze is approaching. We probably also need some more unit test cases for the event passing itself.
        
          
                flink-core/src/main/java/org/apache/flink/util/AutoContextClassLoader.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                .../main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexCoordinatorContext.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
          
            Show resolved
            Hide resolved
        
              
          
                flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                ...ntime/src/test/java/org/apache/flink/runtime/scheduler/OperatorCoordinatorSchedulerTest.java
          
            Show resolved
            Hide resolved
        
              
          
                flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                ...java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
      f7d3793    to
    774d4ca      
    Compare
  
    …he scheduler in tests
b0a384d    to
    ab016b8      
    Compare
  
    | * } | ||
| * </pre> | ||
| */ | ||
| public final class AutoContextClassLoader implements AutoCloseable { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate of TemporaryClassLoaderContext
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would consolidate that in a separate commit. If that gets used beyond plugins, it should reside in a more "universal" package. The AutoContextClassLoader is a tad bit nicer, especially javadoc wise.
Operator Coordinators are instances that exist once per operator. While the operators run on the TaskManagers, the coordinator runs on the JobManager. The coordinator communicates via events with the operators, typicalls to assign work. The first user for those coordinators would be the new source interface. Further users we envision are sinks (for coordinated commits of metadata), or iterations (gather progress and steer supersteps) as well as simple approximate alignments between streams (event time alignment).
ab016b8    to
    627d2c1      
    Compare
  
    | 
           Manually merged in 41b6bfa  | 
    
What is the purpose of the change
This PR introduces Operator Coordinators, as a part of FLIP-27
Operator Coordinators are instances that exist once per operator. While the operators run on the TaskManagers, the coordinator runs on the JobManager. The coordinator communicates via events with the operators, typically to assign work.
The first user for those coordinators would be the new source interface. The OperatorCoordinator will run the Source's Split Enumerator.
This change will also allow us to remove InputSplits and intializeOnMaster / finalizeOnMaster logic in a future step.
Further users we envision are sinks (for coordinated commits of metadata), or iterations (gather progress and coordinate supersteps) as well as simple approximate alignments between streams (event time alignment).
Brief change log
OperatorCoordinatorinterface.Provider) for the Coordinator to the JobVertex of the JobGraph.with theExecutionJobVertex` and the new scheduler (note, integration with the legacy scheduler is not planned)OperatorEventand support for sending sending bidirectional events between Coordinator and Operator.OperatorEventDispatcherand obtain a Gateway to send events. That way, operators are not (more strongly than already) tied to the heavyweightEnvironmentobject.Verifying this change
This change is internal only so far (a building block for other features, like the new Source API).
The change is tested mainly through some units tests, most importantly
flink-runtime : org.apache.flink.runtime.scheduler.OperatorCoordinatorSchedulerTest.javaDoes this pull request potentially affect one of the following parts:
@Public(Evolving): noDocumentation