# Paralelní počítání v Cythonu (OpenMP)
Minule jsme načnuly Cython a říkali si, že umožňuje vyžívat více vláken na procesorech s více jádry pro paralelní výpočty. Dnes se na to podíváme podrobněji.

Konkrétně se podíváme na dva způsoby paralelizace:
- určení separátní práce pro každé vlákno
    - pomocí `with nogil, parallel():`
- paralelizace for cyklu
    - pomocí `prange()`


## Paralelizace pomocí `with nogil, parallel():`
Jedná so o Cython implementaci C OpenMP direktivy `#pragma omp parallel`. Vlákna jsou vytvořena až v okamžiku, kdy se dostaneme do bloku `with nogil, parallel():`. Vlákna jsou zničena na konci bloku. Uvnitř bloku pracuje každé vlákno separátně, a lze použít OpenMP knihovny pro identifikaci vláken: `omp_get_thread_num()` a `omp_get_num_threads()`.

Proměnné jsou automaticky sdílené, pokud do nějaké proměnné uložíme nový objekt (přepíšeme ji) uvnitř paralelního bloku (tedy každé vlákno bude mít svou verzi dat pro dannou proměnnou) stane se z proměnné privátní proměnná (a její obsah nebude mít po skončení paralelního bloku smysl).

## Paralelizace pomocí `prange()`
Vnitřní funkcionalita je v jádru identická jako předchozí varianta (např. co se týče privátních proměnných).

Co se v každém threadu děje se řídí "automaticky", dle původního for cyklu.

Syntaxe `prange()` je v základu identická jako u `range()`, ale je nutné ji importovat z knihovny `cython.parallel`. Má však další argumenty, např.:
- `schedule`: statické nebo dynamické rozdělení práce mezi vlákna
- `num_threads`: počet vláken
- `chunksize`: velikost chunku (počet iterací), který se přidělí jednomu vláknu
- ` nogil`: zda se má použít GIL (Global Interpreter Lock) nebo ne
- ...

Cython umožňuje použití redukce do proměnné (alternativa OpenMP `reduction` direktivy). Toto se v Cythonu definuje pomocí `+=` direktivy (případně ostatních operátorů `-=`, `*=`, ...). Je třeba tyto operátory nepoužívat v paralelní části Cythonu, pokud skutečně nechceme provést redukci.

In [None]:
# pro jednoduchost použijeme přímo magic commandy
%load_ext cython

## Počítání normy vektoru

#### Původní kód z minula

In [None]:
%%cython --compile-args=-O3
import numpy as np
cimport numpy as np
from libc.math cimport sqrt
import cython # je zde kvůli dekorátorům

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef double my_norm_serial(np.ndarray[np.float64_t, ndim=1] a):
    cdef int i
    cdef int n = a.shape[0]
    cdef double result = 0.0
    for i in range(n):
        result += a[i] * a[i]
    return sqrt(result)

In [None]:
import numpy as np
x = np.random.rand(int(1e7*8))
y1 = my_norm_serial(x)
y2 = np.linalg.norm(x)
print(y1,y2)

In [None]:
%timeit _ = np.linalg.norm(x)

In [None]:
%timeit _ = my_norm_serial(x)

#### Cython s paralelizací pomocí `with nogil, parallel():`

In [None]:
%%cython --compile-args=-fopenmp --compile-args=-O3 --link-args=-fopenmp

import numpy as np
cimport numpy as np
from libc.math cimport sqrt
import cython

from cython.parallel import parallel # paralelní část kódu
from openmp cimport omp_get_thread_num # zjištění čísla threadu z OpenMP

ctypedef np.float64_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef double my_norm_parallel_1(np.ndarray[np.float64_t, ndim=1] a, int num_threads):
    cdef int i
    cdef int n = a.shape[0]
    cdef double result = 0.0

    cdef int chunk_size = n // num_threads # jakou část pole řeší každý thread
    cdef int thread_num # číslo threadu
    # pole pro výsledky z jednotlivých threadů
    cdef np.ndarray[np.float64_t, ndim=1] partial_results = np.zeros((num_threads), dtype=np.float64) 

    with nogil, parallel(num_threads=num_threads): # paralelní část kódu, v tomto bloku je každý thread samostatný
        thread_num = omp_get_thread_num() # zjistíme číslo threadu
        result = 0 # výsledek pro daný thread, tímto se z results stane privátní proměnná
        # cyklus přes část pole, kterou řeší daný thread
        for i in range(thread_num * chunk_size, (thread_num + 1) * chunk_size):
            result = result + a[i] * a[i] # pozor na a += b vs a = a + b
        partial_results[thread_num] = result

    # sečteme výsledky z jednotlivých threadů
    result = 0
    for i in range(num_threads):
        result += partial_results[i]

    return sqrt(result)

In [None]:
x = np.random.rand(int(1e7*8))
y1 = my_norm_parallel_1(x, 4)
y2 = np.linalg.norm(x)
print(y1, y2)

In [None]:
%timeit _ = my_norm_parallel_1(x,16)

In [None]:
%timeit _ = np.linalg.norm(x)

#### Cython s paralelizací pomocí `prange()`

In [None]:
%%cython --compile-args=-fopenmp --compile-args=-O3 --link-args=-fopenmp

import numpy as np
cimport numpy as np
from libc.math cimport sqrt
import cython
from cython.parallel import prange # stačí nám paralelní alternativa k range

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef double my_norm_parallel_2(np.ndarray[np.float64_t, ndim=1] a, int num_threads):
    cdef int i
    cdef int n = a.shape[0]
    cdef double result = 0.0
    for i in prange(n, nogil=True, num_threads=num_threads): # paralelní cyklus
        result += a[i] * a[i] # += definuje redukci!!! 
    return sqrt(result)

In [None]:
x = np.random.rand(int(1e7*8))
y1 = my_norm_parallel_2(x, 4)
y2 = np.linalg.norm(x)
print(y1, y2)

In [None]:
%timeit _ = my_norm_parallel_2(x, 16)

In [None]:
%timeit _ = np.linalg.norm(x)

### Srovnání rychlosti

In [None]:
import time
import matplotlib.pyplot as plt


# pro lepší měření času, budeme měřit čas pro více opakování a vybrat minimum
def measure_multi(n,func, data):
    tmp_time = []
    for i in range(n):
        start = time.time()
        _ = func(data)
        tmp_time.append(time.time() - start)
    return min(tmp_time)

n_loops = 10

x = np.random.rand(int(1e7*8))

pocet_vlaken =[2**i for i in range(11)]

time_numpy = measure_multi(n_loops, lambda data : np.linalg.norm(data), x)

time_paralell_1 = []
time_paralell_2 = []


for n in pocet_vlaken:

    time_paralell_1.append(measure_multi(n_loops, lambda data : my_norm_parallel_1(data, n), x))
    time_paralell_2.append(measure_multi(n_loops, lambda data : my_norm_parallel_2(data, n), x))

    print(f"n_threads: {n}, time: {time_paralell_1[-1]}, time: {time_paralell_2[-1]}")

# log-log grafy
plt.loglog(pocet_vlaken, [time_numpy for _ in pocet_vlaken], label="numpy")
plt.loglog(pocet_vlaken, time_paralell_1, label="parallel 1")
plt.loglog(pocet_vlaken, time_paralell_2, label="parallel 2")

plt.legend()

# Jak paralelizmus v Cythonu zrchlí počítání vzdálenosti mezi body z minula?

#### Verze z minula

In [None]:
%%cython --compile-args=-O3

import numpy as np
cimport numpy as np
from libc.math cimport sqrt
cimport cython

ctypedef np.float64_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef dist_cython_serial_1(np.ndarray[DTYPE_t, ndim=2] points):
    cdef int n = points.shape[0]
    cdef np.ndarray[DTYPE_t, ndim=2] distances_all = np.zeros((n, n), dtype=np.float64)
    cdef int i, j, k
    cdef double tmp_sum, rozdil, tmp_dist

    for i in range(n):
        for j in range(i+1, n):
            tmp_sum = 0
            for k in range(3):
                rozdil = points[i, k] - points[j, k]
                tmp_sum += rozdil * rozdil
            tmp_dist = sqrt(tmp_sum)
            distances_all[i, j] = tmp_dist
            distances_all[j, i] = tmp_dist

    
    return distances_all


Verze s redundatním výpočtem vzdálenosti mezi body, bude se lépe paralelizovat.

In [None]:
%%cython --compile-args=-O3

import numpy as np
cimport numpy as np
from libc.math cimport sqrt
cimport cython

ctypedef np.float64_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef dist_cython_serial_2(np.ndarray[DTYPE_t, ndim=2] points):
    cdef int n = points.shape[0]
    cdef np.ndarray[DTYPE_t, ndim=2] distances_all = np.zeros((n, n), dtype=np.float64)
    cdef int i, j, k
    cdef double tmp_sum, rozdil, tmp_dist

    for i in range(n):
        for j in range(n):
            tmp_sum = 0
            for k in range(3):
                rozdil = points[i, k] - points[j, k]
                tmp_sum += rozdil * rozdil
            tmp_dist = sqrt(tmp_sum)
            distances_all[i, j] = tmp_dist

    
    return distances_all


#### Cython s paralelizací pomocí `prange()`

In [None]:
%%cython --compile-args=-fopenmp --compile-args=-O3 --link-args=-fopenmp

import numpy as np
cimport numpy as np
from libc.math cimport sqrt
cimport cython
from cython.parallel import parallel, prange # toto je zde nové

ctypedef np.float64_t DTYPE_t

@cython.cdivision(True)
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef dist_cython_paralell(np.ndarray[DTYPE_t, ndim=2] points, int n_threads):
    cdef int n = points.shape[0]
    cdef np.ndarray[DTYPE_t, ndim=2] distances_all = np.zeros((n, n), dtype=np.float64)
    cdef int i, j, k
    cdef double tmp_sum, rozdil, tmp_dist


    for i in prange(n, nogil=True, num_threads=n_threads): # toto je zde nové
        for j in range(n):
            tmp_sum = 0  # Reset tmp_sum for each new pair of points
            for k in range(3):
                rozdil = points[i, k] - points[j, k]
                tmp_sum = tmp_sum + rozdil * rozdil # toto je zde nové
            tmp_dist = sqrt(tmp_sum)
            distances_all[i, j] = tmp_dist

    return distances_all




In [None]:
import numpy as np

# vyzkoušíme si, jak to funguje
points_np = np.random.rand(2**13, 3)

In [None]:
%timeit _ = dist_cython_serial_1(points_np)


In [None]:
%timeit _ = dist_cython_serial_2(points_np)

In [None]:
%timeit _ = dist_cython_paralell(points_np,32)

In [None]:
import time
import matplotlib.pyplot as plt

pocet_vlaken =[2**i for i in range(10, 14)]
time_cython_serial_1 = []
time_cython_serial_2 = []

all_num_threads = [2, 4, 8, 16, 32, 64, 128]
time_cython_parallel = [[] for _ in all_num_threads]



n_loops = 10

def measure_multi(n,func):
    tmp_time = []
    for i in range(n):
        start = time.time()
        _ = func(points_np)
        tmp_time.append(time.time() - start)
    return min(tmp_time)

for n in pocet_vlaken:
    points_np = np.random.rand(n, 3)
    
    time_cython_serial_1.append(measure_multi(n_loops, dist_cython_serial_1))
    time_cython_serial_2.append(measure_multi(n_loops, dist_cython_serial_2))

    for idx, n_threads in enumerate(all_num_threads):
        time_cython_parallel[idx].append(measure_multi(n_loops, lambda points_np : dist_cython_paralell(points_np, n_threads)))


    min_time = min([time_cython_parallel[idx][-1] for idx, _ in enumerate(all_num_threads)])
    ratio = time_cython_serial_2[-1]/min_time
    print(f"velikost {n}, čas sekvenční {time_cython_serial_2[-1]}, nejlepší paralelní {min_time}, poměr {ratio}")

# log-log grafy
plt.loglog(pocet_vlaken, time_cython_serial_1, label='serial 1')
plt.loglog(pocet_vlaken, time_cython_serial_2, label='serial 2')
for idx, n_threads in enumerate(all_num_threads):
    plt.loglog(pocet_vlaken, time_cython_parallel[idx], label='paralell' + str(n_threads))

plt.legend()

## Proč je verze serial_1 tak pomalá?

Důvodem je ukládání do pole, které pří indexech `[i, j] a [j, i]` zaručeně v jednom případě přistupuje do paměti v pořadí, které není sekvenční. To způsobuje, že se využívá cache paměti méně efektivně.

Vyhodíme z výpočtu zápis na index `[j, i]`, tedy výstup bude horní trojúhelníková matice.

In [None]:
%%cython --compile-args=-O3

import numpy as np
cimport numpy as np
from libc.math cimport sqrt
cimport cython

ctypedef np.float64_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef dist_cython_serial_1(np.ndarray[DTYPE_t, ndim=2] points):
    cdef int n = points.shape[0]
    cdef np.ndarray[DTYPE_t, ndim=2] distances_all = np.zeros((n, n), dtype=np.float64)
    cdef int i, j, k
    cdef double tmp_sum, rozdil, tmp_dist

    for i in range(n):
        for j in range(i+1, n):
            tmp_sum = 0
            for k in range(3):
                rozdil = points[i, k] - points[j, k]
                tmp_sum += rozdil * rozdil
            tmp_dist = sqrt(tmp_sum)
            distances_all[i, j] = tmp_dist

    
    return distances_all


Ozkoušíme benchmarking znova:

In [None]:
import time
import matplotlib.pyplot as plt

pocet_vlaken =[2**i for i in range(10, 14)]
time_cython_serial_1 = []
time_cython_serial_2 = []

all_num_threads = [2, 4, 8, 16, 32, 64, 128]
time_cython_parallel = [[] for _ in all_num_threads]



n_loops = 10

def measure_multi(n,func):
    tmp_time = []
    for i in range(n):
        start = time.time()
        _ = func(points_np)
        tmp_time.append(time.time() - start)
    return min(tmp_time)

for n in pocet_vlaken:
    points_np = np.random.rand(n, 3)
    
    time_cython_serial_1.append(measure_multi(n_loops, dist_cython_serial_1))
    time_cython_serial_2.append(measure_multi(n_loops, dist_cython_serial_2))

    for idx, n_threads in enumerate(all_num_threads):
        time_cython_parallel[idx].append(measure_multi(n_loops, lambda points_np : dist_cython_paralell(points_np, n_threads)))


    min_time = min([time_cython_parallel[idx][-1] for idx, _ in enumerate(all_num_threads)])
    ratio = time_cython_serial_2[-1]/min_time
    print(f"velikost {n}, čas sekvenční {time_cython_serial_2[-1]}, nejlepší paralelní {min_time}, poměr {ratio}")

# log-log grafy
plt.loglog(pocet_vlaken, time_cython_serial_1, label='serial 1')
plt.loglog(pocet_vlaken, time_cython_serial_2, label='serial 2')
for idx, n_threads in enumerate(all_num_threads):
    plt.loglog(pocet_vlaken, time_cython_parallel[idx], label='paralell' + str(n_threads))

plt.legend()