# Monte-Carlo Estimate of $\pi$

We want to estimate the number $\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods) exploiting that the area of a quarter circle of unit radius is $\pi/4$ and that hence the probability of any randomly chosen point in a unit square to lie in a unit circle centerd at a corner of the unit square is $\pi/4$ as well.  So for N randomly chosen pairs $(x, y)$ with $x\in[0, 1)$ and $y\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimage $\pi \approx 4 \cdot N_{circ} / N$.

[<img src="https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif" 
     width="50%" 
     align=top
     alt="PI monte-carlo estimate">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)

## Core Lessons

- setting up SLURM (and other jobqueue) clusters
- Scaling clusters
- Adaptive clusters

## Setup for Example

In [None]:
from os.path import exists
from pathlib import Path

home = str(Path.home())
dasklogs = f"{home}/dask-demo-logs"
if not exists(dasklogs):
    os.mkdir(dasklogs)

## Set up a Slurm cluster

We'll create a SLURM cluster and have a look at the job-script used to start workers on the HPC scheduler.

In [None]:
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import os

cluster = SLURMCluster(
    cores=24,
    processes=2,
    memory="100GB",
    shebang='#!/usr/bin/env bash',
    queue="normal",
    walltime="00:30:00",
    local_directory='/tmp',
    death_timeout="15s",
    interface="ib0",
    log_directory=dasklogs,
    project="boc")

  from distributed.utils import format_bytes, parse_bytes, tmpfile
  from distributed.utils import format_bytes, parse_bytes, tmpfile
  from distributed.utils import parse_bytes


In [None]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e /scratch/dech/2_2020-2021/4_general_dev/dask-demo/logs/dask-worker-%J.err
#SBATCH -o /scratch/dech/2_2020-2021/4_general_dev/dask-demo/logs/dask-worker-%J.out
#SBATCH -p batch
#SBATCH -A boc
#SBATCH -n 1
#SBATCH --cpus-per-task=24
#SBATCH --mem=94G
#SBATCH -t 00:30:00

/home/mfa/dech/.conda/envs/TFbuild/bin/python -m distributed.cli.dask_worker tcp://10.145.6.14:46190 --nthreads 12 --nprocs 2 --memory-limit 46.57GiB --name dummy-name --nanny --death-timeout 15s --local-directory /tmp --protocol tcp://



In [None]:
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: SLURMCluster
Dashboard: http://10.145.6.14:8787/status,

0,1
Dashboard: http://10.145.6.14:8787/status,Workers: 0
Total threads:  0,Total memory:  0 B

0,1
Comm: tcp://10.145.6.14:46190,Workers: 0
Dashboard: http://10.145.6.14:8787/status,Total threads:  0
Started:  Just now,Total memory:  0 B


## Scale the cluster to two nodes

A look at the Dashboard reveals that there are no workers in the clusetr.  Let's start 4 workers (in 2 SLURM jobs).

For the distiction between _workers_ and _jobs_, see [the Dask jobqueue docs](https://jobqueue.dask.org/en/latest/howitworks.html#workers-vs-jobs).

In [None]:
cluster.scale(4)  # scale to 4 _workers_

## The Monte Carlo Method

In [None]:
import dask.array as da
import numpy as np

def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):
    """Calculate PI using a Monte Carlo estimate."""
    
    size = int(size_in_bytes / 8)
    chunksize = int(chunksize_in_bytes / 8)
    
    xy = da.random.uniform(0, 1,
                           size=(size / 2, 2),
                           chunks=(chunksize / 2, 2))
    
    in_circle = ((xy ** 2).sum(axis=-1) < 1)
    pi = 4 * in_circle.mean()

    return pi

def print_pi_stats(size, pi, time_delta, num_workers):
    """Print pi, calculate offset from true value, and print some stats."""
    print(f"{size / 1e9} GB\n"
          f"\tMC pi: {pi : 13.11f}"
          f"\tErr: {abs(pi - np.pi) : 10.3e}\n"
          f"\tWorkers: {num_workers}"
          f"\t\tTime: {time_delta : 7.3f}s")

Task exception was never retrieved
future: <Task finished name='Task-35' coro=<_wrap_awaitable() done, defined at /home/mfa/dech/.conda/envs/TFbuild/lib/python3.8/asyncio/tasks.py:677> exception=RuntimeError('Command exited with non-zero exit code.\nExit code: 1\nCommand:\nsbatch /tmp/tmp58qby4_i.sh\nstdout:\n\nstderr:\nsbatch: error: invalid partition specified: batch\nsbatch: error: Batch job submission failed: Invalid partition name specified\n\n')>
Traceback (most recent call last):
  File "/home/mfa/dech/.conda/envs/TFbuild/lib/python3.8/asyncio/tasks.py", line 684, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/mfa/dech/.conda/envs/TFbuild/lib/python3.8/site-packages/distributed/deploy/spec.py", line 66, in _
    await self.start()
  File "/home/mfa/dech/.local/lib/python3.8/site-packages/dask_jobqueue/core.py", line 324, in start
    out = await self._submit_job(fn)
  File "/home/mfa/dech/.local/lib/python3.8/site-packages/dask_jobqueue/core.py", 

## The actual calculations

We loop over different volumes of double-precision random numbers and estimate $\pi$ as described above.

In [None]:
from time import time, sleep

In [7]:
for size in (1e9 * n for n in (1, 10, 100)):
    
    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(size, pi, time_delta=elaps,
                   num_workers=len(cluster.scheduler.workers))

## Scaling the Cluster to twice its size

We increase the number of workers by 2 and the re-run the experiments.

In [None]:
new_num_workers = 2 * len(cluster.scheduler.workers)

print(f"Scaling from {len(cluster.scheduler.workers)} to {new_num_workers} workers.")

cluster.scale(new_num_workers)

sleep(10)

In [None]:
client

## Re-run same experiments with doubled cluster

In [None]:
for size in (1e9 * n for n in (1, 10, 100)):
    
        
    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(size, pi,
                   time_delta=elaps,
                   num_workers=len(cluster.scheduler.workers))

## Automatically Scaling the Cluster

We want each calculation to take only a few seconds.  Dask will try to add more workers to the cluster when workloads are high and remove workers when idling.

_**Watch** how the cluster will scale down to the minimum a few seconds after being made adaptive._

In [None]:
ca = cluster.adapt(
    minimum=4, maximum=100);

sleep(4)  # Allow for scale-down

In [None]:
client

## Repeat the calculation from above with larger work loads

(And watch the dash board!)

In [None]:
for size in (n * 1e9 for n in (1, 10, 100, 1000)):
    
    
    start = time()
    pi = calc_pi_mc(size, min(size / 1000, 500e6)).compute()
    elaps = time() - start

    print_pi_stats(size, pi, time_delta=elaps,
                   num_workers=len(cluster.scheduler.workers))
    
    sleep(20)  # allow for scale-down time

## Complete listing of software used here

In [None]:
%pip list

In [None]:
%conda list --explicit