This notebook demonstrates parallelising the calculation of gradients across CPU cores of a single machine as well as AWS using the popular ``Dask`` package.

In [1]:
from pathlib import Path

import numpy as np
from numpy.linalg import inv
import pandas as pd
from scipy.integrate import solve_ivp
import dask.array as da
from dask.distributed import Client, progress
import coiled

In [2]:
import sys
sys.path.append('..')

In [3]:
from utils.parallel import apply_along_axis_parallel

In [4]:
data_filepath = Path('../../data') / 'generated' / 'lotka_volterra_gaussian_noise.csv'
data = pd.read_csv(data_filepath, index_col=0)
t = data.index.values
y = data[['u1', 'u2']].values

In [5]:
def lotka_volterra_sensitivity(t, uw, theta):
    theta1, theta2, theta3, theta4 = theta
    u1, u2, w1, w2, w3, w4, w5, w6, w7, w8 = uw
    return [
        # model equations
        theta1 * u1 - theta2 * u1 * u2,
        theta4 * u1 * u2 - theta3 * u2,
        # sensitivities
        u1 + (theta1 - theta2 * u2) * w1 - theta2 * u1 * w5,
        -u1 * u2 + (theta1 - theta2 * u2) * w2 - theta2 * u1 * w6,
        (theta1 - theta2 * u2) * w3 - theta2 * u1 * w7,
        (theta1 - theta2 * u2) * w4 - theta2 * u1 * w8,
        theta4 * u2 * w1 + (theta4 * u1 - theta3) * w5,
        theta4 * u2 * w2 + (theta4 * u1 - theta3) * w6,
        -u2 + theta4 * u2 * w3 + (theta4 * u1 - theta3) * w7,
        u1 * u2 + theta4 * u2 * w4 + (theta4 * u1 - theta3) * w8,
    ]

In [6]:
t_span = [0, 25]  # the time span over which to integrate the system
q = 2  # number of state variables
d = 4  # dimension of the parameter space
u_init = [1., 1.]  # initial values
uw_init = np.concatenate([np.array(u_init), np.zeros(d * q)])

In [7]:
means = [0, 0]
C = np.diag([0.2 ** 2, 0.2 ** 2])

In [8]:
def grad_log_likelihood(theta):
    """Solve the system of ODEs and calculate the log-likelihood"""
    sol = solve_ivp(lotka_volterra_sensitivity, t_span, uw_init, args=(theta,), dense_output=True)
    sensitivity_forward = sol.sol(t).T
    J = sensitivity_forward[:, q:].reshape(len(t), -1, q, order='F')
    grad_log_phi = (inv(C) @ (y - sensitivity_forward[:, :q]).T).T[:, :, np.newaxis]
    return np.sum(np.squeeze(J @ grad_log_phi), axis=0)

In [9]:
def grad_log_posterior(theta):
    return grad_log_likelihood(theta) - theta

Load the previously calculated samples:

In [10]:
filepath = Path('../../data') / 'generated' / f'rw_chain_0_seed_12345.csv'
rw_sample = np.genfromtxt(filepath, delimiter=',')

### Sequential calculation of gradients

In [11]:
unique_samples, inverse_index = np.unique(np.exp(rw_sample), axis=0, return_inverse=True)
unique_samples.shape

(113143, 4)

In [12]:
%%time
grad_sequential = np.apply_along_axis(grad_log_posterior, 1, unique_samples)

CPU times: user 23min 32s, sys: 13min 31s, total: 37min 3s
Wall time: 18min 32s


### Parallel calculation locally

In [13]:
client_local = Client(processes=True, threads_per_worker=4, n_workers=4, memory_limit='2GB')
client_local

Perhaps you already have a cluster running?
Hosting the HTTP server on port 35397 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:35397/status,

0,1
Dashboard: http://127.0.0.1:35397/status,Workers: 4
Total threads: 16,Total memory: 7.45 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:45495,Workers: 4
Dashboard: http://127.0.0.1:35397/status,Total threads: 16
Started: Just now,Total memory: 7.45 GiB

0,1
Comm: tcp://127.0.0.1:40957,Total threads: 4
Dashboard: http://127.0.0.1:46023/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:35135,
Local directory: /tmp/dask-scratch-space/worker-u6jocv4x,Local directory: /tmp/dask-scratch-space/worker-u6jocv4x

0,1
Comm: tcp://127.0.0.1:45355,Total threads: 4
Dashboard: http://127.0.0.1:36865/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:36843,
Local directory: /tmp/dask-scratch-space/worker-9rri20z5,Local directory: /tmp/dask-scratch-space/worker-9rri20z5

0,1
Comm: tcp://127.0.0.1:41873,Total threads: 4
Dashboard: http://127.0.0.1:35607/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:34113,
Local directory: /tmp/dask-scratch-space/worker-adyqgnb4,Local directory: /tmp/dask-scratch-space/worker-adyqgnb4

0,1
Comm: tcp://127.0.0.1:36021,Total threads: 4
Dashboard: http://127.0.0.1:43279/status,Memory: 1.86 GiB
Nanny: tcp://127.0.0.1:35309,
Local directory: /tmp/dask-scratch-space/worker-2ykv9kt8,Local directory: /tmp/dask-scratch-space/worker-2ykv9kt8


  self._gilknocker.reset_contention_metric()


In [14]:
%%time
grad_parallel_local = apply_along_axis_parallel(grad_log_posterior, 1, unique_samples, 200, client_local)

CPU times: user 22.9 s, sys: 2.14 s, total: 25 s
Wall time: 5min 52s


### Parallel calculation on AWS

In [15]:
cluster = coiled.Cluster(
    n_workers=10,
    region="us-east-1",
)
client_aws = cluster.get_client()
client_aws

Output()

Output()


+---------+--------+-----------+---------+
| Package | Client | Scheduler | Workers |
+---------+--------+-----------+---------+
| lz4     | None   | 4.3.3     | 4.3.3   |
+---------+--------+-----------+---------+


0,1
Connection method: Cluster object,Cluster type: coiled.Cluster
Dashboard: https://cluster-fzpok.dask.host/gg3Bbm3mDtF6evRp/status,

0,1
Dashboard: https://cluster-fzpok.dask.host/gg3Bbm3mDtF6evRp/status,Workers: 2
Total threads: 8,Total memory: 29.68 GiB

0,1
Comm: tls://10.0.173.198:8786,Workers: 2
Dashboard: http://10.0.173.198:8787/status,Total threads: 8
Started: Just now,Total memory: 29.68 GiB

0,1
Comm: tls://10.0.165.40:38133,Total threads: 4
Dashboard: http://10.0.165.40:8787/status,Memory: 14.84 GiB
Nanny: tls://10.0.165.40:43451,
Local directory: /scratch/dask-scratch-space/worker-v0ovqqi9,Local directory: /scratch/dask-scratch-space/worker-v0ovqqi9

0,1
Comm: tls://10.0.166.36:43363,Total threads: 4
Dashboard: http://10.0.166.36:8787/status,Memory: 14.84 GiB
Nanny: tls://10.0.166.36:40967,
Local directory: /scratch/dask-scratch-space/worker-w72mappl,Local directory: /scratch/dask-scratch-space/worker-w72mappl


2024-06-19 12:17:24,353 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client


In [16]:
%%time
grad_parallel_aws = apply_along_axis_parallel(
    grad_log_posterior, 1, unique_samples, unique_samples.shape[0] // len(cluster.details()['workers']), client_aws
)

CPU times: user 9.39 s, sys: 2.84 s, total: 12.2 s
Wall time: 2min 49s
