# Welcome to the Calculate Pi Notebook

## Purpose of this notebook

This notebook uses a toy example of calculating Pi to demonstrate two main things:
* How to run a loop of repeated but independent executions as an "embarrassingly parallel" workload in Dask.
* How to connect to a Domino On-Demand Dask cluster (and how not to) and ensure our Compute Environment setup is correct. (Note that this notebook uses only dask, not dask-ml, so it will not test the dask-ml setup.)

## Contents of this notebook

We will calculate Pi in three different ways. Below are the expected execution times, assuming a Dask cluster with 3 workers and a Hardware Tier with 1 core and 4 GB RAM for all components.
1) Without using Dask - 50s
2) With Dask on On-Demand cluster nodes - 17s
3) With Dask on a "local cluster" - 17s (see the comments in section 3)
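
These figures follow from simple arithmetic on the defaults used later in the notebook (500 terms, 0.1 s of sleep per term). The sketch below is only a back-of-the-envelope estimate: the helper name `expected_runtime` is ours, not part of the notebook, and it assumes the sleep dominates each task and ignores scheduling overhead.

```python
import math

def expected_runtime(n_terms=500, sleep_seconds=0.1, n_threads=1):
    """Rough lower bound on runtime when the sleep dominates each task."""
    # Tasks run in "rounds" of n_threads at a time; the last round may be partial.
    rounds = math.ceil(n_terms / n_threads)
    return rounds * sleep_seconds

sequential = expected_runtime(n_threads=1)     # one term at a time
three_workers = expected_runtime(n_threads=3)  # 3 workers with 1 thread each
print(f"sequential: {sequential:.1f} s, 3 workers: {three_workers:.1f} s")
```

With three single-threaded workers the estimate is roughly a third of the sequential time, which is where the 17s benchmark comes from.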

Note that at the end of each calculation we reset the kernel so that no state carries over between sections. We recommend using "Shift + Enter" to run the cells in each section one at a time, rather than running all cells at once.

## 1. Solving Pi without using Dask

We will be calculating Pi using the Leibniz formula (https://en.wikipedia.org/wiki/Leibniz_formula_for_%CF%80) with an artificial "sleep" added to simulate the time needed for more taxing operations.
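
For reference, the series being summed is written out below, with the index $i$ matching the `i` argument of `pi_single_term` in the code of this section:

```latex
\pi \;=\; 4 \sum_{i=0}^{\infty} \frac{(-1)^{i}}{2i+1}
    \;=\; \frac{4}{1} - \frac{4}{3} + \frac{4}{5} - \frac{4}{7} + \cdots
```

Because this is an alternating series with decreasing terms, the error after summing the first $n$ terms is bounded by the first omitted term, $4/(2n+1)$. For the default of 500 terms that bound is about 0.004, so the result is only expected to be accurate to a couple of digits.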

In [1]:
# Define our functions
import time

def pi_single_term(i, sleep_seconds):
    time.sleep(sleep_seconds)
    # denominator is odd
    k = 2*i + 1
    # even index elements are positive
    if i % 2 == 0:
        x = 4/k
    # odd index elements are negative    
    else:
        x = - 4/k
    return x

# compute pi without Dask
def calculate_pi(n_terms = 500, sleep_seconds = 0.1):
    t1 = time.time()
    s = []
    for i in range(n_terms):
        s.append(pi_single_term(i, sleep_seconds))
    pi_value = sum(s)
    print(f"Pi value: {pi_value}")
    t2 = time.time()
    print(f"Total time calculating pi without dask: {round(t2 - t1, 2)} s")
    print(f"Total time expected for sequential calculation: {n_terms * sleep_seconds} s")    

In [2]:
# Execute the pi calculation; we expect it to take about 50 seconds
calculate_pi()

Pi value: 3.139592655589785
Total time calculating pi without dask: 50.06 s
Total time expected for sequential calculation: 50.0 s


In [3]:
# Try changing the number of terms or the "sleep" time per term, and see accuracy and runtime change
#calculate_pi(n_terms = 100, sleep_seconds = 0)

In [None]:
# Reset the kernel to ensure there is no confusion with later sections
# You will see a popup message about the kernel restarting after running this cell
import os
os._exit(0)

## 2. Solving Pi with Dask on On-Demand cluster nodes

While running this section, we recommend exploring the Dask Web UI, especially the Task Stream. This is also where you may see warnings or errors if there are any issues with the dask installation in your Compute Environments, e.g. mismatched versions between the workspace environment and the cluster worker environment.
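
Version mismatches can also be checked from code. The sketch below is a hypothetical helper (`find_version_mismatches` is our name) that compares package versions across the client, scheduler, and workers. It assumes the nested dictionary shape we understand `client.get_versions()` to return (`"client"`, `"scheduler"`, and `"workers"` entries, each with a `"packages"` table); the sample data, including the worker address and version numbers, is made up for illustration.

```python
def find_version_mismatches(versions, packages=("python", "dask", "distributed")):
    """Return {package: {source: version}} for packages whose versions differ."""
    # Collect the package table reported by the client, the scheduler, and each worker.
    reported = {
        "client": versions["client"]["packages"],
        "scheduler": versions["scheduler"]["packages"],
    }
    for address, info in versions["workers"].items():
        reported[address] = info["packages"]

    mismatches = {}
    for package in packages:
        seen = {source: pkgs.get(package) for source, pkgs in reported.items()}
        if len(set(seen.values())) > 1:
            mismatches[package] = seen
    return mismatches

# Hand-written sample in the ASSUMED shape of client.get_versions();
# every value here is illustrative, not taken from a real cluster.
same = {"python": "3.8.12", "dask": "2022.1.0", "distributed": "2022.1.0"}
sample_versions = {
    "client": {"packages": dict(same)},
    "scheduler": {"packages": dict(same)},
    "workers": {
        "tcp://worker-a:3000": {"packages": {**same, "dask": "2021.12.0"}},
    },
}
mismatches = find_version_mismatches(sample_versions)
print(mismatches)
```

With a live connection you would pass `client.get_versions()` instead of `sample_versions`; an empty result means the compared packages agree everywhere.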

### Connecting to the cluster

The code snippet below is taken from https://docs.dominodatalab.com/en/5.0.2/reference/dask/Working_with_your_cluster.html, and connects you to the On-Demand Dask cluster configured for your Domino workspace.

In [1]:
from dask.distributed import Client
import os

service_host = os.environ["DASK_SCHEDULER_SERVICE_HOST"]
service_port = os.environ["DASK_SCHEDULER_SERVICE_PORT"]
client = Client(address=f"{service_host}:{service_port}")

# you should now be connected to the cluster
# Dashboard link from the client object is clickable but will not route in Domino
# Use the embedded Dask Web UI tab instead

# You can also expand the "Scheduler Info" in the client object to see some cluster information
client

Connection method: Direct
Dashboard: http://dask-622bc4fa4b34ac671074270a-dask-scheduler.domino-compute.svc.cluster.local:8787/status

Comm: tcp://10.0.44.9:8786, Workers: 3
Dashboard: http://10.0.44.9:8787/status, Total threads: 3
Started: 10 minutes ago, Total memory: 12.00 GiB

Comm: tcp://10.0.50.128:3000, Total threads: 1
Dashboard: http://10.0.50.128:8787/status, Memory: 4.00 GiB
Nanny: tcp://10.0.50.128:3001
Local directory: /tmp/dask-worker-space/worker-nxadeten
Tasks executing: 0, Tasks in memory: 0
Tasks ready: 0, Tasks in flight: 0
CPU usage: 2.0%, Last seen: Just now
Memory usage: 109.04 MiB, Spilled bytes: 0 B
Read bytes: 285.8823564178521 B, Write bytes: 131.9457029620856 B

Comm: tcp://10.0.55.253:3000, Total threads: 1
Dashboard: http://10.0.55.253:8787/status, Memory: 4.00 GiB
Nanny: tcp://10.0.55.253:3001
Local directory: /tmp/dask-worker-space/worker-em6iom_k
Tasks executing: 0, Tasks in memory: 0
Tasks ready: 0, Tasks in flight: 0
CPU usage: 0.0%, Last seen: Just now
Memory usage: 108.52 MiB, Spilled bytes: 0 B
Read bytes: 0.0 B, Write bytes: 0.0 B

Comm: tcp://10.0.36.116:3000, Total threads: 1
Dashboard: http://10.0.36.116:8787/status, Memory: 4.00 GiB
Nanny: tcp://10.0.36.116:3001
Local directory: /tmp/dask-worker-space/worker-6endw1pk
Tasks executing: 0, Tasks in memory: 0
Tasks ready: 0, Tasks in flight: 0
CPU usage: 2.0%, Last seen: Just now
Memory usage: 108.28 MiB, Spilled bytes: 0 B
Read bytes: 548.8758936104504 B, Write bytes: 1.20 kiB
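
The connection cell above indexes `os.environ` directly, so if either variable is missing it fails with a bare `KeyError`. As a hedged sketch, the hypothetical helper below (`dask_scheduler_address` is our name) builds the same `host:port` string but raises a more descriptive error. The guess that missing variables mean no cluster is attached to the workspace is our assumption, and the host and port in the example are placeholders.

```python
import os

def dask_scheduler_address(env=os.environ):
    """Build "host:port" from the two scheduler variables, or fail with a clear message."""
    host = env.get("DASK_SCHEDULER_SERVICE_HOST")
    port = env.get("DASK_SCHEDULER_SERVICE_PORT")
    if not host or not port:
        # Assumption: the variables are only set when a cluster is attached.
        raise RuntimeError(
            "DASK_SCHEDULER_SERVICE_HOST/PORT are not set; "
            "is an On-Demand Dask cluster attached to this workspace?"
        )
    return f"{host}:{port}"

# Example with a stand-in environment; host name and port are placeholders.
example_env = {
    "DASK_SCHEDULER_SERVICE_HOST": "dask-scheduler.example",
    "DASK_SCHEDULER_SERVICE_PORT": "8786",
}
address = dask_scheduler_address(example_env)
print(address)
```

In the notebook, the result would be passed as `Client(address=dask_scheduler_address())`.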


### Modifying our code
One important feature of our example is that each term can be calculated independently of the others. This makes it a good example of an "embarrassingly parallel" workload. There are several ways to run such a workload in Dask; see https://examples.dask.org/applications/embarrassingly-parallel.html for more examples.
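
One of those alternatives is `Client.map`, which submits one task per element of an iterable in a single call. The following is a minimal sketch, not the approach this notebook uses: the function name `calculate_pi_dask_map` is ours, and it expects an already connected `client`.

```python
import time

def pi_single_term(i, sleep_seconds):
    time.sleep(sleep_seconds)
    k = 2 * i + 1                            # denominator is odd
    return 4 / k if i % 2 == 0 else -4 / k   # signs alternate, starting positive

def calculate_pi_dask_map(client, n_terms=500, sleep_seconds=0.1):
    # client.map returns a list of futures, one per index; extra keyword
    # arguments (here sleep_seconds) are forwarded to the function.
    futures = client.map(pi_single_term, range(n_terms), sleep_seconds=sleep_seconds)
    # client.gather blocks until all futures finish and returns their results.
    return sum(client.gather(futures))
```

Calling `calculate_pi_dask_map(client)` should give the same value as the `client.submit` loop used in this section, since both sum the same terms in the same order.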

In [2]:
# Define our functions
import time

def pi_single_term(i, sleep_seconds):
    time.sleep(sleep_seconds)
    # denominator is odd
    k = 2*i + 1
    # even index elements are positive
    if i % 2 == 0:
        x = 4/k
    # odd index elements are negative    
    else:
        x = - 4/k
    return x

# compute pi with Dask using client.submit
def calculate_pi_dask(client, n_terms = 500, sleep_seconds = 0.1):
    t1 = time.time()
    s=[]
    for i in range(n_terms):
        # note that client.submit returns a "future", not the actual result
        s.append(client.submit(pi_single_term, i, sleep_seconds))
    # client.gather will actually wait for computation to finish and return the results
    pi_value = sum(client.gather(s))
    print(f"Pi value: {pi_value}")
    t2 = time.time()
    print(f"Total time calculating pi with on-demand dask cluster: {round(t2 - t1, 2)} s")
    print(f"Total time expected for sequential calculation: {n_terms * sleep_seconds} s")

In [3]:
# Execute the pi calculation; we expect it to take about 17 seconds
calculate_pi_dask(client)

Pi value: 3.139592655589785
Total time calculating pi with on-demand dask cluster: 16.97 s
Total time expected for sequential calculation: 50.0 s


In [4]:
# Try changing the number of terms or the "sleep" time per term, and see accuracy and runtime change
#calculate_pi_dask(client, n_terms = 100, sleep_seconds = 0)

In [None]:
# Reset the kernel to ensure there is no confusion with later sections
# You will see a popup message about the kernel restarting after running this cell
import os
os._exit(0)

## 3. Solving Pi with Dask on local cluster

Our final method is a demonstration of the "wrong way" to connect to Dask in Domino. The code is exactly the same as the previous example, and will run successfully.

However, the code to initialize the client object will actually create a new "miniature" Dask cluster running only on the local workspace machine, and will not connect you to your On-Demand cluster. To convince yourself this is true, you can observe the Task Stream in the Dask Web UI while running this section; no new tasks will appear. This type of client initialization code is often found in examples and tutorials online, so be aware of the difference between this method and the "correct" method shown in section 2.
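
The difference can also be checked programmatically. The sketch below is a hypothetical helper (`describe_connection` is our name) that relies on two things we believe hold for `dask.distributed.Client`: `scheduler_info()` returns a dictionary containing the scheduler `"address"`, and the `cluster` attribute is `None` when the client was created from an address. This mirrors the "Connection method: Direct" versus "Connection method: Cluster object" lines in the client outputs of sections 2 and 3.

```python
def describe_connection(client):
    """Summarize how `client` is connected to its scheduler."""
    info = client.scheduler_info()
    # A client built from an address has no cluster object attached; a client
    # created without an address starts its own LocalCluster and keeps it here.
    cluster = getattr(client, "cluster", None)
    address = info.get("address", "")
    return {
        "scheduler_address": address,
        "has_cluster_object": cluster is not None,
        # inproc:// addresses live inside the current Python process.
        "in_process": address.startswith("inproc://"),
    }
```

For the On-Demand cluster you would expect a `tcp://` scheduler address and `has_cluster_object` to be `False`; for the client created in this section you would expect the opposite.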

Running Dask in this way may also be useful for debugging purposes, or if you are using an earlier version of Domino without On-Demand clusters. You can still get some advantages of running code in parallel with Dask on a single machine, especially if it has many cores. Because this Pi example contains "sleep" commands to artificially increase runtime, it can demonstrate significant speedup even with a single core due to multithreading. Most realistic examples will perform much better when run on a distributed cluster.
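
The multithreading effect does not depend on Dask itself. As an illustration only, the sketch below uses the standard library's `ThreadPoolExecutor` as a stand-in for three single-threaded Dask workers; the smaller `n_terms` and `sleep_seconds` values are chosen just to keep the run short.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def pi_single_term(i, sleep_seconds):
    time.sleep(sleep_seconds)  # a sleeping thread releases the GIL, so others can run
    k = 2 * i + 1
    return 4 / k if i % 2 == 0 else -4 / k

n_terms, sleep_seconds, n_threads = 60, 0.01, 3

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=n_threads) as pool:
    # pool.map preserves input order, so the terms are summed in index order.
    terms = list(pool.map(lambda i: pi_single_term(i, sleep_seconds), range(n_terms)))
threaded_seconds = time.perf_counter() - start

pi_value = sum(terms)
print(f"Pi value: {pi_value}")
print(f"threaded: {threaded_seconds:.2f} s, "
      f"sequential estimate: {n_terms * sleep_seconds:.2f} s")
```

Because the threads spend almost all their time sleeping, the threaded run should finish in roughly a third of the sequential estimate even on one core. CPU-bound work in pure Python would not see the same benefit from threads.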

### Cleanup note

If you run this example, it will create a `dask-worker-space` folder in your current directory. Feel free to delete this before syncing your code.
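
If you prefer to clean up from code, a minimal sketch follows. The helper name `remove_dask_worker_space` is ours, and it assumes the folder is named `dask-worker-space` as described above.

```python
import shutil
from pathlib import Path

def remove_dask_worker_space(base_dir="."):
    """Delete base_dir/dask-worker-space if present; return True if it existed."""
    target = Path(base_dir) / "dask-worker-space"
    existed = target.is_dir()
    # ignore_errors=True makes this a no-op when the folder is absent.
    shutil.rmtree(target, ignore_errors=True)
    return existed
```

Call `remove_dask_worker_space()` from the project directory after the local client has been shut down, so that no worker is still writing to the folder.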

In [1]:
from dask.distributed import Client

# No scheduler address is passed, so this starts a new in-process cluster on the workspace machine
client = Client(processes=False, threads_per_worker=1,
                n_workers=3, memory_limit='3GB')

# Because this client object connects to a new "local" cluster, it does NOT correspond to the Dask Web UI
# You can still expand the "Scheduler Info" in the client object to see some cluster information
# Note that it will not match the cluster information in the previous section
client

Connection method: Cluster object, Cluster type: distributed.LocalCluster
Dashboard: http://10.0.34.45:8787/status

Dashboard: http://10.0.34.45:8787/status, Workers: 3
Total threads: 3, Total memory: 8.38 GiB
Status: running, Using processes: False

Comm: inproc://10.0.34.45/404/1, Workers: 3
Dashboard: http://10.0.34.45:8787/status, Total threads: 3
Started: Just now, Total memory: 8.38 GiB

Comm: inproc://10.0.34.45/404/4, Total threads: 1
Dashboard: http://10.0.34.45:43371/status, Memory: 2.79 GiB
Nanny: None
Local directory: /tmp/Dask-Quick-Start-Domino/dask-worker-space/worker-nbh_vkkq

Comm: inproc://10.0.34.45/404/5, Total threads: 1
Dashboard: http://10.0.34.45:45765/status, Memory: 2.79 GiB
Nanny: None
Local directory: /tmp/Dask-Quick-Start-Domino/dask-worker-space/worker-ga1cyg1r

Comm: inproc://10.0.34.45/404/6, Total threads: 1
Dashboard: http://10.0.34.45:34049/status, Memory: 2.79 GiB
Nanny: None
Local directory: /tmp/Dask-Quick-Start-Domino/dask-worker-space/worker-fzfn5fe6


In [2]:
# Define our functions
import time

def pi_single_term(i, sleep_seconds):
    time.sleep(sleep_seconds)
    # denominator is odd
    k = 2*i + 1
    # even index elements are positive
    if i % 2 == 0:
        x = 4/k
    # odd index elements are negative    
    else:
        x = - 4/k
    return x

# compute pi with Dask using client.submit
def calculate_pi_dask(client, n_terms = 500, sleep_seconds = 0.1):
    t1 = time.time()
    s=[]
    for i in range(n_terms):
        # note that client.submit returns a "future", not the actual result
        s.append(client.submit(pi_single_term, i, sleep_seconds))
    # client.gather will actually wait for computation to finish and return the results
    pi_value = sum(client.gather(s))
    print(f"Pi value: {pi_value}")
    t2 = time.time()
    print(f"Total time calculating pi with local dask cluster: {round(t2 - t1, 2)} s")
    print(f"Total time expected for sequential calculation: {n_terms * sleep_seconds} s")    

In [3]:
# Execute the pi calculation; we expect it to take about 17 seconds
calculate_pi_dask(client)

Pi value: 3.139592655589785
Total time calculating pi with local dask cluster: 17.35 s
Total time expected for sequential calculation: 50.0 s
