Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-6181] Add InProcessExecutor #6740

Merged
merged 7 commits into from Dec 12, 2019

Conversation

turbaszek
Copy link
Member

@turbaszek turbaszek commented Dec 5, 2019

Make sure you have checked all steps below.

Jira

Description

  • Here are some details about my PR, including screenshots of any UI changes:

Together with guys from Databand we created a new executor that is meant to be used mainly for debugging and DAG development purposes. This executor executes single task instance at time and is able to work with SQLite and sensors.

Using this executor you can debug your DAGs from IDE 馃殌
Screenshot 2019-12-05 at 18 42 09
Screenshot 2019-12-05 at 18 54 32

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain docstrings that explain what it does
    • If you implement backwards incompatible changes, please leave a note in the Updating.md so we can assign it to a appropriate release

docs/executor/inprocess.rst Outdated Show resolved Hide resolved
@codecov-io
Copy link

@codecov-io codecov-io commented Dec 5, 2019

Codecov Report

Merging #6740 into master will decrease coverage by 0.06%.
The diff coverage is 90.9%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master   #6740      +/-   ##
=========================================
- Coverage   84.37%   84.3%   -0.07%     
=========================================
  Files         672     673       +1     
  Lines       38214   38350     +136     
=========================================
+ Hits        32242   32330      +88     
- Misses       5972    6020      +48
Impacted Files Coverage 螖
airflow/configuration.py 93.11% <酶> (酶) 猬嗭笍
airflow/executors/base_executor.py 96.05% <100%> (酶) 猬嗭笍
airflow/executors/executor_loader.py 78.94% <100%> (+15.9%) 猬嗭笍
airflow/models/taskinstance.py 93.75% <50%> (-0.27%) 猬囷笍
airflow/executors/debug_executor.py 92.06% <92.06%> (酶)
airflow/kubernetes/volume_mount.py 44.44% <0%> (-55.56%) 猬囷笍
airflow/kubernetes/volume.py 52.94% <0%> (-47.06%) 猬囷笍
airflow/kubernetes/pod_launcher.py 45.25% <0%> (-46.72%) 猬囷笍
airflow/kubernetes/refresh_config.py 50.98% <0%> (-23.53%) 猬囷笍
...rflow/contrib/operators/kubernetes_pod_operator.py 78.2% <0%> (-20.52%) 猬囷笍
... and 10 more

Continue to review full report at Codecov.

Legend - Click here to learn more
螖 = absolute <relative> (impact), 酶 = not affected, ? = missing data
Powered by Codecov. Last update 479ee63...fbae6ac. Read the comment docs.

@KevinYang21
Copy link
Member

@KevinYang21 KevinYang21 commented Dec 5, 2019

Do we expect people to use the production DB together with this executor? Seems to be quite dangerous. If we expect people to setup local meta DB, do we want to call it out and maybe somehow force to use that local meta DB?

@mik-laj
Copy link
Member

@mik-laj mik-laj commented Dec 5, 2019

How do you want to force the local database? In the world of containers it is very difficult to distinguish between a local and a remote database.

@turbaszek
Copy link
Member Author

@turbaszek turbaszek commented Dec 5, 2019

I am against forcing any db. The executor is meant for local, development purposes meaning that there should be no production db to mess with. The way I would expect DAG creators to use it is to go with local environment + sqlite (or any other) or use breeze / other image as their environment.

TESTING.rst Outdated Show resolved Hide resolved
TESTING.rst Outdated Show resolved Hide resolved
airflow/models/taskinstance.py Show resolved Hide resolved
docs/executor/inprocess.rst Outdated Show resolved Hide resolved
docs/executor/inprocess.rst Outdated Show resolved Hide resolved
docs/executor/inprocess.rst Outdated Show resolved Hide resolved
docs/executor/inprocess.rst Outdated Show resolved Hide resolved
docs/executor/inprocess.rst Outdated Show resolved Hide resolved
docs/executor/inprocess.rst Outdated Show resolved Hide resolved
docs/executor/inprocess.rst Outdated Show resolved Hide resolved
@turbaszek turbaszek requested review from feluelle, mik-laj and potiuk Dec 6, 2019
@potiuk
Copy link
Member

@potiuk potiuk commented Dec 7, 2019

Do we expect people to use the production DB together with this executor? Seems to be quite dangerous. If we expect people to setup local meta DB, do we want to call it out and maybe somehow force to use that local meta DB?

I think it's exactly the same case as with SequentialExecutor. I consider the executor choice as actually part of deployment (I.e. some people use LocalExecutor, some Celery some Kubernetes)
So i do not see any danger with this executor

if executor_name in executors:
executor_module = importlib.import_module(executors[executor_name])
executor = getattr(executor_module, executor_name)
return executor()
Copy link
Member

@potiuk potiuk Dec 7, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice :)

Copy link
Contributor

@dimberman dimberman Dec 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. this is muuuch cleaner.

potiuk
potiuk approved these changes Dec 7, 2019
@potiuk
Copy link
Member

@potiuk potiuk commented Dec 7, 2019

Hwey @kaxil @ashb - you might want to take a look. This is the new InProcessExecutor contributed by Databand.ai and perfected by @nuclearpinguin we told you about. It's really great for testing/debugging of DAGs.

@potiuk potiuk added the area:scheduler/executor label Dec 9, 2019
@potiuk
Copy link
Member

@potiuk potiuk commented Dec 9, 2019

I'd go for it. There is low risk it will break anything and I think it is super useful for anyone testing DAGs. Maybe also we should announce it in devlist/slack the there is this new way of running the in-process executor. I would love to cherry-pick all those related changes (pylint & others) to 1.10.7 as soon as possible.

@potiuk
Copy link
Member

@potiuk potiuk commented Dec 9, 2019

TESTING.rst Outdated Show resolved Hide resolved
@ashb
Copy link
Member

@ashb ashb commented Dec 9, 2019

My main thought is where is this useful, that airflow test isn't? I remember reading somewhere about using airflow test as the entrypoint for IDE.

if __name__ == '__main__':
dag.clear(reset_dag_runs=True)
dag.run()
Copy link
Member

@ashb ashb Dec 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there already a dag.cli() or something?

Copy link
Member

@ashb ashb Dec 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there is. I would like us to tie in to that command somehow please.

Copy link
Member Author

@turbaszek turbaszek Dec 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dag.cli() does not work as far as I know (planned to fix). I was thinking about adding debug method, so only this has to be done in DAG, no additional configuration of running a file, WDYT?

airflow/models/taskinstance.py Outdated Show resolved Hide resolved
@potiuk
Copy link
Member

@potiuk potiuk commented Dec 9, 2019

My main thought is where is this useful, that airflow test isn't? I remember reading somewhere about using airflow test as the entrypoint for IDE.

In all other executors the main process is running subprocess.popen. That prevents python debuggers from debugging then new process forked as the result. So you cannot set breakpoint and hit "Debug" to get it working. The only way we could find so far was remote debugging, but it requires paid version of IntelliJ and it is rather complex to run.

With this setting you go back to a "sane" way of debugging DAGs -you just add those two lines, set the breakpoint and use "Debug". I am not aware of any simple way of doing with with starting subprocesses. Unless someone knows it (but I have not heard it so far).

Moreover with this we will also be able to debug the code inside Breeze using Docker integration (in the same super-intuitive and easy way). You just add two lines to your DAG, set your environment to point to your Docker image/container and you can use all the debugging features of your IDE out of the box to not only initialize but also execute your DAG. This is super-powerful. We are going to use it at our workshops we have this Friday and this is so much easier for the users to debug the DAGs this way. Even if you use other IDEs which have good debugging integration, it's going to be super easy because you debug it in exactly the same way as you debug other python programs (which means it will just work).

I know @feluelle tried it before but could not make it work with the other executors.

@ashb
Copy link
Member

@ashb ashb commented Dec 9, 2019

airflow test doesn't run an executor -- it runs a task directly.

(sorry for short replies -- will expand on this tomorrow)

airflow/executors/executor_loader.py Outdated Show resolved Hide resolved
sqlite since sqlite do not support multiple connections.
It executes one task instance at time. Additionally to support working
with sensors, sensor's ``mode`` will be automatically set to "reschedule".
Copy link
Member

@ashb ashb Dec 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? This requirement/hard-coding doesn't make immediate sense to me.

Suggested change
with sensors, sensor's ``mode`` will be automatically set to "reschedule".
with sensors, all sensors ``mode`` will be automatically set to "reschedule".

Copy link
Member

@potiuk potiuk Dec 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that if sensor depends on another task doing it's work, it will block the executor and will not let the operator to do its job. We had a few examples of those when we worked on GCP operators. Basically - when you want to spin-off both sensor and operator at the same time you need this. Example here:

https://github.com/apache/airflow/blob/master/airflow/gcp/example_dags/example_bigtable.py

Here we have two sensors waiting and they might be fired in any order - they are waiting for two creates (they trigger the sensor's replicate wait to complete). We have more than one sensor and they can fire in any order as they are not depending on each other.

Using "reschedule" mode makes it robust to more strict dependencies set in the DAG.

airflow/executors/inprocess_executor.py Outdated Show resolved Hide resolved
Copy link
Member

@ashb ashb left a comment

I think this should be called something like DebugExecutor -- InProcess makes sense to us as developers of Airflow, but might notbe immedately obvious to users. Plus Debug make is more obvious that you shouldn't be using it in production.

@potiuk
Copy link
Member

@potiuk potiuk commented Dec 9, 2019

airflow test doesn't run an executor -- it runs a task directly.

(sorry for short replies -- will expand on this tomorrow)

'airflow test' only runs one task but then we want to run the whole dag - sometimes when you have a complex DAG you want to run all steps before - and running them manually is not a good idea. you need to run them in the right sequence manually. Being able to run the whole DAG is much more convenient.

airflow/models/taskinstance.py Outdated Show resolved Hide resolved
docs/executor/debug.rst Outdated Show resolved Hide resolved
@turbaszek turbaszek requested a review from potiuk Dec 10, 2019
[debug]
# Used only with DebugExecutor. If set to True DAG will fail with first
# failed task. Helpful for debugging purposes.
fail_fast = False
Copy link
Contributor

@dimberman dimberman Dec 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on this

Copy link
Contributor

@dimberman dimberman left a comment

Minor nit but overall this LGTM!

tests/executors/test_inprocess_executor.py Outdated Show resolved Hide resolved
@potiuk
Copy link
Member

@potiuk potiuk commented Dec 12, 2019

I think we should merge it after travis passes :)

@turbaszek turbaszek force-pushed the feature/python_executor branch from e535d94 to fbae6ac Compare Dec 12, 2019
@dimberman
Copy link
Contributor

@dimberman dimberman commented Dec 12, 2019

@nuclearpinguin I'll merge as soon as tests pass :)

@mik-laj mik-laj merged commit fe2334f into apache:master Dec 12, 2019
1 check passed
potiuk pushed a commit that referenced this issue Jan 21, 2020
Adds new executor that is meant to be used mainly
for debugging and DAG development purposes. This
executor executes single task instance at time and
is able to work with SQLLite and sensors.

(cherry picked from commit fe2334f)
potiuk added a commit to potiuk/airflow that referenced this issue Jan 21, 2020
potiuk added a commit to potiuk/airflow that referenced this issue Jan 22, 2020
kaxil pushed a commit that referenced this issue Jan 22, 2020
Adds new executor that is meant to be used mainly
for debugging and DAG development purposes. This
executor executes single task instance at time and
is able to work with SQLLite and sensors.

(cherry picked from commit fe2334f)
kaxil pushed a commit that referenced this issue Jan 23, 2020
Adds new executor that is meant to be used mainly
for debugging and DAG development purposes. This
executor executes single task instance at time and
is able to work with SQLLite and sensors.

(cherry picked from commit fe2334f)
galuszkak pushed a commit to FlyrInc/apache-airflow that referenced this issue Mar 5, 2020
Adds new executor that is meant to be used mainly
for debugging and DAG development purposes. This
executor executes single task instance at time and
is able to work with SQLLite and sensors.
qianyongyu pushed a commit to qianyongyu/airflow that referenced this issue Jun 2, 2020
Adds new executor that is meant to be used mainly
for debugging and DAG development purposes. This
executor executes single task instance at time and
is able to work with SQLLite and sensors.

(cherry picked from commit fe2334f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:scheduler/executor
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

10 participants