# Distributed models examples

François-David Collin (CNRS, IMAG, Paul-Valéry Montpellier 3
University)  
Ghislain Durif (CNRS, LBMC)  
Monday, August 26, 2024

# Initialization

In [2]:
from ipyparallel import Client

rc = Client()

`rc` is an iterable over the accessible computing nodes.

In [3]:
views = rc[:]

In [4]:
views

<DirectView [0, 1, 2, 3,...]>

## Check cluster engines

In [5]:
import platform
platform.node()

'muse190.cluster'

In [6]:
views.apply_sync(platform.node)

['muse191.cluster',
 'muse195.cluster',
 'muse194.cluster',
 'muse193.cluster',
 'muse197.cluster',
 'muse196.cluster',
 'muse198.cluster',
 'muse190.cluster']

## Distributed prime numbers

Let’s revive our functions

In [7]:
import math

def check_prime(n):
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2  # 2 is the only even prime
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

In [8]:
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

In [9]:
def find_primes(r):
    return list(filter(check_prime,r))
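
Before distributing anything, it can help to check the pipeline sequentially on a small range. The following is a minimal local sketch with no cluster involved. It redefines the three helpers so that it is self-contained, and its `check_prime` variant explicitly rejects `n < 2` and accepts `2`, which is an assumption about the intended behavior.

```python
import math

def check_prime(n):
    # Assumption: 0 and 1 are not prime, and 2 is the only even prime.
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True

def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def find_primes(r):
    return list(filter(check_prime, r))

# A range supports len() and slicing, so it can be chunked without copying.
blocks = list(chunks(range(1, 30), 10))
# Sequential stand-in for a distributed map: one result list per chunk.
per_chunk = [find_primes(block) for block in blocks]
# Flatten the per-chunk lists into a single list of primes.
primes = [p for chunk in per_chunk for p in chunk]
print(blocks)
print(primes)
```

The mapped function returns one list per chunk, so the combined result is a list of lists that has to be flattened, as done in the last step.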

## Peculiarities

You’ll have to

-   [`push`](https://ipyparallel.readthedocs.io/en/latest/api/ipyparallel.html#ipyparallel.DirectView.push)
    your dependent functions to the engines (`ipyparallel` does push
    your main “mapped” function, but not its dependencies)
-   explicitly import any required Python library on the engines

In [10]:
views.push({'check_prime': check_prime})

<AsyncResult(_push): pending>

In [11]:
with views.sync_imports():
    import math

importing math on engine(s)

### First steps

1.  Complete with the correct
    [`views.map`](https://ipyparallel.readthedocs.io/en/latest/api/ipyparallel.html#ipyparallel.DirectView.map)
    call

``` python
def calculate_primes(N,chunksize):
    return ...
```

2.  Benchmark it for

``` python
N = 5000000
chunksize = int(N/64)
```

In [12]:
def calculate_primes(N,chunksize):
    return views.map_sync(find_primes,chunks(range(1,N),chunksize))

In [13]:
N = 5000000

In [14]:
%timeit -r 1 -n 1 calculate_primes(N,int(N/64))

3.23 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

# (Aside) A network optimization: [broadcast_view](https://ipyparallel.readthedocs.io/en/latest/examples/broadcast/Broadcast%20view.html)

<img src="attachment:image.png" width="500"/>

In [15]:
direct_view = rc.direct_view()
bcast_view = rc.broadcast_view()

In [16]:
%timeit direct_view.apply_sync(lambda: None)

14.4 ms ± 538 μs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [17]:
%timeit bcast_view.apply_sync(lambda: None)

17.2 ms ± 907 μs per loop (mean ± std. dev. of 7 runs, 100 loops each)

# An embarrassingly parallel example: distributed Monte-Carlo computing of $\pi$

If we randomly sample $N$ points in the unit square and count the
$N_I$ points that satisfy the condition

$x^2 + y^2 \le 1$, which means they lie in the upper right quarter of
the unit disk,

then we have the following convergence:

$\lim_{N\to\infty} 4\frac{N_I}{N} = \pi$
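
The limit follows from the law of large numbers: each point falls inside the quarter disk with a probability equal to the ratio of the two areas. A short derivation, assuming the samples are independent and uniform on the unit square:

$$
p = P(x^2 + y^2 \le 1)
  = \frac{\text{area of the quarter disk}}{\text{area of the unit square}}
  = \frac{\pi/4}{1} = \frac{\pi}{4},
\qquad
\frac{N_I}{N} \xrightarrow[N\to\infty]{} p
$$

$$
\operatorname{sd}\left(4\frac{N_I}{N}\right)
  = 4\sqrt{\frac{p(1-p)}{N}}
  = \sqrt{\frac{\pi(4-\pi)}{N}}
  \approx \frac{1.64}{\sqrt{N}}
$$

The error therefore shrinks only as $1/\sqrt{N}$: one more correct digit costs about 100 times more samples, which is what makes distributing the sampling attractive.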

<img src="attachment:hpp2_0901.png" width="40%" />

### 2. Write the function which:

-   takes a number of estimates `nbr_estimates` as argument
-   samples that many points in the \[(0,0),(1,1)\] unit square
-   returns the number of points inside the disk quarter

``` python
def estimate_nbr_points_in_quarter_circle(nbr_estimates):
    ...
    return nbr_trials_in_quarter_unit_circle
```

In [18]:
with views.sync_imports():
    import random

importing random on engine(s)

In [19]:
def estimate_nbr_points_in_quarter_circle(nbr_estimates):
    nbr_trials_in_quarter_unit_circle = 0
    for step in range(int(nbr_estimates)):
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        is_in_unit_circle = x * x + y * y <= 1.0
        nbr_trials_in_quarter_unit_circle += is_in_unit_circle
    return nbr_trials_in_quarter_unit_circle

In [20]:
4*estimate_nbr_points_in_quarter_circle(1e4)/1e4

3.1604
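
The result above is off by about 0.02, which is the expected order of magnitude for $10^4$ samples: for independent uniform samples, the standard error of this estimator is $\sqrt{\pi(4-\pi)/N}$, about 0.016 here. The sketch below compares the actual error with that standard error on a reproducible run. The helper `estimate_pi` and the seed value are illustrative choices, not part of the original notebook.

```python
import math
import random

def estimate_pi(nbr_estimates, seed):
    # A private generator keeps the run reproducible (the seed is arbitrary).
    rng = random.Random(seed)
    inside = 0
    for _ in range(nbr_estimates):
        x = rng.random()
        y = rng.random()
        inside += x * x + y * y <= 1.0
    return 4 * inside / nbr_estimates

n = 100_000
estimate = estimate_pi(n, seed=0)
# Standard error of the estimator under independent uniform sampling.
std_error = math.sqrt(math.pi * (4 - math.pi) / n)
print(f"estimate       = {estimate:.4f}")
print(f"absolute error = {abs(estimate - math.pi):.4f}")
print(f"standard error = {std_error:.4f}")
```

The absolute error should typically be of the same order as the standard error, and rarely more than two or three times larger.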

### 3. Make it distributed

-   Wrap the previous function in

    ``` python
    def calculate_pi_distributed(nnodes, nbr_samples_in_total):
        ...
        return estimated_pi
    ```

-   `nnodes` selects the engines `rc[:nnodes]`; split the total number
    of samples into `nnodes` blocks, one per worker node.
-   Try it on `1e8` samples and benchmark it on 1 to 8 nodes (use
    [`time`](https://docs.python.org/3/library/time.html#time.time)).
-   Plot the performance gain over one node and comment on the plot.

In [21]:
def calculate_pi_distributed(nnodes,nbr_samples_in_total):
    # use only the first `nnodes` engines
    dview = rc[:nnodes]
    nbr_samples_per_worker = nbr_samples_in_total / nnodes
    # every engine runs the same function and returns its own count
    nbr_in_quarter_unit_circles = dview.apply_sync(estimate_nbr_points_in_quarter_circle,
                                                   nbr_samples_per_worker)
    return sum(nbr_in_quarter_unit_circles) * 4 / nbr_samples_in_total

In [22]:
calculate_pi_distributed(8,1e7)

3.1418104
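
Note that `nbr_samples_in_total / nnodes` is generally not an integer (for example `1e8 / 3`), and the `int()` conversion inside the worker function truncates it, so a few samples are silently dropped. The effect is negligible at this scale. One way to avoid it is sketched here with a hypothetical helper `split_samples`, which is not part of the original notebook: it gives the first few workers one extra sample so that the block sizes sum exactly to the total.

```python
def split_samples(nbr_samples_in_total, nnodes):
    """Return nnodes integer block sizes that sum to the total."""
    total = int(nbr_samples_in_total)
    base, remainder = divmod(total, nnodes)
    # The first `remainder` workers each take one extra sample.
    return [base + 1 if i < remainder else base for i in range(nnodes)]

blocks = split_samples(1e8, 3)
print(blocks, sum(blocks))  # [33333334, 33333333, 33333333] 100000000
```

Such a list of per-worker sizes would then have to be distributed with a map-style call rather than a single `apply_sync`, since each worker receives a different argument.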

In [23]:
import time

N = 1e8
cluster_times = []
pis = []
for nbr_parallel_blocks in range(1,9):
    print(f"With {nbr_parallel_blocks} node(s): ")
    t1 = time.time()
    pi_estimate = calculate_pi_distributed(nbr_parallel_blocks,N)
    total_time = time.time() - t1
    print(f"\tPi estimate : {pi_estimate}")
    print("\tTime : {:.2f}s".format(total_time))
    cluster_times.append(total_time)
    pis.append(pi_estimate)

With 1 node(s): 
    Pi estimate : 3.1415786
    Time : 50.81s
With 2 node(s): 
    Pi estimate : 3.14160288
    Time : 25.79s
With 3 node(s): 
    Pi estimate : 3.14146156
    Time : 17.22s
With 4 node(s): 
    Pi estimate : 3.14155692
    Time : 12.90s
With 5 node(s): 
    Pi estimate : 3.14167804
    Time : 10.32s
With 6 node(s): 
    Pi estimate : 3.14164836
    Time : 8.60s
With 7 node(s): 
    Pi estimate : 3.14175948
    Time : 7.38s
With 8 node(s): 
    Pi estimate : 3.14164264
    Time : 6.45s

In [24]:
import plotly.express as px

speedups_cores = [cluster_times[0]/cluster_times[i] for i in range(8)]
px.line(y=speedups_cores,x=range(1,9),
        labels={"x":"Number of cores",
                "y":"Speedup over 1 core"},
       width=600)

$\Longrightarrow$ We see near-perfect linear scalability: 8 nodes give a speedup of about 7.9 over one node (50.81 s down to 6.45 s).
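
To quantify this, the parallel efficiency (speedup divided by the number of nodes) can be computed from the timings printed earlier. A small sketch, with the measured times copied by hand from the benchmark output:

```python
# Wall-clock times in seconds for 1 to 8 nodes, copied from the run above.
cluster_times = [50.81, 25.79, 17.22, 12.90, 10.32, 8.60, 7.38, 6.45]

speedups = [cluster_times[0] / t for t in cluster_times]
# An efficiency of 1.0 would be ideal linear scaling.
efficiencies = [s / n for n, s in enumerate(speedups, start=1)]

for n, (s, e) in enumerate(zip(speedups, efficiencies), start=1):
    print(f"{n} node(s): speedup {s:.2f}, efficiency {e:.1%}")
```

For these timings the efficiency stays above 98% for every node count, which is expected for an embarrassingly parallel job where each worker communicates only once, to return a single integer.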