[WIP] Airflow kubernetes executor #2414
Conversation
@mistercrunch @aoen PTAL. Pretty close to ready (still need to fix the DAG importers/add unit tests). Will also post to the mailing list with the design doc.
Codecov Report
@@ Coverage Diff @@
## master #2414 +/- ##
==========================================
- Coverage 75.28% 75.24% -0.04%
==========================================
Files 196 197 +1
Lines 14608 14686 +78
==========================================
+ Hits 10997 11051 +54
- Misses 3611 3635 +24
Continue to review full report at Codecov.
Disclaimer: I am not an airflow core contributor.

First, super exciting! I'm glad airflow on k8s is getting attention; it has the potential to really improve airflow moving forward.

That said, there are a lot of nasty bits when trying to safely schedule jobs on kubernetes, the biggest being: how do you ensure a job does not get stuck due to the many failure modes (running out of resources, volume mounts failing, incorrect configuration, missing secrets and configmaps, etc.)? The happy path of checking the job status for "Failed", "Running", or "Succeeded" is definitely not enough. Because of the many failure modes, I think this PR will explode in size when you try to handle all the cases. (At least, that is what happened at my day job when I implemented a KubernetesExecutor.)

There are other minor issues as well, but they mainly concern your implementation and are not fundamental (e.g. blocking polling, long delays between polls, etc.).

I'm happy to reach out and help contribute what I can if you are interested (pending work approval), as I think getting a KubernetesExecutor into airflow core would help a lot of people.
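To make the "happy path is not enough" point concrete, here is a minimal sketch (not code from this PR) of why pod phase alone misleads: a pod can sit in phase "Pending" indefinitely while a container is wedged on an unrecoverable waiting reason. The function name, return values, and the reason list below are illustrative assumptions, not the PR's API.

```python
# Illustrative subset of container "waiting" reasons that will never resolve
# on their own (e.g. a missing ConfigMap or a bad image reference).
FATAL_WAITING_REASONS = {
    "ErrImagePull",
    "ImagePullBackOff",
    "CreateContainerConfigError",
    "InvalidImageName",
}

def classify_pod(phase, waiting_reasons):
    """Map a pod phase plus container waiting reasons to a coarse task state."""
    if phase == "Succeeded":
        return "success"
    if phase == "Failed":
        return "failed"
    # A happy-path check would stop here, silently leaving a wedged pod
    # "queued" forever; inspecting waiting reasons catches it.
    if any(r in FATAL_WAITING_REASONS for r in waiting_reasons):
        return "failed"
    return "running" if phase == "Running" else "queued"
```

A pod stuck on `ImagePullBackOff` is classified as failed even though its phase is still "Pending".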
Hi Grant, thank you for looking this over! This implementation is still somewhat immature, but it's at least a working POC until I can iterate with feedback from the airflow and kubernetes communities :). The points you're making are valid ones.

I am actually meeting with a team of kubernetes committers next Wednesday, 10 AM PST, via video chat to discuss my progress thus far and get a better perspective from the kubernetes community. If you are available, do you think you could join in and offer some of your experiences/issues?

WRT this effort being potentially labor-intensive: this is my primary project at my day job, so depending on what the k8s guys say next week, even if it takes a week or two of heavy lifting it would still pay off in the long run (and probably offer a cool tool to the rest of the kubernetes community).

Could you please let me know what other issues you are seeing with my implementation? It would be best to have this as clean/correct as possible before I speak with the k8s guys next week. Thanks!
I'd definitely be down for joining the video chat and hearing from kubernetes contributors about job scheduling on kubernetes. I bet they have great perspectives that you or I don't have. About contributing actual code: I have to wait a bit until I know more about what I am/am not allowed to open-source from our internal KubernetesExecutor code. I'm confident it won't be an issue, but I'm just being extra safe. About the POC code, here are some high-level things that probably need to change outside of just handling job failure modes better:
I think those are the big ones; other minor things are mostly code nitpicks which don't matter much from a POC perspective.
@grantnicholas Great! I'll DM you the info.
A) Many airflow users are not necessarily low level developers. We wouldn't want data scientists/application developers to have to think in terms of kubernetes. B) Perhaps there are certain specific settings people can designate (i.e. how large of an instance they'd want a certain task to run on). This might be funky with a lot of the other executors though.
Thank you again Grant, I'll send you that DM shortly.
About 2: How we solved this was repurposing the celery-specific I think in general this is an improvement for airflow since it removes a This still is only static configuration though. If you want truly dynamic configuration you need another mechanism for shared config passing and validation. (We implemented this with row-level locking in Postgres and adding a new config table, but it is ugly). When I say static vs dynamic configuration I mean: Dynamic configuration is useful if you want to do things like:
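As a concrete illustration of static per-task configuration (the approach this PR later takes by adding an `executor_config` field to tasks, per the commit list below in the thread), here is a hedged sketch. The dictionary schema, key names, and the merge helper are assumptions for illustration, not the final API:

```python
# Hypothetical static per-task overrides, declared on the task itself.
task_config = {
    "KubernetesExecutor": {
        "request_memory": "1Gi",
        "request_cpu": "500m",
        "image": "my-registry/etl:latest",  # hypothetical image name
    }
}

def resolve_config(executor_config, defaults):
    """Merge per-task overrides onto cluster-wide defaults (static config).

    Static here means the values are fixed at DAG-definition time; truly
    dynamic configuration would need a shared store, as discussed above.
    """
    merged = dict(defaults)
    merged.update(executor_config.get("KubernetesExecutor", {}))
    return merged
```

A task-level `request_cpu` wins over the cluster default, while unspecified keys (like a namespace) fall through from the defaults.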
About 4: And great, I'm excited to talk with you all and learn more about kubernetes and airflow :)
Hi guys. Looking forward to the KubernetesExecutor (would it work on Openshift? That would be even nicer). Can you make sure to record the video conf somewhere? Apache doesn't like us to be opaque, and it might even spark more interest. You could also make it a public conf (we have done so in the past, works fine) and announce it on the dev list.
Hi @bolkedebruin, thank you for your interest :). Openshift is definitely one system we want to work for (we will have a cinder integration by default, which should help that integration). Unfortunately, while the meeting was recorded, the recording software didn't work correctly and we were unable to save the recording. However, we took fairly good notes of the meeting that I will post to the wiki page. I will be presenting this to another section of the kubernetes community soon and will be certain to post a link to the airflow dev mailing list/record the meeting. Do you think that there would be interest among the airflow community for me to propose a more airflow-centric meeting? I didn't want to overstep any bounds by proposing one without consulting any airflow committers or PMCs.
@dimberman I think there will definitely be more interest. Especially augmented with some use cases, the discussion will be pretty attractive I guess. Don't worry about us committers. There are no bounds on doing meetups etc. We don't hold such a monopoly (and wouldn't want to).
self.watcher_queue = watcher_queue

def run(self):
    logging.info("starting watch!")
Please properly initialise the logger (logging.getLogger(__name__)). And yes, I know we don't do this in other places yet.
@bolkedebruin In that case I'll propose one tomorrow morning on the dev mailing list :). I contacted the kubernetes team, and while they did record the meeting, the recording unfortunately didn't work. However, they took fairly detailed notes, which I will add as a subpage to the wiki. There were also a few follow-up meetings proposed, which I will post to the dev list beforehand and make sure are recorded.

WRT the issues you brought up about scalability and the inefficiency of polling: I have modified the executor to take advantage of the kubernetes watch API. This keeps a persistent stream open that only alerts me when events of interest take place on pods I care about, which will reduce network traffic and allow for higher scalability. I'm also running this watcher on a separate thread, which opens up the possibility of multi-threading it if needed.
Using the watch API is a big win for both performance and correctness, so that looks great. WRT running a separate process for watching: there are some failure modes to consider. For example, what happens if the watch process dies but the main process does not? You probably need to monitor the watch process periodically and restart it on failure. Additionally, if the scheduler process dies and misses events, how does it know which events to look at when it starts back up? You can look at the history of all jobs in the namespace, but that is very expensive. I need to look more into the watch API, but I know there is a way to specify from what version of a resource you want to start watching. If you could periodically persist the resource version you last watched from, then on startup you could watch only from that event onward. Outside of the watch stuff, the big things left (we talked about these already) are the failure modes of jobs getting stuck/retrying forever, and how best to pass kubernetes configuration to tasks.
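The checkpoint-and-resume idea above can be sketched as pure logic. In the real kubernetes Python client this corresponds to passing `resource_version` to `watch.Watch().stream(...)`; here the event source is abstracted, resource versions are assumed to be comparable integers arriving in order (an illustrative simplification, since real resource versions are opaque strings), and all names are hypothetical:

```python
def process_events(events, checkpoint):
    """Process only events newer than the persisted checkpoint.

    `events` is an iterable of (resource_version, payload) pairs, assumed to
    arrive in increasing resource_version order. Returns the handled payloads
    and the new checkpoint, which a real scheduler would persist durably so a
    restart does not replay the whole namespace history.
    """
    last_seen = checkpoint
    handled = []
    for version, payload in events:
        if version <= last_seen:
            continue  # already processed before the restart
        handled.append(payload)
        last_seen = version
    return handled, last_seen
```

On restart, the scheduler reloads the persisted checkpoint and skips everything it already handled, instead of scanning every job in the namespace.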
We would be all over this if it got merged. I'm sure there are a number of k8s users out there who would be thrilled by this.
What do we need to do to get this over the finish line? We've been waiting anxiously for this feature since we saw it in July! I would love to help get this finished in any way that I can. Same goes for the members of my team.
Hi @benjigoldberg! We're actually having a meeting tomorrow to discuss the release strategy/final requirements to get the kubernetes operator/kubernetes executor PR'ed. Are you a part of the airflow mailing list? I'll be posting the link later today. (If not, I can message you directly.)
@dimberman thanks for the updates! Very exciting to hear. I'll check out the mailing list!
@benjigoldberg So there is a lot of work happening on the executor here that should be merged in within the next day or two. This PR by @grantnicholas will also make it easy to launch the k8s environment. Once we have that set, if you guys could test it out and report back any bugs/feature requests, that would be super helpful :)
@dimberman we'll definitely check it out next week once you guys are ready for us to test it!
Hi @dimberman, how are you doing on this one?
@bolkedebruin we are still chugging away at our list of things needed for the first release in this branch. @dimberman can give you more updates, but it would probably be useful to chat about some of the design tradeoffs being made at some point. E.g. I know there was an outstanding question about whether using a docker image created/maintained by Google [gcr.io/google-containers/git-sync-amd64] in an Apache project is OK or not (among other questions).
Hi @bolkedebruin, we have a few open issues but are getting close to having something ready for release. Would you be available for a chat with me (and @grantnicholas or @benjigoldberg if you guys have time) to discuss some of the outstanding issues/what steps we can start taking to prep for a release once these tickets are merged? Thank you!
self.kube_client = None
super(KubernetesExecutor, self).__init__(parallelism=self.kube_config.parallelism)

def clear_not_launched_queued_tasks(self):
This is managed already in the scheduler. I would suggest reaping all pods that still have tasks in the QUEUED state, if required.
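The reviewer's suggestion can be sketched as follows. The helper names and pod/task shapes below are hypothetical, not the PR's API: the idea is simply to delete pods whose backing task instance never left QUEUED, rather than duplicating scheduler bookkeeping in the executor.

```python
def reap_stuck_pods(pods, task_state_for, delete_pod):
    """Delete pods whose backing task instance is still in the QUEUED state.

    pods:           iterable of dicts with "task_id" and "name" keys (assumed shape)
    task_state_for: callable mapping a task_id to its current state
    delete_pod:     callable that deletes a pod by name (e.g. a k8s API call)
    """
    reaped = []
    for pod in pods:
        if task_state_for(pod["task_id"]) == "queued":
            delete_pod(pod["name"])
            reaped.append(pod["name"])
    return reaped
```

With injected callables this is straightforward to unit-test without a cluster, which matters given how many failure modes were discussed earlier in the thread.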
airflow/contrib/kubernetes/pod.py
class Pod:
"""
Represents a kubernetes pod and manages execution of a single pod.
This does not look like our typical layout of comments (indent?)
from kubernetes.client import V1Pod
from airflow.utils.state import State
import json
import logging
Please use the updated logging style and imports
self.kube_req_factory = SimplePodRequestFactory()
self._client = kube_client or get_kube_client()
self._watch = watch.Watch()
self.logger = logging.getLogger(__name__)
Use self.log.
return self._client.read_namespaced_pod(pod.name, pod.namespace)

def process_status(self, job_id, status):
    if status == 'Pending':
Maybe use pod_status.XXX constants for this, to prevent misspellings and maybe allow reuse in the future?
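A minimal sketch of the suggested constants: a misspelled string literal like `'Pendign'` fails silently in a comparison, while a typoed attribute raises immediately. The class name and the return values below are illustrative, not the PR's actual code.

```python
class PodStatus:
    """Kubernetes pod phase strings gathered in one place."""
    PENDING = "Pending"
    RUNNING = "Running"
    FAILED = "Failed"
    SUCCEEDED = "Succeeded"

def process_status(job_id, status):
    # Hypothetical mapping of pod phase to a task outcome; a typo such as
    # PodStatus.PENDIGN would raise AttributeError instead of passing silently.
    if status == PodStatus.PENDING:
        return "queued"
    if status == PodStatus.FAILED:
        return "failed"
    if status == PodStatus.SUCCEEDED:
        return "done"
    return "running"
```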
provide_context=True,
*args,
**kwargs)
self.logger = logging.getLogger(self.__class__.__name__)
Not required. self.log is defined when inheriting from BaseOperator.
(#23)
* Added in executor_config to the task_instance table and the base_operator table
* Fix test; bump up number of examples
* Fix up comments from PR
* Exclude the kubernetes example dag from a test
* Fix dict -> KubernetesExecutorConfig
* Fixed up executor_config comment and type hint
* Small cleanup to address PR comments
* Remove use of enum
* Change back to 3.4
* k8s API errors will now throw airflow exceptions
@Fokko @bolkedebruin I've moved the previously working kubernetes branch to https://github.com/bloomberg/airflow/tree/airflow-kubernetes-executor-pre-rebase and did a full rebase to match up to master. I'll spend the rest of today trying to fix the bugs.
Thanks @dimberman, I'll continue debugging tomorrow. Cheers!
@Fokko So I found out why the executor is running non-stop after the rebase. Basically, instead of always blocking the jobs loop, we now only block if the file path queue is empty: seen here and here. For some reason, the k8s executor's file path queue is never empty, so the loop never pauses. I'll continue investigating, but I want to pass on whatever I find since our time zones are a bit asynchronous :).
Found it! The issue was that there weren't enough threads, so if processing takes long enough that other DAGs need to be processed, it just never sleeps. This setting fixed the logging issue.
Now we just need to make the tests pass again and we're good to go :)
(Also, max_threads should probably have a different name to signify its usage for DAG processing, and this might potentially be a bug.)
Shoot, never mind. When I switched back to INFO mode the issue was still there. But at least we know that's the general area where the problem exists.
@dimberman thanks for rebasing, I'll have a look.
Hi @Fokko, did you have any luck? I'm boarding a flight in an hour and won't have wifi, so I just wanted to touch base before I do :).
Hi @dimberman, there is still something looping in the code without any sleep. I still need to check that. Right now I'm configuring persistent volumes for the logs; after the container exits, the logs are gone.
I'm delighted to tell you all that this has been merged to master. By merging it to master, it is a bit easier for me to patch stuff onto this, and I didn't want to put more into this PR since it is far too big already. I've squashed some of the smaller commits together. The kubernetes tests are still failing, but we should fix them before we release the RC of Airflow 1.10. I'm looking at you, @dimberman ;-)
Thank you @Fokko! I'm gonna get right on cleaning up the last few bugs, and I'm extremely excited to see this come out with 1.10 :D @grantnicholas @benjigoldberg Thank you guys so much for your help; we're almost there :)
@dimberman !!! So excited! Thanks for all your hard work everyone, this is such a great set of features.
Closes apache#2414 from bloomberg:airflow-kubernetes-executor
Dear Airflow maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
A full proposal for this PR can be found here https://github.com/bloomberg/airflow/blob/29694ae9903c4dad3f18fb8eb767c4922dbef2e8/dimberman-KubernetesExecutorProposal-050717-1423-36.pdf
Tests
Commits