# Scalable Python Deployments as a Service

### James Bourbeau - Dask Maintainer & Coiled engineer

In this talk we'll:

1. Give a brief overview of Dask
2. Introduce Coiled and what it has to offer
3. Spin up a Dask cluster on AWS with Coiled
4. Create and use a custom software environment on our cluster
5. Answer any questions you have along the way

**Goal for this talk: Understand what Coiled offers and how to get started.**

# Dask - tl;dr

Dask ([docs](https://docs.dask.org)) is a popular library for parallel and distributed computing in Python. Dask:

- Reuses familiar APIs from the PyData ecosystem like NumPy, Pandas, and Scikit-Learn
- Integrates with many libraries you may already use: XArray, Rapids, XGBoost, etc.
- Works equally well on a single machine or across multiple machines in a cluster
  
<img src="dask-cluster.svg"
     width="60%"
     alt="Dask cluster">

In [None]:
from dask.distributed import LocalCluster, Client

# Create a Dask cluster on my laptop using local
# processes for the scheduler and workers
cluster = LocalCluster()
cluster

In [None]:
# Connect a client to my local cluster
client = Client(cluster)
client

In [None]:
import dask.dataframe as dd

df = dd.read_csv(
    "s3://coiled-data/higgs/higgs-00.csv",
    blocksize="10 MiB",
    storage_options={"anon": True},
).persist()

df.groupby("labels").missing_energy_magnitude.mean().compute()

This is great &mdash; we can work on larger-than-memory datasets while using familiar APIs with Dask!
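
To make the idea of a partitioned groupby-mean concrete, here is a minimal pure-Python sketch. It is not Dask's actual implementation; the toy `partitions` data and the helper names `partial_sums` and `combine` are assumptions made for illustration. Each partition is reduced to per-label `(sum, count)` pairs, and only those small summaries are merged at the end, which is why the full dataset never needs to sit in memory at once.

```python
from collections import defaultdict


def partial_sums(rows):
    """Per-partition step: accumulate [sum, count] for each label."""
    acc = defaultdict(lambda: [0.0, 0])
    for label, value in rows:
        acc[label][0] += value
        acc[label][1] += 1
    return acc


def combine(partials):
    """Merge per-partition [sum, count] pairs, then divide to get the means."""
    total = defaultdict(lambda: [0.0, 0])
    for part in partials:
        for label, (s, n) in part.items():
            total[label][0] += s
            total[label][1] += n
    return {label: s / n for label, (s, n) in total.items()}


# Hypothetical toy data: two "partitions" of (label, value) rows
partitions = [
    [(0, 1.0), (1, 4.0), (0, 3.0)],
    [(1, 6.0), (0, 2.0)],
]

means = combine(partial_sums(p) for p in partitions)
print(means)
```

In a real Dask computation, the per-partition step would run as separate tasks on the workers and the combine step would gather their results.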

The cluster we've been using so far has been running on my laptop. How can I launch a Dask cluster with more computational resources?
There are lots of great open source projects for launching clusters on various kinds of hardware:

- [Dask-Kubernetes](https://kubernetes.dask.org/en/latest/) for deploying Dask using native Kubernetes APIs
- [Dask-Yarn](https://yarn.dask.org/en/latest/) for deploying Dask on YARN clusters
- [Dask-Jobqueue](https://jobqueue.dask.org/en/latest/) for deploying Dask on job queuing systems (e.g. PBS, Slurm, etc.)
- [Dask Cloud Provider](https://cloudprovider.dask.org/en/latest/) for deploying Dask on cloud-based infrastructure (e.g. AWS Fargate, AzureML)
- [Dask-MPI](http://mpi.dask.org/en/latest/) for deploying Dask on existing MPI environments

**However**, using these projects (typically) involves manually setting up infrastructure, e.g. a Kubernetes cluster, and having a deep knowledge of the system the cluster is being launched on, e.g. how to create an AWS IAM role with appropriate levels of permissions.

Additionally, there are lots of features, like software environment management, that these projects don't address.

That's all to say, these projects are great for some people in some situations, but they're not for everyone.

# Coiled

Coiled ([docs](https://docs.coiled.io/)) is a deployment-as-a-service library for scaling Python. Generally, Coiled provides:

- Easily launchable, cloud-based Dask clusters
- Support for managed software environments
- Tools for collaborating and monitoring costs

You can try out our public beta at https://cloud.coiled.io.

Note that **while Coiled is in beta, it's totally free to use**. You will not be charged for any of the clusters you use, and you can use up to 100 running cores concurrently.

To [get started with Coiled](https://docs.coiled.io/user_guide/getting_started.html), install the `coiled` Python package (from conda-forge or PyPI) and then run `coiled login` in your terminal:

```bash
$ conda install -c conda-forge coiled
$ coiled login
```

# Deploying Dask clusters with Coiled

Let's launch our first Dask cluster with Coiled:

In [None]:
%%time

import coiled

cluster = coiled.Cluster(n_workers=10)
cluster

In [None]:
client = Client(cluster)
client

☝️ Note that the scheduler (and workers) are running on AWS, and I connect to the remote cluster from my laptop with `client = Client(cluster)`.

In [None]:
import dask.dataframe as dd

df = dd.read_csv(
    "s3://coiled-data/higgs/higgs-*.csv",
    blocksize="10 MiB",
    storage_options={"anon": True},
).persist()

df.groupby("labels").missing_energy_magnitude.mean().compute()

🎉 Woo, we just did a groupby-aggregation on a Dask cluster on AWS!
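
The `blocksize="10 MiB"` argument in the `read_csv` calls above controls how many bytes go into each partition. As a rough sketch of the arithmetic, assuming a hypothetical 95 MiB file (the real file sizes are not stated here) and a hypothetical helper `n_blocks`:

```python
MIB = 2**20  # bytes in one mebibyte


def n_blocks(file_size_bytes, blocksize_bytes):
    """Number of fixed-size blocks needed to cover a file (integer ceiling division)."""
    return -(-file_size_bytes // blocksize_bytes)


blocksize = 10 * MIB   # matches blocksize="10 MiB"
file_size = 95 * MIB   # hypothetical file size, for illustration only

print(n_blocks(file_size, blocksize))
```

Smaller blocks mean more, lighter partitions to spread across workers, at the cost of more tasks for the scheduler to track.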

## Our first pain point: software environments

Let's perform some further analysis with this dataset by using it to train an XGBoost classification model.

**Warning**: running the cell below raises an error, which we'll discuss right after.

In [None]:
from dask_ml.model_selection import train_test_split
import dask_xgboost

# Extract training features and target label
X, y = df.iloc[:, 1:], df["labels"]

# Split full dataset into training and testing datasets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=True, random_state=2)

# Use Dask-XGBoost to train an XGBoost model
params = {
    "objective": "binary:logistic",
    "max_depth": 3,
    "min_child_weight": 0.5,
}
bst = dask_xgboost.train(client, params, X_train, y_train, num_boost_round=3)

We're getting this `ModuleNotFoundError` because `dask_xgboost` isn't installed on the workers for our cluster, so the workers aren't able to run the Dask-XGBoost tasks we're sending to them.

This is one of the first pain points users tend to experience with distributed computing. Now that there is more than one machine involved in our computations, we need to ensure that each machine has the appropriate libraries installed to execute our tasks.
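
One way to spot this kind of mismatch before submitting work is to check whether the required modules can be found in a given Python environment. The sketch below uses only the standard library; the helper name `missing_modules` and the deliberately nonexistent module name are assumptions for illustration.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# "json" ships with Python; the second name is a made-up module that should not exist
required = ["json", "hypothetical_missing_module_xyz"]
print(missing_modules(required))
```

With a connected client, something like `client.run(missing_modules, ["dask_xgboost"])` should run the same check on every worker, since `Client.run` executes a function on all workers and returns the results keyed by worker address.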

# Creating software environments with Coiled

Coiled supports [building custom software environments](https://docs.coiled.io/user_guide/software_environment_creation.html) using familiar packaging conventions, like conda and pip, that you're probably already using.

In [None]:
%%time

conda = {
    "channels": ["conda-forge"],
    "dependencies": [
        "python=3.8",
        "dask",
        "coiled",
        "dask-ml",
        "dask-xgboost",
    ]
}

coiled.create_software_environment(
    name="dask-xgboost",
    conda=conda,
)

In [None]:
cluster = coiled.Cluster(
    n_workers=10,
    # The cluster scheduler and workers will use our new "dask-xgboost" software environment
    software="dask-xgboost",
    worker_cpu=4,
    worker_memory="8 GiB",
)
cluster

In [None]:
client = Client(cluster)
client

Let's run the exact same code again, now that our cluster has `dask_xgboost` installed.

In [None]:
ddf = dd.read_csv(
    "s3://coiled-data/higgs/higgs-*.csv",
    storage_options={"anon": True},
    blocksize="10 MiB",
).persist()

# Extract training features and target label
X, y = ddf.iloc[:, 1:], ddf["labels"]

# Split full dataset into training and testing datasets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=True, random_state=2)

# Use Dask-XGBoost to train an XGBoost model
params = {
    "objective": "binary:logistic",
    "max_depth": 3,
    "min_child_weight": 0.5,
}
bst = dask_xgboost.train(client, params, X_train, y_train, num_boost_round=3)
bst

Hooray, it worked! We were able to create a custom software environment fairly straightforwardly using `coiled.create_software_environment` and then spin up a cluster which uses the new software environment.

In addition to conda packages, you can also specify pip packages to install or a Docker image to use. For example, we could create a software environment with GPU-accelerated libraries:


```python
import coiled

coiled.create_software_environment(
    name="gpu-env",
    container="gpuci/miniconda-cuda:10.2-runtime-ubuntu18.04",
    conda={
        "channels": ["rapidsai", "conda-forge", "defaults"],
        "dependencies": ["dask", "dask-cuda", "xgboost", "pytorch"],
    },
)
```

See the [Software environments documentation](https://docs.coiled.io/user_guide/software_environment_creation.html) for more complete details.
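
Because the `conda` argument used earlier is a plain dictionary, it can also be built programmatically, for instance when several environments share a base set of packages. The helper below is a hypothetical convenience function, not part of the `coiled` package; it only reproduces the dictionary shape shown in the earlier cell.

```python
def conda_spec(packages, python="3.8", channels=("conda-forge",)):
    """Build a conda spec dict with the same shape as the one used earlier."""
    return {
        "channels": list(channels),
        "dependencies": [f"python={python}", *packages],
    }


spec = conda_spec(["dask", "coiled", "dask-ml", "dask-xgboost"])
print(spec)
```

The resulting `spec` could then be passed as `conda=spec` to `coiled.create_software_environment`, as in the earlier cell.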

# Collaboration and cost monitoring

You can share your software environments and cluster settings with your friends and colleagues.

In [None]:
!coiled env inspect dask-xgboost

In [None]:
!coiled env inspect necaris/gputest

## Cost tracking

Coiled keeps track of resource usage on a per-person and per-account basis. This lets you monitor exactly how much you're spending.

![](clusters-table.png)

You can also set usage limits on a per person and per account basis to ensure you don't rack up a large bill:

In [None]:
cluster.scale(1_000)
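
To see why a request like this runs into a usage limit, here is the arithmetic as a small sketch. It assumes the 100-core beta limit mentioned earlier and the `worker_cpu=4` setting from the cluster above. How Coiled actually counts and enforces cores is not shown here, so treat `max_workers` as a hypothetical helper.

```python
CORE_LIMIT = 100   # concurrent cores allowed during the beta (stated earlier)
WORKER_CPU = 4     # cores per worker, as configured for the cluster above


def max_workers(core_limit, worker_cpu):
    """Largest number of workers whose combined cores stay within the limit."""
    return core_limit // worker_cpu


requested_workers = 1_000
requested_cores = requested_workers * WORKER_CPU

print(requested_cores)                      # cores the scale request would need
print(requested_cores > CORE_LIMIT)         # whether the request exceeds the limit
print(max_workers(CORE_LIMIT, WORKER_CPU))  # workers that fit within the limit
```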

By default, Coiled clusters automatically shut down after 20 minutes of inactivity. This helps prevent large bills when you accidentally leave a cluster running over the weekend.

# Additional Resources

- Join the Coiled beta at https://cloud.coiled.io
- See the [Coiled docs](https://docs.coiled.io) for more complete information on Coiled, including:
    - [Launching clusters with GPUs](https://docs.coiled.io/user_guide/gpu.html)
    - [End to end network security](https://docs.coiled.io/user_guide/security.html)
    - ...
- Join the [Coiled community Slack](https://join.slack.com/t/coiled-users/shared_invite/zt-hx1fnr7k-In~Q8ui3XkQfvQon0yN5WQ) to chat about Coiled and Dask, ask questions, and share tips with other Coiled users and the Coiled engineering team
- If you run into a bug or have a feature request, please feel free to open an issue on the [Coiled issue tracker](https://github.com/coiled/coiled-issues)

# Thank you all for your time and attention!