Understanding how to configure Skein and Dask-Yarn together in YAML #49

Closed
gallamine opened this issue Jan 22, 2019 · 16 comments

@gallamine

I am trying to sort out how the Skein YAML configuration (described here: https://jcrist.github.io/skein/specification.html) matches up with the Dask-Yarn YAML configuration (described here: http://yarn.dask.org/en/latest/configuration.html).

Specifically, I see that Skein has a files configuration to let you distribute files to the workers. It says to place this files section under the master or service section of the configuration. For the dask-yarn configuration, only scheduler and worker sections are shown in the example. Are the dask-yarn scheduler and worker implicitly placed into a services section of a Skein configuration?

How does the dask-yarn YAML file map onto the Skein YAML file, and where would I put the files attributes in my dask-yarn config file?
Thanks!

@jcrist
Member

jcrist commented Jan 23, 2019

Currently, if yarn.specification is present in the configuration, it's used whenever no other kwargs are passed to the constructor. It's basically the same as calling YarnCluster.from_specification(spec). The spec is expected to have at least one service named dask.worker (for the workers), and optionally an additional service dask.scheduler (for the scheduler). If dask.scheduler isn't provided, the scheduler is started locally.
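
Roughly, that's equivalent to something like the following sketch (assuming dask-yarn and skein are installed and a yarn.specification block is present in your dask configuration; the actual internals may differ slightly):

import dask
import skein
from dask_yarn import YarnCluster

# Read the yarn.specification block from dask's config system, build a skein
# ApplicationSpec from it, and start the cluster from that spec
spec_dict = dask.config.get('yarn.specification')
spec = skein.ApplicationSpec.from_dict(spec_dict)
cluster = YarnCluster.from_specification(spec)  # roughly what YarnCluster() does here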

If you wanted to specify files, you might write a specification like:

name: dask
queue: myqueue

services:
  dask.scheduler:
    resources:
      memory: <scheduler-memory>
      vcores: <scheduler-vcores>
    files:
      environment: /path/to/environment
      other-file-name: /other/local/file/path
    script: |
      source environment/bin/activate
      dask-yarn scheduler

  dask.worker:
    instances: 0
    resources:
      memory: <worker-memory>
      vcores: <worker-vcores>
    files:
      environment: /path/to/environment
      other-file-name: /other/local/file/path
    script: |
      source environment/bin/activate
      dask-yarn worker

In the configuration, you'd indent this whole spec under:

yarn:
  specification:
    ...

If all you want is files in the spec, I think we should make this supported by the constructor as well. I think this would look like:

  • A worker_files kwarg, which takes a mapping to forward to the files kwarg in the skein specification (a hypothetical Python sketch of this follows the CLI example below).
  • A worker-file and worker-archive option in the CLI. We could mirror Spark's API here and have the localized name be optionally specified with a # separator (see https://spark.apache.org/docs/latest/running-on-yarn.html#important-notes). I'm not a huge fan of this API, but it is used by other tools and that might be argument enough to use it.
dask-yarn submit \
  --worker-file /local/file/path \
  --worker-file /other/local/file/path#name-in-container
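
And a hypothetical sketch of the first bullet in Python (not an existing dask-yarn API; the worker_files name and its forwarding behavior are assumptions for illustration):

from dask_yarn import YarnCluster

# Hypothetical: worker_files would be forwarded to the `files` mapping on the
# dask.worker service in the generated skein specification
cluster = YarnCluster(
    environment='environment.tar.gz',
    worker_files={'name-in-container': '/other/local/file/path'},
)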

@gallamine
Author

Thanks for the quick reply. I wrote this ticket somewhat generically to be helpful to others, but specifically I'm looking to make sure each of my workers gets a sqlite database when they're started, so your suggestion of adding this to the constructor seems useful.

My yarn.yaml file currently looks like:

yarn:
  name: dask                 # Application name
  queue: myqueue             # Yarn queue to deploy to

  environment: python:///usr/bin/python

  tags: []                   # List of strings to tag applications

  scheduler:                 # Specifications of scheduler container
    log_level: debug
    vcores: 1
    memory: 4GiB

  worker:                   # Specifications of worker containers
    log_level: debug
    vcores: 2
    memory: 8GiB
    count: 2                 # Number of workers to start on initialization
    restarts: -1             # Allowed number of restarts, -1 for unlimited

I want to add /mnt/tmp/data/cached_volume.sqlite3 to each of the workers; the yarn.yaml would then look like:

yarn:
  name: dask                 # Application name
  queue: myqueue             # Yarn queue to deploy to

  environment: python:///usr/bin/python

  tags: []                   # List of strings to tag applications

  configuration:
    name: dask
    queue: myqueue

    services:
      dask.scheduler:
        resources:
          memory: 4GiB
          vcores: 1
        files:
          environment: python:///usr/bin/python
        script: |
          source environment/bin/activate
          dask-yarn scheduler

      dask.worker:
        instances: 2
        resources:
          memory: 8GiB
          vcores: 2
        files:
          environment: python:///usr/bin/python
          'cached_volume.sqlite3':
            source: /mnt/tmp/data
            type: file
            visibility: public
        script: |
          source environment/bin/activate
          dask-yarn worker

  scheduler:                 # Specifications of scheduler container
    log_level: debug
    vcores: 1
    memory: 4GiB

  worker:                   # Specifications of worker containers
    log_level: debug
    vcores: 2
    memory: 8GiB
    count: 2                 # Number of workers to start on initialization
    restarts: -1             # Allowed number of restarts, -1 for unlimited

At this point I assume I can just keep the worker and scheduler sections inside the configuration section and drop them from the level up? E.g.:

yarn:
  name: dask                 # Application name
  queue: myqueue             # Yarn queue to deploy to

  environment: python:///usr/bin/python

  tags: []                   # List of strings to tag applications

  configuration:
    name: dask
    queue: myqueue

    services:
      dask.scheduler:
        log_level: debug
        resources:
          memory: 4GiB
          vcores: 1
        files:
          environment: python:///usr/bin/python
        script: |
          source environment/bin/activate
          dask-yarn scheduler

      dask.worker:
        instances: 2
        log_level: debug
        restarts: -1
        resources:
          memory: 8GiB
          vcores: 2
        files:
          environment: python:///usr/bin/python
          'cached_volume.sqlite3':
            source: /mnt/tmp/data
            type: file
            visibility: public
        script: |
          source environment/bin/activate
          dask-yarn worker

Finally, are the script sections you added just examples, or are they required for dask-yarn to function?

@jcrist
Member

jcrist commented Jan 23, 2019

At this point I assume I can just keep the worker and scheduler sections inside the configuration section and drop them from the level up? E.g.:

You'll want specification instead of configuration, and yes. A full skein specification is used when creating a YarnCluster object if no arguments are passed to its constructor (e.g. cluster = YarnCluster()). In this case only the yarn.specification field is read. If parameters are passed to the constructor then yarn.specification is ignored and the other yarn configuration fields are used.
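
As a sketch, the two modes look like this (assuming the yarn.specification block discussed in this thread is in your dask configuration; the kwargs shown are only examples):

from dask_yarn import YarnCluster

# No arguments: the full yarn.specification block is used as-is
cluster = YarnCluster()

# With arguments, yarn.specification is ignored and the other yarn.*
# configuration fields (environment, worker, scheduler, ...) are used instead:
# cluster = YarnCluster(environment='python:///usr/bin/python', worker_vcores=2)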

Finally, are the script sections you added just examples, or are they required for dask-yarn to function?

This is the bash script used to start the workers/schedulers. In the case above I was assuming you were distributing environments with your application, but from the spec you posted above it looks like you already have python installed on every node. In this case, your spec would probably look like:

yarn:
  specification:
    name: dask
    queue: myqueue

    services:
      dask.scheduler:
        resources:
          memory: 4GiB
          vcores: 1
        script: |
          # Script to start the scheduler. Alternatively dask-yarn scheduler might work if it's on your PATH
          /usr/bin/python -m dask_yarn.scheduler

      dask.worker:
        instances: 2
        max_restarts: -1
        resources:
          memory: 8GiB
          vcores: 2
        files:
          'cached_volume.sqlite3':
            source: /mnt/tmp/data
            type: file
            visibility: public
        script: |
          # Script to start a worker. Alternatively dask-yarn worker might work if it's on your PATH
          /usr/bin/python -m dask_yarn.worker

Does that make sense?


Also, you may run into issues distributing a sqlite database among workers if you're not using it for reading only, as multiple containers may end up sharing the same localized file.
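
If the database really is read-only, a minimal sketch of how each worker could open the localized copy (assuming it's localized as cached_volume.sqlite3 in the container's working directory, as in the spec above):

import sqlite3

def query_cache(sql):
    # mode=ro opens the database read-only, which avoids write contention if
    # multiple containers end up sharing the same localized file
    conn = sqlite3.connect('file:cached_volume.sqlite3?mode=ro', uri=True)
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.close()

# e.g. client.run(query_cache, 'SELECT count(*) FROM some_table')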

@jcrist
Member

jcrist commented Jan 23, 2019

Note that adding this to the configuration is only one option; alternatively, you can avoid configuration completely and do this programmatically using the YarnCluster.from_specification method:

import skein
import dask_yarn

worker = skein.Service(
    instances=0,
    max_restarts=-1,
    resources=skein.Resources(
        memory='8 GiB',
        vcores=2
    ),
    files={
        'cached_volume.sqlite3': skein.File(
            source='/mnt/tmp/data',
            type='file',
            visibility='public'
        )
    },
    script='/usr/bin/python -m dask_yarn.worker'
)

scheduler = skein.Service(
    resources=skein.Resources(
        memory='4 GiB',
        vcores=1
    ),
    script='/usr/bin/python -m dask_yarn.scheduler'
)

spec = skein.ApplicationSpec(
    name='dask',
    queue='myqueue',
    services={
        'dask.worker': worker,
        'dask.scheduler': scheduler
    }
)

cluster = dask_yarn.YarnCluster.from_specification(spec)
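
A possible follow-up once the cluster is up (a sketch; since the spec above starts zero workers, you'd scale up and attach a client afterwards):

from dask.distributed import Client

cluster.scale(2)           # request two worker containers
client = Client(cluster)   # connect to the scheduler running on YARN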

@gallamine
Author

This is helpful. However, I'm trying various configurations and I can't seem to get either the programmatic approach or the configuration file to work without an error. I've removed all the file references to make it as simple as possible, but it still fails.

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/site-packages/dask_yarn/core.py", line 291, in __init__
    self._start_cluster(spec, skein_client)
  File "/usr/local/lib/python2.7/site-packages/dask_yarn/core.py", line 367, in _start_cluster
    scheduler_address = app.kv.wait('dask.scheduler').decode()
  File "/usr/local/lib/python2.7/site-packages/skein/kv.py", line 655, in wait
    event = event_queue.get()
  File "/usr/local/lib/python2.7/site-packages/skein/kv.py", line 281, in get
    raise out
skein.exceptions.ConnectionError: Unable to connect to application

This is raised by this script:

import skein
import dask_yarn

worker = skein.Service(
    instances=0,
    max_restarts=-1,
    resources=skein.Resources(
        memory='8 GiB',
        vcores=2
    ),
    script='/usr/bin/python -m dask_yarn.worker'
)

scheduler = skein.Service(
    resources=skein.Resources(
        memory='4 GiB',
        vcores=1
    ),
    script='/usr/bin/python -m dask_yarn.scheduler'
)

spec = skein.ApplicationSpec(
    name='dask',
    queue='default',
    services={
        'dask.worker': worker,
        'dask.scheduler': scheduler
    }
)

cluster = dask_yarn.YarnCluster.from_specification(spec)

and also by calling cluster = YarnCluster() using this configuration in yarn.yaml:

yarn:
  name: dask # Application name
  queue: default             # Yarn queue to deploy to

  environment: python:///usr/bin/python

  tags: []                   # List of strings to tag applications

  specification:
    name: dask
    queue: default
    services:
      dask.scheduler:
        resources:
          memory: 4GiB
          vcores: 1
        script: |
          # Script to start the scheduler. Alternatively dask-yarn scheduler might work if it's on your PATH
          /usr/bin/python -m dask_yarn.scheduler
      dask.worker:
        resources:
          memory: 8GiB
          vcores: 2
        script: |
          # Script to start a worker. Alternatively dask-yarn worker might work if it's on your PATH
          /usr/bin/python -m dask_yarn.worker

@jcrist
Member

jcrist commented Jan 23, 2019

It looks like the application is failing to start - can you get the application logs?

yarn logs -applicationId <your-application-id>

A few likely culprits:

@jcrist
Member

jcrist commented Jan 23, 2019

Ah, shoot, the command I gave you was wrong. You want /usr/bin/python -m dask_yarn.cli worker and /usr/bin/python -m dask_yarn.cli scheduler. If dask-yarn is on $PATH, you could also just use the CLI commands dask-yarn worker, dask-yarn scheduler.

@gallamine
Author

Thanks! Good catch. Just to update for completeness, the direct commands for the scheduler and worker seem to be dask-yarn services scheduler and dask-yarn services worker.

When I include the file type like:

'/mnt/tmp':
  source: '/mnt/tmp/cached_volume.sqlite3'
  type: file

the cluster starts and stops seemingly without error, but if you look into the application logs you see:

19/01/23 18:15:20 WARN skein.ApplicationMaster: FAILED: dask.worker_0 - Destination must be relative

Should that maybe be an ERROR?

If I give the file a relative path everything seems to work, but I'm having a hard time locating where the file went. Is there a programmatic way of retrieving that in each worker?

@jcrist
Member

jcrist commented Jan 23, 2019

Ah good catch, struggling a bit today on my end :/.

If I give the file a relative path everything seems to work, but I'm having a hard time locating where the file went. Is there a programmatic way of retrieving that in each worker?

YARN localization can only map resources (files or archives) to paths in the container's working directory. For example:

files:
  local-name.sqlite3: /mnt/tmp/cached_volume.sqlite3

will put the sqlite file at local-name.sqlite3 in the current working directory on startup.

The local container directory could be added to skein.properties, but isn't currently exposed (I'll make an issue for this). This would allow getting the absolute path as os.path.join(skein.properties.container_dir, relative_path_from_container_directory).
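
In the meantime, a rough workaround sketch (assuming the worker process starts in the container's working directory, which is where YARN localizes the files):

import os

def localized_path(relative_name):
    # Resolve e.g. 'local-name.sqlite3' to its absolute path inside the
    # container's working directory
    return os.path.join(os.getcwd(), relative_name)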

Should that maybe be an ERROR?

We should definitely error before submitting, but we aren't currently checking specs for this in the Python client (I'll file an issue). I would have expected dask-yarn to raise an error on failure here; you're saying you're not seeing one?
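
For the spec check specifically, here's a minimal client-side sketch you could run before submitting (spec is a skein.ApplicationSpec like the ones built earlier in this thread):

import os

def check_file_destinations(spec):
    # In a skein spec, each service's files maps destination names to File
    # objects; destinations must be relative to the container's working directory
    for name, service in spec.services.items():
        for dest in service.files:
            if os.path.isabs(dest):
                raise ValueError(
                    '%s: file destination %r must be relative' % (name, dest)
                )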

@jcrist
Member

jcrist commented Jan 23, 2019

I would have expected dask-yarn to raise an error on failure here; you're saying you're not seeing one?

Actually, on further thought this is only failing repeatedly on worker startup, which isn't something I think we could catch generically on the dask-yarn side (there are many reasons why a worker could fail, not all of them should result in an application failure). This specific error could be caught by jcrist/skein#139 though.

@jcrist jcrist changed the title Understanding how to configure Skin and Dask-Yarn together in YAML Understanding how to configure Skein and Dask-Yarn together in YAML Jan 23, 2019
@gallamine
Author

gallamine commented Jan 23, 2019

Great. For future reference, here's my final yarn.yaml file that lets me start up a cluster and transfer a file.

yarn:
  name: dask # Application name
  queue: default             # Yarn queue to deploy to

  environment: python:///usr/bin/python

  tags: []                   # List of strings to tag applications

  specification:
    name: dask
    queue: default
    services:
      dask.scheduler:
        resources:
          memory: 4GiB
          vcores: 1
        script: |
          # Script to start the scheduler. Alternatively dask-yarn scheduler might work if it's on your PATH
          dask-yarn services scheduler
      dask.worker:
        resources:
          memory: 8GiB
          vcores: 2
        files:
          './smallfile.txt':
            source: /home/wcox/smallfile.txt
            type: file
        script: |
          # Script to start a worker. Alternatively dask-yarn worker might work if it's on your PATH
          dask-yarn services worker

@jcrist
Member

jcrist commented Jan 23, 2019

Glad you got things working!

One note - when specifying files, the type is inferred from the file extension by default, so unless you need to configure visibility or set type explicitly, you can just pass the source path:

...
        files:
          smallfile.txt: /home/wcox/smallfile.txt
...

Also, I missed this above, but for robustness you'll want to add a depends section to the worker, to ensure workers are always started after the scheduler.

...
      dask.worker:
        depends:
          - dask.scheduler
...

It worked fine above because you're not starting any workers initially, but if you were, you might see a delay in startup since the cluster isn't available until the scheduler starts.
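
For reference, the same depends idea expressed programmatically (a sketch; skein.Service accepts a list of service names it depends on):

import skein

worker = skein.Service(
    resources=skein.Resources(memory='8 GiB', vcores=2),
    depends=['dask.scheduler'],   # don't start workers until the scheduler is up
    script='dask-yarn services worker',
)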

I'm having some trouble with large files, but I'll make that a separate issue:

If you can file a reproducible issue in https://github.com/jcrist/skein that would be much appreciated!

@jcrist
Member

jcrist commented Jan 23, 2019

I've created a new issue for adding the option to distribute files in the constructor: #50

@RahulJangir2003

Note that adding this to the configuration is only one option; alternatively, you can avoid configuration completely and do this programmatically using the YarnCluster.from_specification method:

import skein
import dask_yarn

worker = skein.Service(
    instances=0,
    max_restarts=-1,
    resources=skein.Resources(
        memory='8 GiB',
        vcores=2
    ),
    files={
        'cached_volume.sqlite3': skein.File(
            source='/mnt/tmp/data',
            type='file',
            visibility='public'
        )
    },
    script='/usr/bin/python -m dask_yarn.worker'
)

scheduler = skein.Service(
    resources=skein.Resources(
        memory='4 GiB',
        vcores=1
    ),
    script='/usr/bin/python -m dask_yarn.scheduler'
)

spec = skein.ApplicationSpec(
    name='dask',
    queue='myqueue',
    services={
        'dask.worker': worker,
        'dask.scheduler': scheduler
    }
)

cluster = dask_yarn.YarnCluster.from_specification(spec)

I am getting an error while running this:

skein.exceptions.DriverError: Failed to submit application, exception:
Error accessing file:/home/hadoop/.ssh

What should I do?

@RahulJangir2003

From the config file I am getting an error. Every time it says my_env/bin/activate: No such file or directory.
I have also tried an absolute path like /home/hadoop/..., but it always gives an error.

@jacobtomlinson
Member

@RahulJangir2003 could you open a new issue rather than adding comments to an issue that was closed three years ago?

@dask dask locked as resolved and limited conversation to collaborators Jun 29, 2022