
Spawn Dask scheduler on separate Kubernetes pod #84

Closed
chrish42 opened this issue Jul 9, 2018 · 30 comments

@chrish42
Contributor

chrish42 commented Jul 9, 2018

This feature request is a followup to #82. Having at least the option to spawn the Dask scheduler on a remote Kubernetes pod would enable more use cases for Dask-kubernetes, including at least the "run Jupyter on my notebook, but do the Dask computation on a remote cluster" one.

@mrocklin
Member

mrocklin commented Jul 9, 2018 via email

@jacobtomlinson
Member

This is definitely something we would use in anger.

Running Jupyter on a laptop and then connecting to a cloud based cluster for computation would be really useful.

@mrocklin
Member

mrocklin commented Jul 10, 2018 via email

@jacobtomlinson
Member

@mrocklin I will have to check my schedule and see if I can dedicate time to this.

@chrish42
Contributor Author

@mrocklin From my perspective, everyone is 5 minutes (and a credit card) away from having access to a Kubernetes cluster when they outgrow their laptop for computations. Especially if #85 is also implemented. Then they don't need to install anything else but dask-kubernetes; they just instantiate a dask_kubernetes.Cluster object, passing it the cluster credentials given when they created the cluster in the GCP (or other) console. It's getting to a JupyterHub-like setup on Kubernetes, where people can launch notebooks, etc., that requires more work (and/or money) and has a potentially smaller audience.

@jgerardsimcock
Contributor

jgerardsimcock commented Aug 3, 2018

We would really like to explore the idea of having a central scheduler that users could log into via a Jupyter notebook/terminal and run large batch jobs. I'd like to understand how to do something like the financial modeling team in the Dask use cases.

What is the best place to start on getting this moving?

  1. putting the scheduler in another pod (I have no idea how to do this)
  2. ?

@mturok

mturok commented Sep 21, 2018

+1

@arpit1997

Is it still open? Can I try this?

@jgerardsimcock
if we spawn the Dask scheduler in another pod and let the scheduler manage jobs, wouldn't that be enough? Am I missing something here?

@chrish42 Certainly it would have a small audience, but it would really help data scientists to just connect to a cluster and do processing on huge datasets.

@rmccorm4

rmccorm4 commented Feb 7, 2019

@mrocklin @jacobtomlinson Am I wrong in thinking that you could create and start the KubeCluster client/scheduler on a master in the Kubernetes cluster, serialize/send the cluster/client object to your laptop, and issue client.submit(func, x) commands from your laptop? (Which will just work because of the existing dask.distributed functionality, since this client object will have the IP and port of the remote master you started it on?)

Or is this issue saying a whole new RPC setup needs to be developed, where you start some listening socket on a master in the cluster that will take RPCs to initialize dask-kubernetes cluster/client/scheduler, take RPCs to submit tasks, take RPCs to scale, etc.?

Or neither? I'm just trying to better understand the problem statement here.

@mrocklin
Member

mrocklin commented Feb 7, 2019

You could do something like that, yes, but it's a bit reversed of what I suspect will be a common workflow.

The situation we care about is that someone has a laptop and they have a kubernetes cluster. From their laptop they want to ask Kubernetes to create a Dask scheduler and a bunch of Dask workers that talk to the scheduler, and then they create a client locally on their laptop and connect to that scheduler.

Currently, with the way KubeCluster is designed, the scheduler is only created locally, so the user has to be on the Kubernetes cluster. So instead of something like this:

class KubeCluster:
    def start(...):
        self.scheduler = Scheduler(...)  # actually this happens inside of the `self.cluster = LocalCluster(...)` today

We want something like this:

class KubeCluster:
    def start(...):
        self.scheduler_pod = kubernetes.create_pod(scheduler_pod_specification)
        self.scheduler_connection = connect_to(scheduler_pod)

Or at least we want the option to do that. There will be other cases where we'll want the old behavior as well.

@martindurant
Member

Doesn't dask-yarn work with a scheduler-in-a-container?

@mrocklin
Member

mrocklin commented Mar 6, 2019 via email

@dhirschfeld

I think this refactor would be fairly tightly coupled to whatever results from dask/distributed#2235?

@beberg

beberg commented Mar 8, 2019

Now that the Helm chart updates are in progress in #128, we have a better handle on the use cases and variables needed to put the scheduler in the cluster.

  • Add an optional scheduler spec to KubeCluster.
  • Add additional parameters, roughly 1:1 with the Helm chart variables.
  • Usage may be tricky to keep 100% backward compatible, but it can still default to a local scheduler.
  • The kwargs approach doesn't scale well. YAML defaults + YAML overrides may be an easier and clearer UX, matching kubectl, Helm, konfigure, etc. in the K8s ecosystem.
  • Depending on the type of cluster and the NodePort/LoadBalancer/ClusterIP choice, things get messy, so this will need some care, similar to how the chart spits out all the needed addresses.

@mrocklin
Member

mrocklin commented Mar 8, 2019

We currently ask the user for a worker pod specification template. We should probably expand this to also include a scheduler pod specification template. I agree that expanding everything out as keyword arguments would probably be difficult to maintain.

So I think that all we really need here from an API perspective is to add a scheduler_pod_template= keyword, and then we should be good to go (at least until we actually have to implement things).
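As a rough illustration of that proposal, here is a sketch of what a scheduler pod template might look like, built as a plain dict mirroring the existing worker pod template. The function name, image, and structure here are illustrative assumptions, not the final API.

```python
# Hypothetical sketch of a scheduler pod spec for the proposed
# scheduler_pod_template= keyword. Everything here is illustrative.

def make_scheduler_pod_spec(image="daskdev/dask:latest"):
    """Build a minimal Kubernetes pod spec dict that runs dask-scheduler."""
    return {
        "kind": "Pod",
        "metadata": {"labels": {"app": "dask", "component": "scheduler"}},
        "spec": {
            "containers": [
                {
                    "name": "dask-scheduler",
                    "image": image,
                    "args": ["dask-scheduler", "--port", "8786",
                             "--dashboard-address", ":8787"],
                    "ports": [
                        {"containerPort": 8786},  # scheduler comms
                        {"containerPort": 8787},  # dashboard
                    ],
                }
            ]
        },
    }

# The proposed API might then accept it alongside the worker template:
# cluster = KubeCluster(pod_template=worker_spec,
#                       scheduler_pod_template=make_scheduler_pod_spec())
```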

@SeguinBe

Maybe this will be of some help to others: we had a situation where we wanted to have the client on our own machines and just dispatch Dask workers/scheduler to Kubernetes. So I made a tiny library to do it; it is not very well tested, but it works in our university setting.

https://github.com/SeguinBe/dask_k8

@jacobtomlinson
Member

This has now been implemented in #162 and can be enabled with cluster = KubeCluster(deploy_mode="remote", ...).

This creates the scheduler in a pod on the cluster and also creates a service to enable traffic to be routed to the pod. This is a ClusterIP service by default but can be configured to use a LoadBalancer.

This is currently not the default as it is non-trivial to route traffic to the dashboard using this setup.

Further testing and development would be great!
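For anyone trying this out, a minimal usage sketch of the remote deploy mode described above. The calls that touch a cluster are left commented out because they require dask_kubernetes (>= 0.10) and access to a live Kubernetes cluster; "worker-spec.yaml" is an assumed filename for the worker pod template.

```python
# Sketch: keyword arguments for the remote scheduler mode described above.
kubecluster_kwargs = {
    "deploy_mode": "remote",   # run the scheduler in its own pod
    "n_workers": 2,            # initial worker count (assumed kwarg)
}

# from dask_kubernetes import KubeCluster
# from dask.distributed import Client
#
# cluster = KubeCluster.from_yaml("worker-spec.yaml", **kubecluster_kwargs)
# client = Client(cluster)     # connects via the scheduler's Service
# client.submit(lambda x: x + 1, 41).result()
```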

@wwoods

wwoods commented Nov 13, 2019

I may not be following this thread correctly, but running dask_kubernetes.KubeCluster.from_yaml(..., deploy_mode="remote") is still resulting in a hanging client when I run it on my laptop (though the scheduler does appear to run in its own pod). What additional routing/configuration is needed to have a local laptop installation talk to a dask_kubernetes cluster spawned via from_yaml? dask_kubernetes version 0.10.0

@jacobtomlinson
Member

@wwoods the scheduler will be placed behind a service; by default this will be of type LoadBalancer. The client will then attempt to connect to the address of the service.

So if you leave the defaults, your Kubernetes cluster must be able to assign a LoadBalancer and your client must be able to connect to it. If you have direct access to the internal Kubernetes network, then I recommend setting the service type to ClusterIP, which means the client will connect directly to the Kubernetes service, but you must be able to route to it.

Without knowing more about your Kubernetes cluster setup, I can't provide more advice.
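For reference, the service type can be set through Dask's configuration (the kubernetes.scheduler-service-type key mentioned later in this thread). A sketch, assuming the usual Dask config file location:

```yaml
# Sketch: ~/.config/dask/kubernetes.yaml
kubernetes:
  scheduler-service-type: ClusterIP   # or LoadBalancer on managed clusters
```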

@wwoods

wwoods commented Nov 13, 2019

@jacobtomlinson Just trying to get this working in a "kind" context (local cluster), from scratch. It looks like the service is configured as ClusterIP... Here's the error:

Timed out trying to connect to 'tcp://dask-waltw-bfc87757-2.default:8786' after 10 s: [Errno -2] Name or service not known

And here's the output of kubectl get service:

dask-waltw-bfc87757-2 ClusterIP 10.111.76.46 <none> 8786/TCP,8787/TCP 69s

It looks like the service.namespace which dask-kubernetes is trying to connect to is correct, so perhaps kind hasn't configured the local DNS correctly? Not knowing more about kubernetes, it's hard to say if this is because I'm misusing deploy="remote". From the original use case though, it seems like running a python script locally and doing the dask computation remotely is exactly the use case.

@jacobtomlinson
Member

It looks like your local machine can't resolve Kubernetes DNS in your local cluster. If you were using a managed cluster with a load balancer, you would be provided with an address you could resolve.

You will need to configure things correctly to enable this. What local implementation are you using? (docker desktop, k3s, minikube, etc)

@wwoods

wwoods commented Nov 14, 2019

@jacobtomlinson right - I haven't used kubernetes much, but this particular bit of documentation is proving rather hard to find. I know it's not exactly a dask-kubernetes problem, but I do appreciate you pointing me in the right direction. It's somewhat necessary information to take dask-kubernetes for a test drive.

I was using kind (https://github.com/kubernetes-sigs/kind), but I'm not attached. Also tried minikube since it explicitly mentioned DNS, but no luck.

@jacobtomlinson
Member

I'm afraid I don't have experience with kind. I personally use docker desktop on my Mac for local development. We use minikube for CI and testing.

@wwoods

wwoods commented Nov 14, 2019 via email

@jacobtomlinson
Member

If you have a look at the ci config you can see how it is done there.

@detroyejr

And here's the output of kubectl get service:

dask-waltw-bfc87757-2 ClusterIP 10.111.76.46 <none> 8786/TCP,8787/TCP 69s

It looks like the service.namespace which dask-kubernetes is trying to connect to is correct, so perhaps kind hasn't configured the local DNS correctly? Not knowing more about kubernetes, it's hard to say if this is because I'm misusing deploy="remote". From the original use case though, it seems like running a python script locally and doing the dask computation remotely is exactly the use case.

My Kubernetes cluster is provisioned with KOPS, which doesn't create a load balancer in AWS by default. I was looking for a good solution yesterday and discovered kubefwd, which adds Kubernetes services to your hosts file so that URLs like dask-waltw-bfc87757-2:8787 are accessible locally. My limited testing shows that it works, but it maybe isn't as stable as a load balancer would be. Hope this helps someone.
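A sketch of that kubefwd workflow. The forwarding command is commented out since it requires kubefwd to be installed, sudo, and a kubeconfig for the cluster; the service name below is the one from the earlier comment.

```shell
# Forward all services in the default namespace to the local machine:
#
#   sudo kubefwd svc -n default
#
# kubefwd then writes hosts-file entries so service names resolve locally,
# making a scheduler address like this reachable from a local Dask client:
echo "tcp://dask-waltw-bfc87757-2:8786"
```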

@epizut

epizut commented Aug 20, 2020

Hi,

I am in the same situation here. I am trying to run a remote scheduler and workers on a managed k8s cluster from my laptop over the internet.

  • The auth part is fine with the recent kubeconfig dev (thanks a lot)
  • kubernetes.scheduler-service-type=LoadBalancer allows me to access the Dashboard where I see my worker(s) (dask.config thanks to Nuno)
  • distributed.comm.timeouts.connect=180 helps too (dask.config)
  • scheduler_service_wait_timeout=180 idem (KubeCluster constructor)
  • But then I got both a remote scheduler exception (in the k8s logs):
[...]
distributed.scheduler - INFO -   Scheduler at:   tcp://100.64.1.223:8786
distributed.scheduler - INFO -   dashboard at:                     :8787
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection. at 0x7f74b62473a0>,  exception=KeyError('pickle-protocol')>)
[...]
  File "/opt/conda/lib/python3.8/site-packages/distributed/comm/core.py", line 222, in on_connection
    comm.handshake_options = comm.handshake_configuration(
  File "/opt/conda/lib/python3.8/site-packages/distributed/comm/core.py", line 138, in handshake_configuration
    "pickle-protocol": min(local["pickle-protocol"], remote["pickle-protocol"])
KeyError: 'pickle-protocol'
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP  local=tcp://100.64.1.223:8786 remote=tcp://x.x.x.x:53488>
distributed.scheduler - INFO - Register worker <Worker 'tcp://100.64.0.56:40725', name: 0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://100.64.0.56:40725
  • and a KubeCluster exception while rendering the IPython output in my laptop kernel:
[...]/cluster.py in __repr__(self)
    390             self._cluster_class_name,
    391             self.scheduler_address,
--> 392             len(self.scheduler_info["workers"]),
    393             sum(w["nthreads"] for w in self.scheduler_info["workers"].values()),
    394         )
KeyError: 'workers'

I believe managed k8s users are more data-science people than k8s admins.
So maybe the "LoadBalancer" solution could be documented a bit more?

Also dask-gateway seems overkill for a single user.

What do you think?

@jacobtomlinson
Member

That sounds to me like a version mismatch in Dask between the scheduler, workers, or client. I'm not sure it is really related to this issue. Could you check that all your Dask versions are up to date and, if you're still having trouble, raise a new issue?

I also agree that dask-gateway is overkill for a single user. That's not really its target audience. We have dask-kubernetes and the Dask helm-chart for single users, and dask-gateway (and soon the new dask-hub helm chart) for teams.

@epizut

epizut commented Aug 20, 2020

You are right, it's working for me now.

For the record I had to face:

  • Of course, deploy_mode=remote and scheduler-service-type=LoadBalancer
  • Increase distributed.comm.timeouts.connect (my managed k8s cluster takes about 15s to start the scheduler pod)
  • Strip dots from the username while waiting for the next PyPI package (see "removed . from valid characters for pod names" #225). The doc points to kubernetes.worker-name, but I only had success with kubernetes.name or KubeCluster(name=...)
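Collected as a Dask configuration sketch, using only the keys named in this thread (values are illustrative, not recommendations):

```yaml
# Sketch of the settings above (e.g. in ~/.config/dask/config.yaml)
distributed:
  comm:
    timeouts:
      connect: 180s            # managed clusters can be slow to start the scheduler pod
kubernetes:
  scheduler-service-type: LoadBalancer
  name: dask-myuser            # avoid dots in the name; see #225
```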

Thanks again Jacob

@jacobtomlinson
Member

I'm going to close this out as this is now the default behaviour.
