
[WIP] Airflow kubernetes executor #2414

Closed
wants to merge 24 commits into master from bloomberg:airflow-kubernetes-executor

Conversation

@dimberman (Contributor) commented Jul 5, 2017

Dear Airflow maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@dimberman (Contributor, Author)

@mistercrunch @aoen PTAL. Pretty close to ready (still need to fix the DAG importers/add unit tests). Will also post to mailing list with the design doc

@codecov-io commented Jul 5, 2017

Codecov Report

Merging #2414 into master will decrease coverage by 0.03%.
The diff coverage is 69.14%.


@@            Coverage Diff             @@
##           master    #2414      +/-   ##
==========================================
- Coverage   75.28%   75.24%   -0.04%     
==========================================
  Files         196      197       +1     
  Lines       14608    14686      +78     
==========================================
+ Hits        10997    11051      +54     
- Misses       3611     3635      +24
Impacted Files Coverage Δ
airflow/executors/celery_executor.py 85.48% <ø> (ø) ⬆️
airflow/utils/dag_processing.py 88.88% <ø> (ø) ⬆️
airflow/executors/dask_executor.py 2% <0%> (ø) ⬆️
airflow/bin/cli.py 61.9% <100%> (+0.21%) ⬆️
airflow/executors/sequential_executor.py 100% <100%> (ø) ⬆️
airflow/executors/local_executor.py 79% <100%> (ø) ⬆️
airflow/executors/base_executor.py 93.75% <50%> (-1.42%) ⬇️
airflow/jobs.py 82.81% <50%> (-0.08%) ⬇️
airflow/models.py 86.64% <64.44%> (-0.45%) ⬇️
airflow/executors/__init__.py 63.46% <64.7%> (+1.55%) ⬆️
... and 2 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 17d3d1d...66a554c.

@grantnicholas (Contributor) commented Jul 6, 2017

Disclaimer: I am not an airflow core contributor.

First, super exciting! I'm glad airflow on k8s is getting attention and it has the potential to really improve airflow moving forward.

That said, there are a lot of nasty bits when trying to safely schedule jobs on kubernetes, the biggest being: how do you ensure a job does not get stuck, given the many failure modes (running out of resources, volume mounts failing, incorrect configuration, missing secrets and ConfigMaps, etc.)? The happy path of checking the job status for "Failed", "Running", or "Succeeded" is definitely not enough.

Due to the many failure modes, I think this PR will explode in size when you try to handle all the cases. (At least, that is what happened at my dayjob when I implemented a KubernetesExecutor).
I think the real solution is breaking out all of those failure modes into a separate kubernetes job runner library, and then using that library inside the KubernetesExecutor.

There are other minor issues as well, but they mainly concern your implementation and are not fundamental (e.g. blocking polling, long delays between polls, etc.).

I'm happy to reach out and help contribute what I can if you are interested (pending work approval), as I think getting a KubernetesExecutor in airflow core would help a lot of people.

@dimberman (Contributor, Author)

@grantnicholas

Hi Grant, thank you for looking this over! This implementation is still somewhat immature, but it's at least a working POC until I can iterate with feedback from the airflow and kubernetes communities :).

The points you're making are valid ones. I am actually meeting with a team of kubernetes committers next Wednesday at 10 AM PST via video chat to discuss my progress thus far and get a better perspective from the kubernetes community. If you are available, do you think you could join in and offer some of your experiences/issues?

WRT this effort being potentially labor-intensive: this is my primary project at my day job, so depending on what the k8s folks say next week, even if it takes a week or two of heavy lifting it should still pay off in the long run (and probably offer a cool tool to the rest of the kubernetes community).

Could you please let me know what other issues you are seeing with my implementation? Would be best to have this as clean/correct as possible before I speak with the k8s guys next week.

Thanks!

@grantnicholas (Contributor) commented Jul 7, 2017

I'd definitely be down for joining the videochat and hearing from kubernetes contributors about job scheduling on kubernetes. I bet they have great perspectives that I or you don't have.

About contributing actual code, I have to wait a bit until I know more about what I am/am not allowed to open source of our internal KubernetesExecutor code. I'm confident it won't be an issue, but just being extra safe.

About the POC code, here are some high level things that probably need to change outside of just handling job failure modes better:

  1. A more robust way to scale polling for job status (a sketch of option (a) follows at the end of this comment). The three main options you have here are:
    a. A ThreadPoolExecutor running polling for each job in a separate thread (with the caveat of the GIL)
    b. A ProcessPoolExecutor running polling for each job in a separate process (more overhead)
    c. An event loop (from tornado or asyncio) running async polling
  2. How much do you want to let people tweak the kubernetes jobs the scheduler launches? In the completely general case, you could dynamically spin up anything from a user-provided config, but you probably don't want this. For me, being able to tweak resource utilization and the docker image being run is crucial, but for some people always using an airflow-slave image is fine. Additionally, do you want to be able to change configuration dynamically at runtime, or only when the DAG is constructed?
  3. How logging is handled. This is actually very easy if you have a ReadWriteMany persistent volume (e.g. something like GlusterFS). The airflow workers can just write to the gluster volume that is mounted locally and the web UI will pick up the logs magically.
  4. You probably don't need a task_counter. Airflow has a built-in config parameter, parallelism, that will automatically rate-limit the number of running task instances (see the BaseExecutor for the implementation).

I think those are the big ones, other minor things are mostly related to code nitpicks which don't matter much from a POC perspective.
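To make option (a) concrete, here's a minimal sketch: one polling thread per job, with a hypothetical check_job_status helper standing in for the Kubernetes API call (the names and intervals are illustrative, not code from this PR).

    from concurrent.futures import ThreadPoolExecutor
    import time

    POLL_INTERVAL_SECONDS = 10  # how often each worker re-checks its job

    def poll_job(job_id, check_job_status, report_result):
        # Block this worker thread until the job reaches a terminal state.
        while True:
            status = check_job_status(job_id)   # hypothetical wrapper around a k8s API call
            if status in ('Succeeded', 'Failed'):
                report_result(job_id, status)   # hypothetical hand-off back to the executor
                return
            time.sleep(POLL_INTERVAL_SECONDS)

    # One polling thread per outstanding job; the GIL matters little here because
    # the threads spend nearly all their time blocked on I/O or sleeping.
    pool = ThreadPoolExecutor(max_workers=32)
    # pool.submit(poll_job, job_id, check_job_status, report_result)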

@dimberman (Contributor, Author)

@grantnicholas Great! I'll DM you the info.

  1. Definitely agreed that we can split the queries into separate threads that have their own timers. I would want to be careful about pummeling the kubernetes API (though I guess we could let the user decide how often they want to query).

  2. That's a good question. Would we want the user to decide how big they want the instances to be on a per-task basis? There are two trains of thought I have on this:

A) Many airflow users are not necessarily low-level developers. We wouldn't want data scientists/application developers to have to think in terms of kubernetes.

B) Perhaps there are certain specific settings people can designate (i.e. how large an instance they'd want a certain task to run on). This might be funky with a lot of the other executors though.

  3. There are a few members of the airflow team who are currently taking care of logging. I'll check in with them to see what's going on there.

  4. The task counter was created to guarantee that each job has a fully unique id (otherwise kubernetes won't launch the job). It might not be needed anymore though...

Thank you again Grant, I'll send you that DM shortly

@grantnicholas (Contributor) commented Jul 7, 2017

About 2:
I think we're both on the same page that providing configuration on a per task basis is a must.
And I agree that having KubernetesExecutor specific columns in the TaskInstance table is bad too.

How we solved this was repurposing the Celery-specific queue field from a varchar(N) to a TEXT/varchar(MAX) field and using it to hold a JSON config object that configures your executor's run_async command on a per-task basis. For us, that means configuring the docker image and the resources to run the task with.

I think in general this is an improvement for airflow since it removes a CeleryExecutor specific feature and replaces it with a general configuration field for all executors.
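To illustrate (the keys and values below are hypothetical, not something Airflow or this PR defines), the per-task JSON carried in the widened queue column might look like:

    import json

    # Hypothetical per-task executor configuration serialized into the repurposed
    # TEXT column; the keys are illustrative, not an Airflow API.
    executor_config = json.dumps({
        "image": "mycompany/airflow-worker:latest",
        "resources": {
            "request_cpu": "500m",
            "request_memory": "512Mi",
            "limit_cpu": "1",
            "limit_memory": "1Gi",
        },
    })

    # The task would carry this string in the widened queue field, and the
    # executor's run_async would deserialize it when building the pod spec, e.g.:
    #   BashOperator(task_id="count_rows", bash_command="...", queue=executor_config, dag=dag)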

This still is only static configuration though. If you want truly dynamic configuration you need another mechanism for shared config passing and validation. (We implemented this with row-level locking in Postgres and adding a new config table, but it is ugly).

When I say static vs dynamic configuration I mean:
static configuration => configuration that you know about when you are writing your dag.
dynamic configuration => configuration that you know about only when the dag is part-finished

Dynamic configuration is useful if you want to do things like:

  • Have a task that counts the number of rows in a hive table
  • Based on that count, configure a downstream task to have more/less resources

About 4:
Makes sense. Though if you don't care about readability of IDs you can just use a UUID and call it a day.
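For illustration only (not the naming scheme in this PR), something like this gives unique, DNS-safe pod names without a counter:

    import uuid

    def make_pod_name(task_id):
        # Kubernetes object names must be lowercase DNS-1123 names, so normalize
        # the task id and append a short random suffix for uniqueness.
        safe = task_id.lower().replace('_', '-')[:50]
        return "{}-{}".format(safe, uuid.uuid4().hex[:8])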

And great, I'm excited to talk with you all and learn more about kubernetes and airflow :)

@bolkedebruin (Contributor)

Hi guys. Looking forward to the KubernetesExecutor (would it work on OpenShift? That would be even nicer). Can you make sure to record the video conference somewhere? Apache doesn't like us to be opaque, and it might even spark more interest. You could also make it a public conference (we have done so in the past; it works fine) and announce it on the dev list.

@dimberman (Contributor, Author)

Hi @bolkedebruin,

Thank you for your interest :). OpenShift is definitely one system we want this to work on (we will have a Cinder integration by default, which should help).

Unfortunately, while the meeting was recorded, the recording software didn't work correctly and we were unable to save the recording. However, we took fairly good notes of the meeting that I will post to the wiki page. I will be presenting this to another section of the kubernetes community soon and will be certain to post a link to the airflow dev mailing list/record the meeting.

Do you think that there would be interest among the airflow community for me to propose a more airflow-centric meeting? I didn't want to overstep any bounds by proposing one without consulting any airflow committers or PMCs.

@bolkedebruin (Contributor)

@dimberman I think there will definitely be more interest. Especially if it's augmented with some use cases to drive the discussion, I think it will be pretty attractive. Don't worry about us committers; there are no bounds on doing meetups etc. We don't hold such a monopoly (and wouldn't want to).

    self.watcher_queue = watcher_queue

def run(self):
    logging.info("starting watch!")
Review comment (Contributor):
Please properly initialise the logger (logging.getLogger(__name__)). I know we don't do this in other places yet.

@dimberman (Contributor, Author)

@bolkedebruin In that case I'll propose one tomorrow morning on the dev mailing list :).

I contacted the kubernetes team and while they did record the meeting, the recording unfortunately didn't work. However, they took fairly detailed notes which I will add as a subpage to the wiki. There were also a few follow-up meetings proposed which I will post to the dev list beforehand and make sure are recorded.

@grantnicholas

WRT the issues you brought up about scalability and the inefficiency of polling, I have modified the executor to take advantage of the kubernetes watch API. This keeps a persistent stream open that only alerts the executor when events of interest take place on pods it cares about, which reduces network traffic and allows for higher scalability.

I'm also running this watcher on a separate thread, which opens up the possibility of multi-threading it if needed.
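Roughly, the watcher looks like the sketch below (simplified, using the kubernetes Python client; the namespace, label selector, and queue hand-off are placeholders rather than the exact code in this branch):

    import threading
    from kubernetes import client, config, watch

    def watch_pods(watcher_queue, namespace="airflow", label_selector="airflow-worker"):
        """Stream pod events and push the interesting ones onto a queue for the executor."""
        config.load_incluster_config()          # or config.load_kube_config() outside the cluster
        v1 = client.CoreV1Api()
        w = watch.Watch()
        for event in w.stream(v1.list_namespaced_pod,
                              namespace=namespace,
                              label_selector=label_selector):
            pod = event["object"]
            # Only the pod name and its phase transitions matter to the executor.
            watcher_queue.put((pod.metadata.name, pod.status.phase))

    # Run the watcher off the main scheduler loop:
    #   threading.Thread(target=watch_pods, args=(watcher_queue,), daemon=True).start()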

@grantnicholas (Contributor)

@dimberman

Using the watch api is a big win with respect to both performance and correctness so that looks great.

WRT running a separate process for watching: there are some failure modes to consider.

For example, what happens if the watch process dies but the main process does not? You probably need to monitor the watch process periodically and restart it on failure.

Additionally, what happens if the scheduler process dies and misses events: when it starts back up, how do you know which events to look at? You can list the history of all jobs in that namespace, but that is very expensive. I need to look more into the watch API, but I know there is a way to specify the resource version from which you want to start watching. If you periodically persisted the resource version you last watched, then on startup you could resume the watch from that point.
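Something along these lines (a sketch assuming the kubernetes Python client, with the checkpoint persistence left as a stub since where to store it is an open question):

    from kubernetes import client, config, watch

    def watch_from_checkpoint(handle_event, load_resource_version, save_resource_version,
                              namespace="airflow"):
        """Resume the pod watch from the last persisted resource_version so a
        restarted scheduler neither misses events nor re-lists the whole namespace."""
        config.load_incluster_config()
        v1 = client.CoreV1Api()
        w = watch.Watch()
        kwargs = {"namespace": namespace}
        last_seen = load_resource_version()       # stub: read the checkpoint from durable storage
        if last_seen:
            kwargs["resource_version"] = last_seen
        # Note: if the stored version is too old the API server answers 410 Gone,
        # in which case a full re-list is unavoidable.
        for event in w.stream(v1.list_namespaced_pod, **kwargs):
            handle_event(event)                   # hypothetical executor callback
            save_resource_version(event["object"].metadata.resource_version)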

Outside of the watch stuff, the big things left (we talked about these already) are failure modes of jobs getting stuck/retrying forever and how best to pass kubernetes configuration to tasks.

@benjigoldberg (Contributor)

We would be all over this if this got merged. I'm sure there are a number of k8s users out there who would be thrilled by this.

@benjigoldberg (Contributor)

What do we need to do to get this over the finish line? We've been waiting anxiously for this feature since we saw it in July! Would love to help this get finished out in any way that I can. Same goes for the members of my team.

@dimberman (Contributor, Author)

Hi @benjigoldberg! So we're actually having a meeting tomorrow to discuss the release strategy and final requirements to get the kubernetes operator/kubernetes executor PR'ed. Are you on the airflow mailing list? I'll be posting the link later today. (If not, I can message you directly.)

@benjigoldberg (Contributor)

@dimberman thanks for the updates! Very exciting to hear. I'll check out the mailing list!

@dimberman (Contributor, Author)

@benjigoldberg So there is a lot of work happening on the executor here that should be merged in within the next day or two. This PR by @grantnicholas will also make it easy to launch the k8s environment. Once we have that set, if you guys could test it out and report back any bugs/feature requests that would be super helpful :)

@benjigoldberg (Contributor)

@dimberman we'll definitely check it out next week once you guys are ready for us to test it out!

@bolkedebruin (Contributor)

Hi @dimberman how are you doing on this one?

@grantnicholas (Contributor) commented Oct 23, 2017

@bolkedebruin we are still chugging away at our list of things needed for the first release in this branch.

@dimberman can give you more updates, but it would probably be useful to chat about some of the design trade-offs being made at some point. E.g., I know there was an outstanding question about whether using a Docker image created/maintained by Google (gcr.io/google-containers/git-sync-amd64) in an Apache project is OK or not (among other questions).

@dimberman (Contributor, Author)

Hi @bolkedebruin, we have a few open issues but are getting close to having something ready for release. Would you be available for a chat with me (and @grantnicholas or @benjigoldberg if you guys have time) to discuss some of the outstanding issues/what steps we can start taking to prep for a release once these tickets are merged? Thank you!

        self.kube_client = None
        super(KubernetesExecutor, self).__init__(parallelism=self.kube_config.parallelism)

    def clear_not_launched_queued_tasks(self):
Review comment (Contributor):

This is already managed in the scheduler. I would suggest reaping all pods that have tasks still in QUEUED state, if required.
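Roughly what I mean (a sketch; it assumes worker pods are labeled with their task id, which may not match this PR):

    from kubernetes import client

    def reap_queued_task_pods(kube_client, queued_task_ids, namespace="airflow"):
        """Delete worker pods whose task instances are still QUEUED in the
        metadata database, so the scheduler can relaunch them cleanly."""
        pods = kube_client.list_namespaced_pod(namespace=namespace,
                                               label_selector="airflow-worker")
        for pod in pods.items:
            labels = pod.metadata.labels or {}
            if labels.get("task_id") in queued_task_ids:    # "task_id" is an assumed label
                kube_client.delete_namespaced_pod(name=pod.metadata.name,
                                                  namespace=namespace,
                                                  body=client.V1DeleteOptions())

    # kube_client is a kubernetes.client.CoreV1Api instance;
    # queued_task_ids would come from a query against the task_instance table.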


class Pod:
    """
    Represents a kubernetes pod and manages execution of a single pod.
Review comment (Contributor):
This does not look like our typical layout of comments (indent?)

from kubernetes.client import V1Pod
from airflow.utils.state import State
import json
import logging
Review comment (Contributor):

Please use the updated logging style and imports

self.kube_req_factory = SimplePodRequestFactory()
self._client = kube_client or get_kube_client()
self._watch = watch.Watch()
self.logger = logging.getLogger(__name__)
Review comment (Contributor):
use self.log

    return self._client.read_namespaced_pod(pod.name, pod.namespace)

def process_status(self, job_id, status):
    if status == 'Pending':
Review comment (Contributor):
Maybe use pod_status.XXX for this, to prevent misspellings and allow reuse in the future?
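E.g. something like this (illustrative only):

    class PodStatus:
        """Kubernetes pod phases kept in one place, so a typo becomes an
        AttributeError instead of a comparison that silently never matches."""
        PENDING = 'Pending'
        RUNNING = 'Running'
        FAILED = 'Failed'
        SUCCEEDED = 'Succeeded'

    # ... and in process_status:
    #   if status == PodStatus.PENDING: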

provide_context=True,
*args,
**kwargs)
self.logger = logging.getLogger(self.__class__.__name__)
Review comment (Contributor):
Not required. self.log is defined when inheriting from BaseOperator

grantnicholas and others added 5 commits April 19, 2018 11:23
#23)

* Added in executor_config to the task_instance table and the base_operator table

* Fix test; bump up number of examples

* Fix up comments from PR

* Exclude the kubernetes example dag from a test

* Fix dict -> KubernetesExecutorConfig

* fixed up executor_config comment and type hint
* Small cleanup to address PR comments

* Remove use of enum

* Change back to 3.4
k8s API errors will now throw airflow exceptions
@dimberman (Contributor, Author)

@Fokko @bolkedebruin I've moved the previously working kubernetes branch to https://github.com/bloomberg/airflow/tree/airflow-kubernetes-executor-pre-rebase and did a full rebase to match up to master. I'll spend the rest of today trying to fix the bugs.

@Fokko (Contributor) commented Apr 19, 2018

Thanks @dimberman, I'll continue debugging tomorrow. Cheers!

@dimberman (Contributor, Author) commented Apr 19, 2018

@Fokko So I found out why the executor is running non-stop after the rebase. Basically, instead of always blocking the jobs loop, we now only block if the file path queue is empty: seen here and here.

For some reason, the k8s executor's file path queue is never empty, meaning that the for loop never pauses. I'll continue investigating but I want to pass on whatever I find since our time zones are a bit asynchronous :).

[2018-04-19 23:16:34,401] {dag_processing.py:509} INFO - xxx: file path queue [u'/root/airflow/dags/example_xcom.py', u'/root/airflow/dags/example_skip_dag.py', u'/root/airflow/dags/subdags/subdag.py']
[2018-04-19 23:16:34,404] {dag_processing.py:509} INFO - xxx: file path queue [u'/root/airflow/dags/example_xcom.py', u'/root/airflow/dags/example_skip_dag.py', u'/root/airflow/dags/subdags/subdag.py']
[2018-04-19 23:16:34,408] {dag_processing.py:509} INFO - xxx: file path queue [u'/root/airflow/dags/example_xcom.py', u'/root/airflow/dags/example_skip_dag.py', u'/root/airflow/dags/subdags/subdag.py']
[... the same line repeats every ~3 ms ...]

@dimberman (Contributor, Author) commented Apr 20, 2018

Found it!

The issue was that there weren't enough threads, so if processing takes long enough that other DAGs need to be processed, the loop never sleeps. This setting fixed the logging issue:

[scheduler]
    dag_dir_list_interval = 60
    child_process_log_directory = /root/airflow/logs/scheduler
    # Task instances listen for external kill signal (when you clear tasks
    # from the CLI or the UI), this defines the frequency at which they should
    # listen (in seconds).
    job_heartbeat_sec = 2
    max_threads = 16

Now just need to make the tests pass again and we're good to go :)

@dimberman (Contributor, Author)

(Also, max_threads should probably have a different name to signal that it is used for DAG processing; this might potentially be a bug.)

@dimberman (Contributor, Author)

Shoot, never mind: when I switched back to INFO mode the issue was still there. But at least we know that's the general area where the problem exists.

@Fokko (Contributor) commented Apr 20, 2018

@dimberman thanks for rebasing, I'll have a look

@dimberman (Contributor, Author)

Hi @Fokko, did you have any luck? I'm boarding a flight in an hour and won't have wifi so just wanted to touch base before I do :).

@Fokko (Contributor) commented Apr 20, 2018

Hi @dimberman, there is still something looping in the code without any sleep. I still need to check that. Right now I'm configuring persistent volumes for the logs; after the container exits, the logs are gone.

@asfgit closed this in a15b7c5 on Apr 22, 2018
@Fokko (Contributor) commented Apr 22, 2018

I'm delighted to tell you all that this has been merged to master. By merging it to master it's a bit easier for me to patch stuff on top, and I didn't want to put more into this PR since it is far too big already. I've squashed some of the smaller commits together. The kubernetes tests are still failing, but we should fix this before we release the RC of Airflow 1.10. I'm looking at you @dimberman ;-)

@dimberman (Contributor, Author)

Thank you @Fokko! I'm gonna get right on cleaning up the last few bugs and I'm extremely excited to see this come out with 1.10 :D

@grantnicholas @benjigoldberg Thank you guys so much for your help; we're almost there :)

@benjigoldberg (Contributor)

@dimberman !!!

So excited! Thanks for all your hard work, everyone; this is such a great set of features.

aliceabe pushed a commit to aliceabe/incubator-airflow that referenced this pull request Jan 3, 2019
Closes apache#2414 from bloomberg:airflow-kubernetes-executor