
Use SpecCluster #162

Merged: 64 commits merged into dask:master from the spec-cluster branch, Oct 11, 2019

Conversation

@jacobtomlinson (Member) commented Jul 10, 2019

This PR contains a rather large overhaul of the library to use the new SpecCluster. I've taken this opportunity to wrap up changes from other PRs adding asyncio support.

Changes:

  • Add classes for Pod, Worker and Scheduler
  • Make the scheduler a pod running on the Kubernetes cluster instead of a LocalCluster
  • Base KubeCluster on SpecCluster instead of Cluster (a rough sketch of the SpecCluster model follows at the end of this description)
  • Remove unnecessary scaling code from KubeCluster

Currently this relies on changes to SpecCluster in dask/distributed#2827.

Still to do:

  • Pull in all async changes
  • Refactor pod spec creation
  • Tests
  • Docs
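
For readers unfamiliar with SpecCluster, here is a minimal sketch of the model referred to in the Changes list above. It uses plain in-process Scheduler and Worker classes purely for illustration; none of the options shown here come from this codebase, and the actual PR substitutes Kubernetes pod objects for the worker class.

# Illustrative sketch only: SpecCluster assembles a cluster from "spec"
# dictionaries naming a class plus its constructor options. Any class whose
# instances can be started and stopped asynchronously can be plugged in,
# which is what lets this PR swap in Kubernetes pods.
from distributed import Scheduler, Worker
from distributed.deploy.spec import SpecCluster

scheduler = {"cls": Scheduler, "options": {"dashboard_address": ":8787"}}
workers = {
    "worker-0": {"cls": Worker, "options": {"nthreads": 1}},
    "worker-1": {"cls": Worker, "options": {"nthreads": 1}},
}

cluster = SpecCluster(scheduler=scheduler, workers=workers)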

@jacobtomlinson (Member, Author)

I'm currently facing issues with the way distributed handles asyncio.

from dask_kubernetes import KubeCluster
cluster = KubeCluster()     
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-4-3751171c0c19> in <module>
----> 1 cluster = KubeCluster()

~/Projects/dask/dask-kubernetes/dask_kubernetes/core.py in __init__(self, pod_template, name, namespace, n_workers, host, port, env, auth, **kwargs)
    334         }
    335 
--> 336         super().__init__({}, scheduler, worker, **kwargs)
    337 
    338     @classmethod

~/miniconda3/lib/python3.7/site-packages/distributed/deploy/spec.py in __init__(self, workers, scheduler, worker, asynchronous, loop, silence_logs)
    142         if not self.asynchronous:
    143             self._loop_runner.start()
--> 144             self.sync(self._start)
    145             self.sync(self._correct_state)
    146             self.sync(self._wait_for_workers)

~/miniconda3/lib/python3.7/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    243             return future
    244         else:
--> 245             return sync(self.loop, func, *args, **kwargs)

~/miniconda3/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    330             e.wait(10)
    331     if error[0]:
--> 332         six.reraise(*error[0])
    333     else:
    334         return result[0]

~/miniconda3/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/miniconda3/lib/python3.7/site-packages/distributed/utils.py in f()
    315             if callback_timeout is not None:
    316                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 317             result[0] = yield future
    318         except Exception as exc:
    319             error[0] = sys.exc_info()

~/miniconda3/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/miniconda3/lib/python3.7/site-packages/distributed/deploy/spec.py in _start(self)
    156         self._lock = asyncio.Lock()
    157         self.status = "starting"
--> 158         self.scheduler = await self.scheduler
    159         self.status = "running"
    160 

~/Projects/dask/dask-kubernetes/dask_kubernetes/core.py in _()
     46         async def _():
     47             async with self.lock:
---> 48                 await self.start()
     49             return self
     50 

~/Projects/dask/dask-kubernetes/dask_kubernetes/core.py in start(self)
     54         async with async_timeout.timeout(1):
     55             self.pod = await self.core_api.create_namespaced_pod(
---> 56                 self.namespace, self.pod_template
     57             )   
     58         self.status = "running"

~/miniconda3/lib/python3.7/site-packages/kubernetes_asyncio/client/api_client.py in __call_api(self, resource_path, method, path_params, query_params, header_params, body, post_params, files, response_type, auth_settings, _return_http_data_only, collection_formats, _preload_content, _request_timeout)
    164             post_params=post_params, body=body,
    165             _preload_content=_preload_content,
--> 166             _request_timeout=_request_timeout)
    167 
    168         self.last_response = response_data

~/miniconda3/lib/python3.7/site-packages/kubernetes_asyncio/client/rest.py in POST(self, url, headers, query_params, post_params, body, _preload_content, _request_timeout)
    228                                    _preload_content=_preload_content,
    229                                    _request_timeout=_request_timeout,
--> 230                                    body=body))
    231 
    232     async def PUT(self, url, headers=None, query_params=None, post_params=None,

~/miniconda3/lib/python3.7/site-packages/kubernetes_asyncio/client/rest.py in request(self, method, url, query_params, headers, body, post_params, _preload_content, _request_timeout)
    169                 raise ApiException(status=0, reason=msg)
    170 
--> 171         r = await self.pool_manager.request(**args)
    172         if _preload_content:
    173 

~/miniconda3/lib/python3.7/site-packages/aiohttp/client.py in _request(self, method, str_or_url, params, data, json, cookies, headers, skip_auto_headers, auth, allow_redirects, max_redirects, compress, chunked, expect100, raise_for_status, read_until_eof, proxy, proxy_auth, timeout, verify_ssl, fingerprint, ssl_context, ssl, proxy_headers, trace_request_ctx)
    415         timer = tm.timer()
    416         try:
--> 417             with timer:
    418                 while True:
    419                     url, auth_from_url = strip_auth_from_url(url)

~/miniconda3/lib/python3.7/site-packages/aiohttp/helpers.py in __enter__(self)
    566 
    567         if task is None:
--> 568             raise RuntimeError('Timeout context manager should be used '
    569                                'inside a task')
    570 

RuntimeError: Timeout context manager should be used inside a task

Any support would be appreciated. Ping @mrocklin @quasiben

@quasiben (Member)

I'm not very familiar with async_timeout; perhaps you could use another timeout call, asyncio.wait_for? However, that's not usable as a context manager. I also recall there being issues with how kubernetes_asyncio uses aiohttp.
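
For concreteness, here is roughly what that swap could look like against the start() coroutine from the traceback above. The attribute names (core_api, namespace, pod_template) are copied from the traceback; this is only a sketch of the suggestion, not a verified fix for the aiohttp error.

import asyncio

async def start(self):
    # asyncio.wait_for wraps the awaitable directly rather than acting as a
    # context manager, which is the trade-off mentioned above.
    self.pod = await asyncio.wait_for(
        self.core_api.create_namespaced_pod(self.namespace, self.pod_template),
        timeout=1,
    )
    self.status = "running"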

@jacobtomlinson (Member, Author)

Hmm, possibly. It's a bit of a pain that the library is Swagger-generated, which makes it harder to change.

I think the error I'm getting could be a bit misleading. It is raised within the "if task is None:" branch, so the message "Timeout context manager should be used inside a task" possibly suggests that a timeout is being used where it shouldn't be, as opposed to my first assumption that a timeout was missing.

@retrry commented Aug 19, 2019

Hey, what still needs to be done on this pull request? As I understand it, this and the async changes are required to use dask-kubernetes with dask-labextension.

I should be able to help test it if needed.

@jacobtomlinson (Member, Author)

@retrry see the checklist in the original comment here. There have also been a bunch of changes to SpecCluster since this was opened which need to be factored in.

Thanks for your offer to test this out. I'm aiming to finish this in the next few weeks.

@mrocklin (Member) left a review:

I'm excited to see this!

I know that this is still WIP, but I couldn't hold myself back from writing a bunch of tiny feedback comments :)

Review threads on dask_kubernetes/tests/test_core.py and dask_kubernetes/core.py (resolved)
Quoted docstring from the diff under review:

    connect_kwargs: dict
        kwargs to be passed to asyncssh connections
    kwargs:
        TODO Document Scheduler kwargs
Member:

Can we defer users to the dask.distributed.Scheduler docstring?

Member Author:

Is there a neat way of doing this in Python? Or should I just provide a link to the dask docs?

Member:

Probably, but I would just link, and maybe add an entry in See Also
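
For illustration, the link-plus-See-Also approach could look something like this (numpydoc style; the class and wording are hypothetical, not the docstring that was merged):

class Scheduler:
    """Dask Scheduler running as a pod on a Kubernetes cluster.

    Parameters
    ----------
    kwargs:
        Forwarded to ``distributed.Scheduler``; see the distributed
        documentation at https://distributed.dask.org for the full list.

    See Also
    --------
    distributed.Scheduler
    """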

Review threads on dask_kubernetes/core.py (resolved)
@jacobtomlinson (Member, Author)

OK, I think this is ready for final review and hopefully a merge.

I've done some more manual testing on the Pangeo Binder with the switch to a local scheduler by default, and everything seems to be working OK.
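
Roughly the kind of smoke test described above, sketched for illustration rather than taken from the actual Binder session; the image and scaling numbers are placeholders.

from dask.distributed import Client
from dask_kubernetes import KubeCluster, make_pod_spec

# Placeholder worker pod spec; any valid pod template would do.
pod_template = make_pod_spec(image="daskdev/dask:latest")

cluster = KubeCluster(pod_template)  # scheduler runs locally by default after this change
cluster.scale(2)

client = Client(cluster)
print(client.submit(lambda x: x + 1, 10).result())  # expect 11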

@mrocklin (Member) left a review:

Some minor comments

Review threads on dask_kubernetes/core.py and requirements.txt (resolved)
@jcrist (Member) commented Oct 9, 2019

> There is also a kwarg called local_scheduler which you can set to False for the scheduler to run in a pod on the cluster.

In dask-yarn we call this deploy_mode="local" and deploy_mode="remote", which was chosen since users were familiar with Spark, which also has a "deploy-mode" flag. I don't think we need to be uniform here in kwarg names, just thought I'd mention it.

@jacobtomlinson (Member, Author)

> I don't think we need to be uniform here in kwarg names, just thought I'd mention it.

I'm more than happy to be uniform. Consistency is good! I wasn't 100% happy with local_scheduler anyway.
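
For illustration, the two spellings under discussion would look like this from the user's side. Both calls are hypothetical; this PR currently uses local_scheduler, and whether a deploy_mode spelling was adopted is not settled here.

from dask_kubernetes import KubeCluster, make_pod_spec

pod_template = make_pod_spec(image="daskdev/dask:latest")  # placeholder spec

# Spelling in this PR: a boolean flag, False means the scheduler runs in a pod.
cluster = KubeCluster(pod_template, local_scheduler=False)

# dask-yarn-style spelling mentioned by @jcrist.
cluster = KubeCluster(pod_template, deploy_mode="remote")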

@mrocklin (Member)

@jacobtomlinson thoughts on merging?

@jacobtomlinson (Member, Author)

Yep I think I'm ready!

@jacobtomlinson merged commit 77bc754 into dask:master on Oct 11, 2019
@jacobtomlinson deleted the spec-cluster branch on October 11, 2019 at 07:58
@mrocklin (Member) commented Oct 11, 2019 via email

@jhamman (Member) commented Oct 11, 2019

Nice work @jacobtomlinson. Glad to see this come together!
