Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ repos:
- --fuzzy-match-generates-todo
files: \.rst$
- id: insert-license
name: Add license for all JS/CSS files
files: \.(js|css)$
name: Add license for all JS/CSS/PUML files
files: \.(js|css|puml)$
exclude: ^\.github/.*$
args:
- --comment-style
Expand Down
35 changes: 35 additions & 0 deletions docs/executor/celery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,41 @@ The components communicate with each other in many places
* [10] **Scheduler** --> **Celery's result backend** - Gets information about the status of completed tasks
* [11] **Scheduler** --> **Celery's broker** - Put the commands to be executed

Task execution process
----------------------

.. figure:: ../img/run_task_on_celery_executor.png
:scale: 50 %

Sequence diagram - task execution process

Initially, two processes are running:

- SchedulerProcess - process the tasks and run using CeleryExecutor
- WorkerProcess - observes the queue waiting for new tasks to appear
- WorkerChildProcess - waits for new tasks

Two databases are also available:

- QueueBroker
- ResultBackend
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you describe shortly those dbs? Especially ResultBackend, is it the same database as Airflow db?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are described in the section above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also do not describe what a worker and scheduler is, because I assume that a person is familiar with the previous section.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are different databases, but in popular configurations, they point to the same database server.


During this process, two 2 process are created:

- LocalTaskJobProcess - It logic is described by LocalTaskJob. It is monitoring RawTaskProcess. New processes are started using TaskRunner.
- RawTaskProcess - It is process with the user code e.g. :meth:`~airflow.models.BaseOperator.execute`.

| [1] **SchedulerProcess** processes the tasks and when it finds a task that needs to be done, sends it to the **QueueBroker**.
| [2] **QueueBroker** also begins to periodically query **ResultBackend** for the status of the task.
| [3] **QueueBroker**, when it becomes aware of the task, sends information about it to one WorkerProcess.
| [4] **WorkerProcess** assigns a single task to a one **WorkerChildProcess**.
| [5] **WorkerChildProcess** performs the proper task handling functions - :meth:`~airflow.executor.celery_executor.execute_command`. It creates a new process - **LocalTaskJobProcess**.
| [6] LocalTaskJobProcess logic is described by :class:`~airflow.jobs.local_task_job.LocalTaskJob` class. It starts new process using TaskRunner.
| [7][8] Process **RawTaskProcess** and **LocalTaskJobProcess** is stopped when they have finished their work.
| [10][12] **WorkerChildProcess** notifies the main process - **WorkerProcess** about the end of the task and the availability of subsequent tasks.
| [11] **WorkerProcess** saves status information in **ResultBackend**.
| [13] When **SchedulerProcess** asks **ResultBackend** again about the status, it will get information about the status of the task.

Queues
------

Expand Down
Binary file added docs/img/run_task_on_celery_executor.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
77 changes: 77 additions & 0 deletions docs/img/run_task_on_celery_executor.puml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**
* This file contains source code of run_task_on_celery_executor.png image.
*
* If you want regenerate this image, you should follow instructions here:
* https://plantuml.com/starting
*/

@startuml
autonumber

box Scheduler
participant SchedulerProcess order 10
endbox
database QueueBroker order 20
database ResultBackend order 30
box Worker
participant WorkerProcess order 40
participant WorkerChildProcess order 50
participant LocalTaskJobProcess order 60
participant RawTaskProcess order 70
endbox

activate SchedulerProcess
activate WorkerChildProcess

SchedulerProcess->>QueueBroker: Send task
activate QueueBroker
SchedulerProcess->ResultBackend: Pool celery \ntask state
deactivate SchedulerProcess
WorkerChildProcess->QueueBroker: Pool task
QueueBroker->WorkerChildProcess: Send task
deactivate QueueBroker
activate WorkerChildProcess
create LocalTaskJobProcess
WorkerChildProcess->LocalTaskJobProcess: Start process
deactivate
create RawTaskProcess
activate LocalTaskJobProcess
LocalTaskJobProcess->RawTaskProcess: Start process
deactivate LocalTaskJobProcess
activate RawTaskProcess
RawTaskProcess->RawTaskProcess: Execute user code
RawTaskProcess-->LocalTaskJobProcess: Finish process
destroy RawTaskProcess
activate LocalTaskJobProcess
LocalTaskJobProcess-->WorkerChildProcess: Finish process
destroy LocalTaskJobProcess
activate WorkerChildProcess
WorkerChildProcess-->WorkerProcess: Report task result
deactivate WorkerChildProcess
activate WorkerProcess
WorkerProcess-->ResultBackend: Save Celery task state
deactivate WorkerProcess
activate ResultBackend
ResultBackend-->SchedulerProcess: Send celery task state
deactivate ResultBackend

@enduml