# Lesson 4: Vertical and horizontal scaling (workbook)

In [None]:
# Python standard library
import ctypes
import time
from subprocess import Popen, PIPE

# Scientific Python ecosystem
import numpy as np
import numba as nb
import dask
import dask.array as da
import dask.distributed
import h5py

# Particle physics tools
import awkward as ak
import uproot
import dask_awkward as dak
from hist import Hist

## Performance limitations of array-oriented code

In [None]:
# Read the Parquet sample and keep only its first 100000 entries for the timing cells below.
events = ak.from_parquet("data/SMHiggsToZZTo4L.parquet")[:100000]

In [None]:
%%timeit -r1 -n1

# Pure-Python version: nested comprehensions visit every muon of every event one at a time.
pz = [[muon.pt * np.sinh(muon.eta) for muon in event.muon] for event in events]

In [None]:
%%timeit -r1 -n1

# Array-oriented version: the same pt * sinh(eta) expression applied to whole arrays at once.
pz = events.muon.pt * np.sinh(events.muon.eta)

## Speed of compiled code

In [None]:
%%writefile quadratic_formula_c.c

#include <math.h>

void run(double* a, double* b, double* c, double* output) {
    for (int i = 0;  i < 1000000;  i++) {
        output[i] = (-b[i] + sqrt(b[i]*b[i] - 4*a[i]*c[i])) / (2*a[i]);
    }
}

In [None]:
! cc quadratic_formula_c.c -O3 -shared -lm -o quadratic_formula_c.so

In [None]:
# Load the compiled shared library and declare the C signature of run():
# four double* arguments, no return value.
quadratic_formula_c = ctypes.CDLL("./quadratic_formula_c.so")

pointer_double = ctypes.POINTER(ctypes.c_double)
quadratic_formula_c.run.restype = None
quadratic_formula_c.run.argtypes = (pointer_double,) * 4

def ptr(array):
    """Return a ``double*`` ctypes pointer to ``array``'s data buffer.

    The compiled code reads and writes raw memory through this pointer, so the
    array must already be float64 and C-contiguous. It is validated rather than
    converted: converting would return a pointer into a temporary copy.

    Raises:
        TypeError: if ``array.dtype`` is not float64.
        ValueError: if ``array`` is not C-contiguous.
    """
    if array.dtype != np.float64:
        raise TypeError(f"expected a float64 array, got dtype {array.dtype}")
    if not array.flags.c_contiguous:
        raise ValueError("expected a C-contiguous array")
    # POINTER(c_double) is the pointer type declared in run.argtypes.
    return array.ctypes.data_as(ctypes.POINTER(ctypes.c_double))

In [None]:
# One million random coefficients each, drawn in the order a, b, c.
# With a in [5, 10), b in [10, 20) and |c| <= 0.1, the discriminant b**2 - 4*a*c stays positive.
a, b, c = (
    np.random.uniform(low, high, 1000000)
    for low, high in ((5, 10), (10, 20), (-0.1, 0.1))
)

In [None]:
%%timeit

# Allocate an uninitialized result buffer, then let the compiled C loop write into it.
output = np.empty(1000000)
quadratic_formula_c.run(ptr(a), ptr(b), ptr(c), ptr(output))

In [None]:
%%timeit

# NumPy version: the whole formula as a single array expression.
output = (-b + np.sqrt(b**2 - 4*a*c)) / (2*a)

In [None]:
%%timeit

# The same formula spelled out one array operation at a time. Each call produces a
# new intermediate array, and each `del` drops an intermediate once it is no longer needed.
tmp1 = np.negative(b)            # -b
tmp2 = np.square(b)              # b**2
tmp3 = np.multiply(4, a)         # 4*a
tmp4 = np.multiply(tmp3, c)      # tmp3*c
del tmp3
tmp5 = np.subtract(tmp2, tmp4)   # tmp2 - tmp4
del tmp2, tmp4
tmp6 = np.sqrt(tmp5)             # sqrt(tmp5)
del tmp5
tmp7 = np.add(tmp1, tmp6)        # tmp1 + tmp6
del tmp1, tmp6
tmp8 = np.multiply(2, a)         # 2*a
np.divide(tmp7, tmp8)            # tmp7 / tmp8     This is the result!

In [None]:
%%timeit -r1 -n1

# Pure-Python loop for comparison: every element is indexed, computed and stored
# individually from Python.
output = np.empty(1000000)
for i in range(len(output)):
    output[i] = (-b[i] + np.sqrt(b[i]**2 - 4*a[i]*c[i])) / (2*a[i])

### pybind11

In [None]:
%%writefile quadratic_formula_pybind11.cpp

#include <cmath>      // std::sqrt (do not rely on another header pulling it in)
#include <stdexcept>  // std::invalid_argument

#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>

namespace py = pybind11;

// Inputs: anything convertible to a C-contiguous float64 array. c_style matters
// because the loop indexes the raw data() pointer, which ignores strides.
using input_array = py::array_t<double, py::array::c_style | py::array::forcecast>;

// Output: must already be a C-contiguous float64 array. It is bound with
// noconvert() below so that a mismatched array is rejected instead of being
// copied, which would leave the caller's array unmodified.
using output_array = py::array_t<double, py::array::c_style>;

// Fill output[i] with the "+" root of a[i]*x^2 + b[i]*x + c[i] = 0 for every
// element of output. Throws std::invalid_argument (ValueError in Python) if any
// input has fewer elements than output.
void run(
    input_array a_numpy,
    input_array b_numpy,
    input_array c_numpy,
    output_array output_numpy
) {
    const py::ssize_t size = output_numpy.size();
    if (a_numpy.size() < size || b_numpy.size() < size || c_numpy.size() < size) {
        throw std::invalid_argument(
            "a, b, and c must each have at least as many elements as output"
        );
    }

    const double* a = a_numpy.data();
    const double* b = b_numpy.data();
    const double* c = c_numpy.data();
    double* output = output_numpy.mutable_data();

    // py::ssize_t matches the type of size(), avoiding a narrower int index.
    for (py::ssize_t i = 0;  i < size;  i++) {
        output[i] = (-b[i] + std::sqrt(b[i]*b[i] - 4*a[i]*c[i])) / (2*a[i]);
    }
}

PYBIND11_MODULE(quadratic_formula_pybind11, m) {
    m.def("run", &run,
          py::arg("a"), py::arg("b"), py::arg("c"),
          py::arg("output").noconvert());
}

In [None]:
import os
import sys
from pybind11 import get_include

inc = "-I " + get_include()
plat = "-undefined dynamic_lookup" if "darwin" in sys.platform else "-fPIC"
pyinc = !python3-config --cflags

In [None]:
# Compile the pybind11 source into an importable shared library, interpolating the
# include and platform flags assembled in the previous cell.
! c++ -std=c++11 quadratic_formula_pybind11.cpp -O3 -shared {inc} {pyinc.s} -o quadratic_formula_pybind11.so {plat}

In [None]:
import quadratic_formula_pybind11

In [None]:
# Run the compiled loop once on a zero-filled buffer, then display the buffer it filled.
output = np.zeros(1000000)
quadratic_formula_pybind11.run(a, b, c, output)
output

In [None]:
%%timeit

# Same call under %%timeit; allocating the output buffer is part of what gets timed.
output = np.zeros(1000000)
quadratic_formula_pybind11.run(a, b, c, output)

**Important!** As usual with array-oriented programming, the essential thing is that the loop over big datasets is in the compiled code. What do you think will happen in the following?

In [None]:
%%writefile quadratic_formula_pybind11_noloop.cpp

#include <cmath>  // std::sqrt (do not rely on another header pulling it in)

#include <pybind11/pybind11.h>
namespace py = pybind11;

// Return the "+" root of a*x^2 + b*x + c = 0 for a single set of scalar
// coefficients. There is no loop here: each call handles one value.
double run(double a, double b, double c) {
    return (-b + std::sqrt(b*b - 4*a*c)) / (2*a);
}
PYBIND11_MODULE(quadratic_formula_pybind11_noloop, m) {
    m.def("run", &run);
}

In [None]:
# Compile the scalar (no-loop) pybind11 source with the same flags as the array version.
! c++ -std=c++11 quadratic_formula_pybind11_noloop.cpp -O3 -shared {inc} {pyinc.s} -o quadratic_formula_pybind11_noloop.so {plat}

In [None]:
import quadratic_formula_pybind11_noloop

In [None]:
%%timeit -r1 -n1

# The compiled function handles one scalar triple per call, so the
# million-iteration loop itself runs in Python.
output = np.zeros(1000000)
for i in range(len(output)):
    output[i] = quadratic_formula_pybind11_noloop.run(a[i], b[i], c[i])

### Numba

In [None]:
@nb.jit
def quadratic_formula_numba(a_array, b_array, c_array):
    """Return the "+" quadratic root for each (a, b, c) triple.

    The result has ``len(a_array)`` entries and is filled by an explicit
    element-by-element loop over the three zipped inputs.
    """
    result = np.empty(len(a_array))
    index = 0
    for a_i, b_i, c_i in zip(a_array, b_array, c_array):
        discriminant = b_i**2 - 4*a_i*c_i
        result[index] = (-b_i + np.sqrt(discriminant)) / (2*a_i)
        index += 1
    return result

In [None]:
# Call once outside the timing cell and display the result.
# NOTE(review): for a lazily compiled @nb.jit function this first call presumably
# includes the compilation time -- confirm.
quadratic_formula_numba(a, b, c)

In [None]:
%%timeit

# Time repeated calls of the Numba-decorated loop.
quadratic_formula_numba(a, b, c)

In [None]:
# As plain Python this returns 5: x starts as a string, but is rebound to the int 0
# on the first iteration before anything is added to it.
# NOTE(review): presumably here to show how Numba's type inference handles a variable
# whose type changes (str, then int). The outcome depends on the installed Numba
# version's default compilation mode -- confirm before relying on a specific result.
@nb.jit
def f():
    x = "a string"
    for i in range(5):
        if i == 0:
            x = 0
        x = x + 1
    return x

f()

In [None]:
# The step-by-step whole-array formulation from earlier in the notebook, this time
# wrapped in @nb.jit. Each line is one array operation on the full inputs.
@nb.jit
def quadratic_formula_numba_on_arrays(a, b, c):
    tmp1 = np.negative(b)            # -b
    tmp2 = np.square(b)              # b**2
    tmp3 = np.multiply(4, a)         # 4*a
    tmp4 = np.multiply(tmp3, c)      # tmp3*c
    tmp5 = np.subtract(tmp2, tmp4)   # tmp2 - tmp4
    tmp6 = np.sqrt(tmp5)             # sqrt(tmp5)
    tmp7 = np.add(tmp1, tmp6)        # tmp1 + tmp6
    tmp8 = np.multiply(2, a)         # 2*a
    return np.divide(tmp7, tmp8)     # tmp7 / tmp8

# Call once outside the timing cell and display the result.
quadratic_formula_numba_on_arrays(a, b, c)

In [None]:
%%timeit

# Time repeated calls of the Numba-decorated whole-array version.
quadratic_formula_numba_on_arrays(a, b, c)

### Julia

### JAX

In [None]:
import jax
# Select the CPU platform for JAX in this notebook.
jax.config.update("jax_platform_name", "cpu")

In [None]:
@jax.jit
def quadratic_formula_jax(a, b, c):
    """Return the "+" root of ``a*x**2 + b*x + c = 0``, elementwise, via jax.numpy."""
    discriminant = b**2 - 4*a*c
    numerator = -b + jax.numpy.sqrt(discriminant)
    return numerator / (2*a)

In [None]:
# Call once outside the timing cell and display the result.
# NOTE(review): the first call of a @jax.jit function presumably includes tracing
# and compilation -- confirm.
quadratic_formula_jax(a, b, c)

In [None]:
%%timeit

# Time repeated calls of the jitted function.
quadratic_formula_jax(a, b, c)

In [None]:
# The step-by-step whole-array formulation again, written with jax.numpy functions
# and wrapped in @jax.jit. Each line is one array operation on the full inputs.
@jax.jit
def quadratic_formula_jax_on_arrays(a, b, c):
    tmp1 = jax.numpy.negative(b)            # -b
    tmp2 = jax.numpy.square(b)              # b**2
    tmp3 = jax.numpy.multiply(4, a)         # 4*a
    tmp4 = jax.numpy.multiply(tmp3, c)      # tmp3*c
    tmp5 = jax.numpy.subtract(tmp2, tmp4)   # tmp2 - tmp4
    tmp6 = jax.numpy.sqrt(tmp5)             # sqrt(tmp5)
    tmp7 = jax.numpy.add(tmp1, tmp6)        # tmp1 + tmp6
    tmp8 = jax.numpy.multiply(2, a)         # 2*a
    return jax.numpy.divide(tmp7, tmp8)     # tmp7 / tmp8

# Call once outside the timing cell and display the result.
quadratic_formula_jax_on_arrays(a, b, c)

In [None]:
%%timeit

# Time repeated calls of the jitted step-by-step version.
quadratic_formula_jax_on_arrays(a, b, c)

### Dask

In [None]:
@dask.delayed
def increment(i):
    """Delayed task: return ``i`` plus one."""
    incremented = i + 1
    return incremented

@dask.delayed
def add(a, b):
    """Delayed task: return the sum ``a + b``."""
    total = a + b
    return total

# Build a small task graph: add(increment(1), increment(12)). Nothing runs yet.
# NOTE: this rebinds a, b and c, which held the NumPy coefficient arrays earlier in
# the notebook. Re-run the cell that creates those arrays before re-running any
# earlier timing cell.
a, b = 1, 12
c = increment(a)
d = increment(b)
output = add(c, d)

# Display the delayed object itself, not a computed value.
output

In [None]:
# Execute the task graph behind `output` and return the concrete result.
output.compute()

In [None]:
# Render the task graph behind `output`.
output.visualize()

In [None]:
# Dask-array counterparts of the coefficient arrays, created in the order a, b, c.
a, b, c = (
    da.random.uniform(low, high, 1000000)
    for low, high in ((5, 10), (10, 20), (-0.1, 0.1))
)

# The same formula as in the NumPy cell, applied here to Dask arrays.
output = (-b + np.sqrt(b**2 - 4*a*c)) / (2*a)
output

In [None]:
# Render the task graph behind the Dask-array expression.
output.visualize()

In [None]:
# Render the graph again, this time with optimize_graph=True.
output.visualize(optimize_graph=True)

In [None]:
@dask.delayed
def start():
    """Delayed task: announce itself and return the initial value 1."""
    print("start")
    initial_value = 1
    return initial_value

@dask.delayed
def concurrent(initial, i):
    """Delayed task: sleep for a random 0-5 s, then return ``initial + i**2``.

    Prints a begin and an end marker around the sleep so the execution order
    of the tasks is visible.
    """
    print(f"begin {i}")
    pause = np.random.uniform(0, 5)
    time.sleep(pause)
    print(f"end {i}")
    partial = initial + i**2
    return partial

@dask.delayed
def combine(partial_results):
    """Delayed task: announce itself and return the sum of ``partial_results``."""
    print("combine")
    total = sum(partial_results)
    return total

# Build the graph: ten `concurrent` tasks all take the single `start` result as
# input, and `combine` takes the list of all ten partial results.
initial = start()
output = combine([concurrent(initial, i) for i in range(10)])

In [None]:
# Render the start -> concurrent -> combine task graph.
output.visualize()

In [None]:
# Run the graph with the default scheduler; the begin/end prints show the execution order.
output.compute()

The `scheduler` option passed to `dask.config.set` may be `"synchronous"`, `"threads"`, or `"processes"`.

In [None]:
# Run the same graph with the "synchronous" scheduler selected for this block only.
with dask.config.set(scheduler="synchronous"):
    output.compute()

In [None]:
# The same three random arrays, now split into chunks of 200000 elements
# (five chunks per million-element array), created in the order a, b, c.
a, b, c = (
    da.random.uniform(low, high, 1000000, chunks=200000)
    for low, high in ((5, 10), (10, 20), (-0.1, 0.1))
)

output_array = (-b + np.sqrt(b**2 - 4*a*c)) / (2*a)
output_array

In [None]:
# Render the chunked computation's task graph with optimize_graph=True.
output_array.visualize(optimize_graph=True)

In [None]:
# Execute the chunked graph and return the concrete result.
output_array.compute()

### Dask clusters

In [None]:
# Start a scheduler and three single-threaded workers as local subprocesses.
# The scheduler is launched through the `dask scheduler` subcommand, the same CLI
# form as `dask worker` below, rather than the legacy `dask-scheduler` entry point.
scheduler = Popen(["dask", "scheduler"], stdout=PIPE, stderr=PIPE, text=True)
time.sleep(0.5)  # give the scheduler a moment to start before workers connect

# All three workers use the same command line, pointing at the scheduler address.
worker1, worker2, worker3 = (
    Popen(
        ["dask", "worker", "--nthreads", "1", "127.0.0.1:8786"],
        stdout=PIPE,
        stderr=PIPE,
        text=True,
    )
    for _ in range(3)
)
time.sleep(0.5)  # give the workers a moment to register

In [None]:
# Connect a client to the scheduler address the workers were pointed at;
# the trailing expression displays the client.
client = dask.distributed.Client("127.0.0.1:8786")
client

2025-01-08 18:31:15,129 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client


In [None]:
# `output` is still the delayed `combine(...)` graph built earlier.
# NOTE(review): with a distributed Client active, this presumably runs on the
# cluster's workers rather than in the local process -- confirm.
output.compute()

In [None]:
# Disconnect the client before its scheduler goes away. A client left open keeps
# trying to reconnect and eventually logs
# "Failed to reconnect to scheduler after 30.00 seconds, closing client".
client.close()

# Stop the workers first, then the scheduler they are attached to.
time.sleep(0.5)
worker1.terminate()
worker2.terminate()
worker3.terminate()

time.sleep(0.5)
scheduler.terminate()

In [None]:
# Collect what each subprocess wrote to its stdout/stderr pipes: the scheduler
# first, then the workers in order.
stdout, stderr = scheduler.communicate()

(stdout1, stderr1), (stdout2, stderr2), (stdout3, stderr3) = (
    worker.communicate() for worker in (worker1, worker2, worker3)
)

### Dask collections

In [None]:
# Open the HDF5 file and wrap six of its datasets as Dask arrays in chunks of 10000:
# pt, phi and eta for members "e1" and "e2" of the "ee_mumu" group.
# The file object stays open so the wrapped datasets remain readable later.
dataset_hdf5 = h5py.File("data/SMHiggsToZZTo4L.h5")

pt1, phi1, eta1, pt2, phi2, eta2 = (
    da.from_array(dataset_hdf5["ee_mumu"][particle][quantity], chunks=10000)
    for particle in ("e1", "e2")
    for quantity in ("pt", "phi", "eta")
)

In [None]:
# Pair mass from the two particles' pt, eta and phi:
#     sqrt(2 * pt1 * pt2 * (cosh(eta1 - eta2) - cos(phi1 - phi2)))
# NOTE(review): this is the usual massless-particle approximation -- confirm it is intended.
mass = np.sqrt(2*pt1*pt2*(np.cosh(eta1 - eta2) - np.cos(phi1 - phi2)))
mass

In [None]:
# Compute the mass with the "threads" scheduler and histogram it in 120 bins on [0, 120).
# The inputs are the "e1"/"e2" members of the "ee_mumu" group, so the axis is
# named as a dielectron mass.
with dask.config.set(scheduler="threads"):
    Hist.new.Reg(120, 0, 120, name="dielectron mass").Double().fill(
        mass.compute()
    ).plot();

In [None]:
# Open the ROOT file through uproot.dask and display the resulting collection.
# NOTE(review): this presumably builds a lazy Dask collection that reads data only
# when computed -- confirm. The name `events` is rebound here, replacing the
# in-memory array loaded from Parquet at the top of the notebook.
events = uproot.dask("data/SMHiggsToZZTo4L.root")
events

In [None]:
# Keep only the events with exactly two entries in Electron_pt.
selected = events[ak.num(events.Electron_pt) == 2]

# Split each per-event quantity into the first (index 0) and second (index 1) electron.
pt1, pt2 = selected.Electron_pt[:, 0], selected.Electron_pt[:, 1]
phi1, phi2 = selected.Electron_phi[:, 0], selected.Electron_phi[:, 1]
eta1, eta2 = selected.Electron_eta[:, 0], selected.Electron_eta[:, 1]

# Same pair-mass expression as used for the HDF5 columns.
mass = np.sqrt(
    2*pt1*pt2*(np.cosh(eta1 - eta2) - np.cos(phi1 - phi2))
)
mass

In [None]:
# Compute the mass with the "threads" scheduler and histogram it in 120 bins on [0, 120).
# The inputs are the Electron_* branches of two-electron events, so the axis is
# named as a dielectron mass.
with dask.config.set(scheduler="threads"):
    Hist.new.Reg(120, 0, 120, name="dielectron mass").Double().fill(
        mass.compute()
    ).plot();