# Distributed Computation on Many Machines

## This notebook will not run in Binder!

You can read this notebook on Binder, but to run it locally you will need to `$ pip install -r requirements-coiled.txt` (Binder and Dask don't play well together).

## Outcomes

- an overview of the ecosystem for distributed compute in Python in 2022,
- code examples showing how to run Prefect locally, locally with async & on a cluster,
- a demonstration of an AWS/Dask/Coiled/Prefect stack that distributes compute over a cluster on EC2.


## Why distribute compute over many machines?

There is a limit on the size of a single machine (e.g. the largest instance type available on EC2).

Many small machines combined can offer more compute (& perhaps be cheaper) than the largest single machine.

Modern distributed compute platforms are designed to tolerate the failure of individual workers - a single EC2 instance is not.

## Ecosystems

Spark:

- accessing Scala code with Python bindings,
- Databricks is a modern way to run Spark.

[Ray](https://docs.ray.io/en/latest/index.html) & [Dask](https://docs.dask.org/en/stable/):

- distributed compute frameworks,
- Ray's core is written in C++ with Python bindings, while Dask is written in pure Python,
- both represent computations as DAGs (directed acyclic graphs) of tasks.
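A DAG records which tasks depend on which. The sketch below uses the standard library's `graphlib` (Python 3.9+) rather than Dask or Ray, with a hypothetical download/process/combine graph, to show how a scheduler can find tasks that are free to run side by side.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical task graph: each task maps to the set of tasks it depends on.
graph = {
    "download_jan": set(),
    "download_feb": set(),
    "process_jan": {"download_jan"},
    "process_feb": {"download_feb"},
    "combine": {"process_jan", "process_feb"},
}


def execution_waves(graph):
    """Group tasks into waves: every task in a wave depends only on tasks in
    earlier waves, so tasks within one wave could run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if the graph is not a DAG
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


for wave in execution_waves(graph):
    print(wave)
```

Running it prints three waves: both downloads, then both process steps, then `combine`.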

TensorFlow & PyTorch:

- multi-GPU training,
- accessing C++ code with Python bindings.

Plus more - Celery, fanning work out over many AWS Lambda invocations...


## Our focus

A stack of Dask / Coiled / Prefect / EC2.

This requires two accounts, an AWS account and a Coiled account; a Prefect Cloud account is optional.


## Dask

[documentation](https://docs.dask.org/en/stable/)

Dask is an execution framework - a single scheduler coordinates many workers, deciding which worker runs each task.
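To make the scheduler/worker split concrete, here is a much-simplified analogy using only the standard library (this is not Dask's API): a `ThreadPoolExecutor` plays the scheduler, handing each submitted task to whichever of its worker threads is free. The `task` function and the worker count are illustrative.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def task(n):
    """Stand-in for one unit of work; reports which worker thread ran it."""
    time.sleep(0.01)  # pretend to do some work
    return n, n * n, threading.current_thread().name


def run(inputs, n_workers=4):
    # The executor plays the scheduler: it queues submitted tasks and hands
    # each one to the next idle worker thread.
    results = {}
    with ThreadPoolExecutor(max_workers=n_workers, thread_name_prefix="worker") as pool:
        futures = [pool.submit(task, n) for n in inputs]
        for future in as_completed(futures):  # collect results as tasks finish
            n, squared, worker = future.result()
            results[n] = (squared, worker)
    return results


results = run(range(8))
for n in sorted(results):
    print(n, results[n])
```

Dask's distributed scheduler plays the same role, but its workers are separate processes that can live on many machines.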

<center><img src="../assets/dask.png" alt="Drawing" style="width: 600px;"/></center>

While Dask is a core part of this stack (it gives us concurrent computation - both parallelism & async), we will not write any low-level Dask (or Dask DataFrame) code.


## Coiled

[documentation](https://docs.coiled.io)

<center><img src="../assets/coiled-architecture.png" alt="Drawing" style="width: 600px;"/></center>

Coiled manages the AWS infrastructure for running Dask clusters on EC2:

- it turns a `requirements.txt` into a *software environment* - a Docker image built with `pip install`.
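Coiled builds the software environment for you; the sketch below only illustrates the input side of that step, turning a `requirements.txt` into the list of pip specifiers an image build would install. The commented-out `coiled.create_software_environment` call is an assumption about the Coiled Python API (check the Coiled docs for your version), and the package names are illustrative.

```python
import re


def read_requirements(text):
    """Return pip specifiers from requirements.txt contents.

    Blank lines and comments are dropped; option lines such as `-r other.txt`
    or `--index-url ...` are skipped in this simplified sketch."""
    specs = []
    for line in text.splitlines():
        # pip treats '#' as a comment only at line start or after whitespace,
        # so URL fragments like '#egg=' are preserved.
        line = re.sub(r"(^|\s)#.*$", "", line).strip()
        if line and not line.startswith("-"):
            specs.append(line)
    return specs


example = """\
# illustrative requirements
pandas>=1.4
dask[distributed]  # inline comment
-r other-requirements.txt
"""
specs = read_requirements(example)
print(specs)

# Assumed Coiled API, not run here:
# import coiled
# coiled.create_software_environment(name="my-env", pip=specs)
```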


## Prefect

[Prefect 2.0 documentation](https://docs.prefect.io/)

Prefect acts as a wrapper around Dask, and offers more functionality than Dask execution alone:

- scheduling,
- monitoring,
- intelligent re-execution of pipelines (aka back-filling).
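Re-execution is "intelligent" when the orchestrator remembers which pieces of work already succeeded. The sketch below is a standard-library illustration of that idea only, not Prefect's API: completed partitions (hypothetical monthly keys) are recorded, so a second run executes only what failed the first time.

```python
def run_pipeline(partitions, work, completed):
    """Run `work` on each partition not already in `completed`; return the ones executed.

    `completed` is shared between runs (a plain dict here; a real orchestrator
    would persist task state in a database), so re-running after a failure only
    re-executes the partitions that did not finish."""
    executed = []
    for partition in partitions:
        if partition in completed:
            continue  # finished in an earlier run, skip it
        try:
            completed[partition] = work(partition)
            executed.append(partition)
        except Exception as exc:  # report the failure and carry on with the rest
            print(f"{partition} failed: {exc}")
    return executed


# Hypothetical monthly partitions and a task that fails once on February.
months = ["2021-01", "2021-02", "2021-03"]
attempts = {}


def flaky_work(partition):
    attempts[partition] = attempts.get(partition, 0) + 1
    if partition == "2021-02" and attempts[partition] == 1:
        raise RuntimeError("simulated network error")
    return f"processed {partition}"


state = {}
first = run_pipeline(months, flaky_work, state)   # ['2021-01', '2021-03']
second = run_pipeline(months, flaky_work, state)  # ['2021-02']
```

Back-filling is roughly the same idea applied to time: list the historical partitions you need and run only those without a completed state.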

At the time of writing, Prefect 2.0 is in beta (not yet production ready) - we use Prefect 2.0 throughout.


# Prefect & Dask on a Single Machine

Let's start by running the program from the last exercise of the previous notebook:

In [1]:
%%timeit -n 1 -r 1
!python ../src/naive.py

1min 27s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
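The source of `naive.py` is not shown in this notebook. Judging from the log output of the Prefect cells, the pipeline downloads monthly MMSDM archives and processes each one; below is a hypothetical sequential sketch of that shape. It assumes every month follows the URL pattern visible in those logs, and `process` is a placeholder that only lists the files inside each zip.

```python
import urllib.request
import zipfile
from pathlib import Path

# Assumption: all months follow the URL pattern seen in the Prefect logs.
BASE = "http://www.nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM"


def month_url(year, month):
    """Build the (assumed) archive URL for one month of dispatch SCADA data."""
    return (
        f"{BASE}/{year}/MMSDM_{year}_{month:02d}/MMSDM_Historical_Data_SQLLoader/"
        f"DATA/PUBLIC_DVD_DISPATCH_UNIT_SCADA_{year}{month:02d}010000.zip"
    )


def download(url, dest_dir=Path("data")):
    """Download one zip file and return its local path."""
    dest_dir.mkdir(exist_ok=True)
    path = dest_dir / url.rsplit("/", 1)[-1]
    urllib.request.urlretrieve(url, path)
    return path


def process(path):
    """Hypothetical placeholder: list the files inside the archive."""
    with zipfile.ZipFile(path) as archive:
        return archive.namelist()


def main(year=2021, months=range(1, 13)):
    # Strictly sequential: each download must finish before the next starts,
    # so total runtime is roughly the sum of every download + process time.
    return [process(download(month_url(year, m))) for m in months]
```

Calling `main()` downloads and processes the twelve months one after another; this is the sequential baseline the following cells compare against.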


Now try with naive Prefect:

In [2]:
%%timeit -n 1 -r 1
!python ../src/naive_dask_prefect.py

21:31:14.692 | INFO    | prefect.engine - Created flow run 'rough-gorilla' for flow 'main'
21:31:14.693 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
21:31:16.475 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
21:31:18.584 | INFO    | Flow run 'rough-gorilla' -  downloading http://www.nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_01/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH_UNIT_SCADA_202101010000.zip
21:31:18.934 | INFO    | Flow run 'rough-gorilla' - Created task run 'download-ccd6cdb6-0' for task 'download'
21:31:19.355 | INFO    | Flow run 'rough-gorilla' - Submitted task run 'download-ccd6cdb6-0' for execution.
21:31:19.356 | INFO    | Flow run 'rough-gorilla' -  processing http://www.nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_01/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH_UNIT_SCA

Now let's use Prefect with `asyncio`:

In [3]:
%%timeit -n 1 -r 1
!python ../src/async_prefect.py

21:31:56.419 | INFO    | prefect.engine - Created flow run 'optimal-goose' for flow 'main'
21:31:56.420 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
21:31:58.546 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
21:32:00.722 | INFO    | Flow run 'optimal-goose' -  downloading http://www.nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_01/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH_UNIT_SCADA_202101010000.zip
21:32:01.030 | INFO    | Flow run 'optimal-goose' - Created task run 'download-ccd6cdb6-0' for task 'download'
21:32:01.512 | INFO    | Flow run 'optimal-goose' - Submitted task run 'download-ccd6cdb6-0' for execution.
21:32:01.512 | INFO    | Flow run 'optimal-goose' -  processing http://www.nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2021/MMSDM_2021_01/MMSDM_Historical_Data_SQLLoader/DATA/PUBLIC_DVD_DISPATCH_UNIT_SCA
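Async helps here because the work is dominated by waiting on downloads. This standard-library sketch does not reproduce `async_prefect.py`; it simulates twelve downloads with `asyncio.sleep` (a hypothetical stand-in for a network call) and runs them concurrently with `asyncio.gather`, so the total wait is close to one download rather than twelve.

```python
import asyncio
import time


async def fake_download(month, delay=0.1):
    """Stand-in for a network request: awaiting asyncio.sleep hands control back
    to the event loop, just as awaiting a real async HTTP call would."""
    await asyncio.sleep(delay)
    return f"data for month {month}"


async def download_all(months):
    # Start every download at once and wait for all of them;
    # gather returns results in the same order as its inputs.
    return await asyncio.gather(*(fake_download(m) for m in months))


start = time.perf_counter()
results = asyncio.run(download_all(range(1, 13)))
elapsed = time.perf_counter() - start
print(f"{len(results)} downloads in {elapsed:.2f}s")  # well under 12 x 0.1s
```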

# Prefect & Dask Running on a Coiled Cluster (Many Machines)

<center><img src="../assets/many-machine/f2.png" alt="Drawing" style="width: 600px;"/></center>

This requires a couple of accounts to set up:

- AWS account - cluster will run on EC2,
- Coiled account - adds & manages AWS infrastructure needed for a Dask cluster.

Stack:

- EC2,
- Dask,
- Prefect,
- Coiled.

Example of running on a Coiled cluster:

In [4]:
%%timeit -n 1 -r 1
!python ../src/dask_coiled_prefect.py

Found existing software environment build, returning
21:32:38.044 | INFO    | prefect.engine - Created flow run 'complex-urchin' for flow 'main'
21:32:38.045 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `coiled._beta.cluster.ClusterBeta`
21:32:49.927 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://54.234.237.186:8787
21:32:52.497 | INFO    | Flow run 'complex-urchin' - Created task run 'download-ccd6cdb6-0' for task 'download'
21:32:52.824 | INFO    | Flow run 'complex-urchin' - Submitted task run 'download-ccd6cdb6-0' for execution.
21:32:53.163 | INFO    | Flow run 'complex-urchin' - Created task run 'process-1cc53e18-0' for task 'process'
21:32:53.265 | INFO    | Flow run 'complex-urchin' - Submitted task run 'process-1cc53e18-0' for execution.
21:32:53.601 | INFO    | Flow run 'complex-urchin' - Created task run 'download-ccd6cdb6-1' for task 'download'
21:32:53.629 | INFO    | Flow run 'complex-urchin' - Submitted task ru

# Setting up the AWS/Dask/Coiled/Prefect stack

## AWS Setup

The prerequisite is an AWS account.

First, set up a new IAM user (below I call this user `coiled`) with programmatic access (an access key + secret key). Remember to download or copy your credentials as a CSV!
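Once downloaded, the access key can be stored as a named AWS profile so local tools can find it. A minimal sketch, assuming the CSV has `Access key ID` and `Secret access key` columns (check the header of the file you downloaded); it returns text in the `~/.aws/credentials` format rather than writing to disk, and the profile name `coiled` simply matches the user name above.

```python
import configparser
import csv
import io


def credentials_profile(csv_text, profile="coiled"):
    """Build an ~/.aws/credentials-style section from the downloaded key CSV.

    Assumption: the CSV header contains 'Access key ID' and 'Secret access key'."""
    csv_text = csv_text.lstrip("\ufeff")  # in case the file starts with a byte-order mark
    row = next(csv.DictReader(io.StringIO(csv_text)))
    config = configparser.ConfigParser()
    config[profile] = {
        "aws_access_key_id": row["Access key ID"],
        "aws_secret_access_key": row["Secret access key"],
    }
    out = io.StringIO()
    config.write(out)
    return out.getvalue()


# Dummy values for illustration only.
profile_text = credentials_profile("Access key ID,Secret access key\nAKIAEXAMPLE,secretEXAMPLE\n")
print(profile_text)
```

Append the output to `~/.aws/credentials`, or export the same values as the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables; keep the CSV out of version control.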

We will use this user to manage & run the Coiled cluster on EC2.

Create IAM policies & AWS infrastructure so you can run Dask clusters in your AWS account - see [Coiled AWS setup](https://docs.coiled.io/user_guide/aws-cli.html). 

[Coiled IAM policies](https://docs.coiled.io/user_guide/aws_reference.html) - one is for the initial setup (you don't need it if you are using credentials with admin access), the other is for spinning up new clusters:

- create 2 IAM policies `coiled-setup` & `coiled-ongoing` from JSON,
- attach policies to your IAM user.


## Coiled account setup

[Create a Coiled account](https://cloud.coiled.io/signup) and add your AWS credentials under *Cloud Provider*.

Or do the same via the shell, using a Coiled API token ([create one here](https://cloud.coiled.io/profile)):

<center><img src="../assets/many-machine/f3.png" alt="Drawing" style="width: 600px;"/></center>

```shell
$ pip install coiled
# use your API token here
$ coiled login -t $YOURTOKEN
$ coiled setup aws
```

Now you can run the Dask example:

In [None]:
!python ../src/dask_coiled.py

## Optional - Adding Prefect Cloud

Requires a Prefect Cloud account.

<center><img src="../assets/many-machine/f4.png" alt="Drawing" style="width: 600px;"/></center>


```shell
$ prefect cloud login -k $YOUR_PREFECT_API_KEY
$ prefect cloud workspace set --workspace "adamgreenadgefficiencycom/kiwipycon-tutorial"
```