# Multithreading (Procesos multiples)

Ahora finalmente estamos listos para comenzar a hablar sobre ejecutar cosas en múltiples procesadores! La mayoría de las computadoras (incluso los teléfonos celulares) en estos días tienen múltiples núcleos o procesadores, por lo que el lugar obvio para comenzar a trabajar con el paralelismo es haciendo uso de aquellos dentro de nuestro proceso de Julia.

Sin embargo, el primer desafío es saber con precisión cuántos "procesadores" tiene. "Procesadores" está entre comillas porque, bueno, es complicado.

In [1]:
versioninfo(verbose = true)

Julia Version 1.8.2
Commit 36034abf260 (2022-09-29 15:21 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
      Ubuntu 22.04.1 LTS
  uname: Linux 5.15.0-56-generic #62-Ubuntu SMP Tue Nov 22 19:54:14 UTC 2022 x86_64 x86_64
  CPU: Intel(R) Core(TM) i7-4790 CPU @ 3.60GHz: 
              speed         user         nice          sys         idle          irq
       #1  3500 MHz      64066 s         50 s       2738 s    2350732 s          0 s
       #2   800 MHz      67002 s         51 s       2532 s    2350991 s          0 s
       #3   800 MHz      57391 s         55 s       2036 s    2351542 s          0 s
       #4   800 MHz      65958 s         62 s       1709 s    2352057 s          0 s
       #5   800 MHz      14635 s         26 s       1504 s    2349683 s          0 s
       #6   800 MHz      36363 s         66 s       2696 s    2350783 s          0 s
       #7   800 MHz      53339 s         28 s       2149 s    2351035 s          0 s
       #8  3598 MHz      70726 s         28 s  

In [2]:
]add Hwloc

[32m[1m    Updating[22m[39m registry at `~/.julia/registries/General`
[32m[1m    Updating[22m[39m git-repo `https://github.com/JuliaRegistries/General.git`
[32m[1m   Resolving[22m[39m package versions...
[32m[1m  No Changes[22m[39m to `~/.julia/environments/v1.8/Project.toml`
[32m[1m  No Changes[22m[39m to `~/.julia/environments/v1.8/Manifest.toml`
[32m[1mPrecompiling[22m[39m project...
[32m  ✓ [39m[90mPlots[39m
[32m  ✓ [39m[90mMakie[39m
[91m  ✗ [39mGLMakie
[32m  ✓ [39mCairoMakie
  3 dependencies successfully precompiled in 165 seconds. 278 already precompiled.
  [91m1[39m dependency errored. To see a full report either run `import Pkg; Pkg.precompile()` or load the package


In [3]:
#; cat /proc/cpuinfo # En máquinas con S.O. Linux

using Hwloc
Hwloc.num_physical_cores()

4

Lo que su computadora informa como la cantidad de procesadores podría no ser lo mismo que la cantidad total de "núcleos". Si bien a veces los procesadores virtuales pueden agregar rendimiento, la paralelización de un cálculo numérico típico sobre estos procesadores virtuales conducirá a un rendimiento significativamente peor porque todavía tienen que compartir gran parte de los aspectos básicos del hardware de cálculo.

¡Julia es algo multiproceso por defecto! Las llamadas BLAS(*Basic Linear Algebra Subroutines*),como la multiplicación de matrices por ejemplo, ya estan separadas en hilos de procesoo:

In [4]:
using BenchmarkTools
A = rand(2000, 2000);
B = rand(2000, 2000);
@btime $A*$B;

  89.897 ms (2 allocations: 30.52 MiB)


¡Esto, de forma predeterminada, ya está usando todos sus núcleos de CPU! Puede ver el efecto cambiando la cantidad de subprocesos (que BLAS admite hacer dinámicamente):

In [5]:
using LinearAlgebra
BLAS.set_num_threads(1)
@btime $A*$B
BLAS.set_num_threads(4)
@btime $A*$B

  317.830 ms (2 allocations: 30.52 MiB)
  90.866 ms (2 allocations: 30.52 MiB)


2000×2000 Matrix{Float64}:
 501.557  484.606  494.366  495.617  …  490.99   500.913  510.12   494.428
 487.117  464.659  475.428  474.077     472.102  478.0    489.568  476.796
 506.227  503.987  496.952  498.592     492.6    504.238  507.118  499.737
 503.542  496.378  502.982  494.664     497.747  504.257  517.105  507.192
 489.566  479.515  484.258  477.918     493.773  485.311  498.965  497.485
 493.146  485.911  483.956  485.531  …  490.994  492.403  505.021  498.504
 504.33   485.607  499.676  490.759     490.906  506.104  509.324  499.617
 512.909  500.543  499.973  500.141     500.477  506.597  514.079  506.17
 502.064  484.026  487.35   490.932     487.082  501.39   503.959  495.767
 513.249  493.32   497.199  490.922     502.458  505.528  518.538  505.843
 502.424  494.862  500.097  507.131  …  498.721  507.527  514.339  501.787
 509.436  501.919  496.472  500.425     500.008  502.606  512.25   501.335
 502.661  485.339  492.692  481.625     488.716  496.748  498.676  508.045

## ¿Cómo luce la implementación de su propio algoritmo de subprocesos?

La compatibilidad con subprocesos múltiples está marcada como "experimental" para Julia 1.0 y está pendiente de una gran renovación para la versión 1.2 o 1.3 de Julia. Los principios básicos serán los mismos, pero debería ser mucho más fácil de usar de manera eficiente.

In [6]:
using .Threads

nthreads()

4


Actualmente, Julia necesita iniciarse sabiendo que tiene habilitada la compatibilidad con subprocesos.
Lo haces con una variable de entorno. Para obtener cuatro subprocesos, inicie Julia con:

JULIA_NUM_THREADS=4 julia

En JuliaBox, esto es un desafío: ¡no tenemos acceso al proceso de lanzamiento!

In [7]:
;env JULIA_NUM_THREADS=4 julia -E 'using .Threads; nthreads()'

4


In [8]:
threadid()

1

In [9]:
A = Array{Union{Int,Missing}}(missing, nthreads())
for i in 1:nthreads()
    A[threadid()] = threadid()
end
A

4-element Vector{Union{Missing, Int64}}:
 1
  missing
  missing
  missing

Pero si le ponemos el prefijo @threads, ¡entonces el cuerpo del ciclo se ejecuta en todos los hilos!

In [10]:
@threads for i in 1:nthreads()
    A[threadid()] = threadid()
end
A

4-element Vector{Union{Missing, Int64}}:
 1
 2
 3
 4

Entonces, intentemos implementar nuestro primer algoritmo sencillo de subprocesos, una suma:

In [11]:
function threaded_sum1(A)
    r = zero(eltype(A))
    @threads for i in eachindex(A)
        @inbounds r += A[i]
    end
    return r
end

A = rand(10_000_000)
threaded_sum1(A)
@time threaded_sum1(A)

  0.248360 seconds (20.00 M allocations: 305.178 MiB, 26.88% gc time)


1.2497898375531568e6

In [12]:
sum(A)
@time sum(A)

  0.004626 seconds (1 allocation: 16 bytes)


4.999904134443589e6

¡Vaya! ¿Qué sucedió? ¡No solo obtuvimos la respuesta incorrecta, sino que fue lento obtenerla!

In [13]:
function threaded_sum2(A)
    r = Atomic{eltype(A)}(zero(eltype(A)))
    @threads for i in eachindex(A)
        @inbounds atomic_add!(r, A[i])
    end
    return r[]
end

threaded_sum2(A)
@time threaded_sum2(A)

  0.799169 seconds (30 allocations: 2.234 KiB)


4.9999041344437245e6

¡Bien! Ahora obtuvimos la respuesta correcta (ajustando alguna asociatividad de punto flotante), pero aún es más lento que simplemente hacerlo en 1 núcleo.

In [14]:
threaded_sum2(A) ≈ sum(A)

true

¡Pero sigue siendo lento! ¡Usando *atomics* es mucho más lento que solo agregar números enteros porque constantemente tenemos que ir y verificar qué procesador tiene el último trabajo! Recuerde también que cada subproceso se ejecuta en su propio procesador, ¡y ese procesador también es compatible con SIMD! Bueno, eso si no tuviera que preocuparse por sincronizarse con los otros procesadores...

In [15]:
function threaded_sum3(A)
    r = Atomic{eltype(A)}(zero(eltype(A)))
    len, rem = divrem(length(A), nthreads())
    @threads for t in 1:nthreads()
        rₜ = zero(eltype(A))
        @simd for i in (1:len) .+ (t-1)*len
            @inbounds rₜ += A[i]
        end
        atomic_add!(r, rₜ)
    end
    # catch up any stragglers
    result = r[]
    @simd for i in length(A)-rem+1:length(A)
        @inbounds result += A[i]
    end
    return result
end

threaded_sum3(A)
@time threaded_sum3(A)

  0.003662 seconds (29 allocations: 2.266 KiB)


4.999904134443581e6

Rayos, eso es complicado. También hay un problema:

In [16]:
threaded_sum3(rand(10) .+ rand(10)im) # ¡Prueba con un arreglo de numeros complejos!

LoadError: TypeError: in Atomic, in T, expected T<:Union{Bool, Float16, Float32, Float64, Int128, Int16, Int32, Int64, Int8, UInt128, UInt16, UInt32, UInt64, UInt8}, got Type{ComplexF64}

In [17]:
R = zeros(eltype(A), nthreads())

4-element Vector{Float64}:
 0.0
 0.0
 0.0
 0.0

In [18]:
function threaded_sum4(A)
    R = zeros(eltype(A), nthreads())
    @threads for i in eachindex(A)
        @inbounds R[threadid()] += A[i]
    end
    r = zero(eltype(A))
    # sum the partial results from each thread
    for i in eachindex(R)
        @inbounds r += R[i]
    end
    return r
end

threaded_sum4(A)
@time threaded_sum4(A)

  0.003852 seconds (28 allocations: 2.281 KiB)


4.999904134443436e6

Esto sacrifica nuestra capacidad para `@simd`, por lo que es un poco más lento, ¡pero al menos no tenemos que preocuparnos por todos esos índices! Y tampoco necesitamos preocuparnos por lo atómico y podemos admitir nuevamente matrices de cualquier elemento:

In [19]:
threaded_sum4(rand(10) .+ rand(10)im)

4.512829084959211 + 4.473033077368201im


## Conclusiones clave de `threaded_sum`:

* Tenga cuidado con el estado compartido entre hilos: ¡puede dar lugar a respuestas incorrectas!
    * Protéjase usando atomics (or [locks/mutexes](https://docs.julialang.org/en/v1/base/multi-threading/#Synchronization-Primitives-1))
    * Mejor aún: divida el trabajo manualmente de modo que los bucles internos no compartan el estado. `@threads for i in 1:nthreads()` es una expresión útil.
    * Alternativamente, solo use una matriz y solo acceda a los elementos de un solo hilo

# Ser cuidadoso con el *global state*  (¡incluso si no es obvio!)

Otra clase de algoritmo que puede querer paralelizar es un problema de monte-carlo. Dado que cada iteración es un nuevo sorteo aleatorio, y dado que está interesado en observar el resultado agregado, parece que debería prestarse a el paralelismo muy bien!

### Metodo de Monte Carlo
El método de Montecarlo es un método no determinista o estadístico numérico, usado para aproximar expresiones matemáticas complejas y costosas de evaluar con exactitud. El método se llamó así en referencia al Casino de Montecarlo (Mónaco) por ser “la capital del juego de azar”, al ser la ruleta un generador simple de números aleatorios. 

#### Cálculo de $\pi$ por Montecarlo

Consideremos al círculo unitario inscrito en el cuadrado de lado 2 centrado en el origen. Dado que el cociente de sus áreas es $\frac{\pi}{4}$, el valor de $\pi$ puede aproximarse usando Montecarlo de acuerdo al siguiente método:

1. Dibuja un círculo unitario, y al cuadrado de lado 2 que lo inscribe.
2. Lanza un número $n$ de puntos aleatorios uniformes dentro del cuadrado.
3. Cuenta el número de puntos dentro del círculo, es decir, puntos cuya distancia al origen es menor que 1.
4. El cociente de los puntos dentro del círculo dividido entre $n$ es un estimado de $\frac{\pi}{4}$. Multiplica el resultado por 4 para estimar $\pi$.

<img src='Estimacion_de_Pi_por_Montercarlo.gif'>

En este cálculo se tienen que hacer dos consideraciones importantes:

1. Si los puntos no están uniformemente distribuidos, el método es inválido.
2. La aproximación será pobre si solo se lanzan unos pocos puntos. En promedio, la aproximación mejora conforme se aumenta el número de puntos.

In [20]:
using BenchmarkTools

In [21]:
function serialpi(n)
    inside = 0
    for i in 1:n
        x, y = rand(), rand()
        inside += (x^2 + y^2 <= 1)
    end
    return 4 * inside / n
end
serialpi(1)
@time serialpi(100_000_000)

  0.338741 seconds


3.14179224

In [22]:
using .Threads
nthreads()

4

Usemos las técnicas que aprendimos anteriormente para hacer una implementación de subprocesos rápida:

In [23]:
function threadedpi(n)
    inside = zeros(Int, nthreads())
    @threads for i in 1:n
        x, y = rand(), rand()
        @inbounds inside[threadid()] += (x^2 + y^2 <= 1)
    end
    return 4 * sum(inside) / n
end
threadedpi(100_000_000)
@time threadedpi(100_000_000)

  0.089154 seconds (29 allocations: 2.297 KiB)


3.14148928

Bien, ahora ¿por qué no funcionó? ¡Es lento! Veamos la secuencia de números aleatorios que generamos:

In [24]:
import Random
Random.seed!(0)
N = 20000
Rserial = zeros(N)
for i in 1:N
    Rserial[i] = rand()
end
Rserial

20000-element Vector{Float64}:
 0.4056994708920292
 0.06854582438651502
 0.8621408571954849
 0.08597086585842195
 0.6616126907308237
 0.11632735383158599
 0.1093856021447891
 0.7020044441837296
 0.2895098423219379
 0.028549977665983994
 0.538639413965653
 0.8969897902567084
 0.25847781536337067
 ⋮
 0.20453475366744478
 0.5306149811432983
 0.03456372966458843
 0.220988862426311
 0.9249279921301397
 0.007990107701113969
 0.6060173783083965
 0.40485400823870843
 0.9706620597853558
 0.5881340040386561
 0.46443211274507834
 0.8518653025256372

In [25]:
Random.seed!(0)
Rthreaded = zeros(N)
@threads for i in 1:N
    Rthreaded[i] = rand()
end
Rthreaded

20000-element Vector{Float64}:
 0.3337624014920011
 0.08778934169751962
 0.6975330242458738
 0.638933502898981
 0.00864887382901669
 0.5512665131618238
 0.3212281816145137
 0.3463918406165878
 0.6179993182252156
 0.24281669365162994
 0.8852899334338926
 0.6993599111561082
 0.4643813532164971
 ⋮
 0.9373739994909909
 0.00984536404533165
 0.6010075144770451
 0.6985782740022968
 0.7528479401839744
 0.03945273575026298
 0.6860815783160642
 0.6399224816495211
 0.19193799465910422
 0.9785012706620614
 0.9281000132569841
 0.021088421664114843

In [36]:
Set(Rserial) == Set(Rthreaded)

false

In [37]:
indexin(Rserial, Rthreaded)

20000-element Vector{Union{Nothing, Int64}}:
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 ⋮
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing

In [38]:
length(unique(Rthreaded))

20000

In [39]:
length(unique(Rserial))

20000

¡Ajá, rand() no es seguro para subprocesos! Está mutando (y leyendo) algo global cada vez para averiguar qué sigue. Esto conduce a ralentizaciones y, lo que es peor, sesga la distribución generada de números aleatorios, ¡ya que algunos se repiten!

In [41]:
const ThreadRNG = Vector{Random.MersenneTwister}(undef, nthreads())
@threads for i in 1:nthreads()
    ThreadRNG[Threads.threadid()] = Random.MersenneTwister()
end
function threadedpi2(n)
    inside = zeros(Int, nthreads())
    len, rem = divrem(n, nthreads())
    rem == 0 || error("use a multiple of $(nthreads()), please!")
    @threads for i in 1:nthreads()
        rng = ThreadRNG[threadid()]
        v = 0
        for j in 1:len
            x, y = rand(rng), rand(rng)
            v += (x^2 + y^2 <= 1)
        end
        inside[threadid()] = v
    end
    return 4 * sum(inside) / n
end
threadedpi2(8)
@time threadedpi2(100_000_000)



  0.070393 seconds (126 allocations: 11.062 KiB)


3.14194636

Aparte, tenga cuidado al inicializar muchos `MersenneTwister`s con diferentes estados. Es mejor usar [`randjump`](https://docs.julialang.org/en/v1/manual/parallel-computing/#Side-effects-and-mutable-function-arguments-1) para avanzar por un solo estado.

## Cuidado con la sobresuscripción
¿Recuerdas cómo se distribuye(*Threaded*) BLAS de forma predeterminada? ¿Qué sucede si intentamos usar`@threads`
en algo que usa BLAS?

In [42]:
Ms = [rand(1000, 1000) for _ in 1:100]
function serial_matmul(As)
    first_idxs = zeros(length(As))
    for i in eachindex(As)
        @inbounds first_idxs[i] = (As[i]'*As[i])[1]
    end
    first_idxs
end
serial_matmul(Ms);
@time serial_matmul(Ms);

  1.216967 seconds (201 allocations: 762.945 MiB, 7.51% gc time)


In [44]:
using LinearAlgebra
BLAS.set_num_threads(nthreads()) #Explicitamente se dice a BLAS que use el mismo numero de hilos(subprocesos)
function threaded_matmul(As)
    first_idxs = zeros(length(As))
    @threads for i in eachindex(As)
        @inbounds first_idxs[i] = (As[i]'*As[i])[1]
    end
    first_idxs
end
threaded_matmul(Ms)
@time threaded_matmul(Ms);

  1.631044 seconds (233 allocations: 762.947 MiB)


In [45]:
BLAS.set_num_threads(1)
@time threaded_matmul(Ms);

  0.909218 seconds (233 allocations: 762.947 MiB, 0.33% gc time)


In [48]:
@time serial_matmul(Ms) # Nuevamente, ahora BLAS tiene solo 1 subproceso(thread) 

  2.928124 seconds (201 allocations: 762.945 MiB, 0.90% gc time)


100-element Vector{Float64}:
 339.0849100125747
 338.2098656726803
 342.4082577807274
 311.08523863219614
 319.5204869300619
 341.97867401108164
 330.169191474512
 347.2614965789113
 336.7961972346558
 342.6469999594585
 332.98055999136466
 320.51666366414344
 335.7288446163673
   ⋮
 343.3940418238871
 336.4509531242172
 321.7695590178687
 322.99401977345406
 329.0111918821839
 323.32771707024796
 337.1525924611082
 330.9462705457354
 321.9632769483542
 315.19256871722143
 332.3138451894599
 331.19904089271836

# Cuidado con "false sharing"

## ¡Más mejoras vienen aquí!

PARTR Ready Run TimeLas mejoras de subprocesos(*hilos/threading*) que discutimos al inicio tienen como objetivo direccionar este problema de tener funciones de biblioteca implementadas con `@threads`y después tener a quien llama llamandolos con `@threads`. Usa una linea de trabajo del estado del arte del dispositivo para asegurarte que todos los subprocesos estan ocupados.
