# Paralelismo em Julia

In [75]:
using BenchmarkTools
using Base.Threads
@show nthreads()

nthreads() = 2


2

In [None]:
a = rand(100_000)

In [4]:
function baseline_sum(arr)
    result::eltype(arr) = zero(eltype(arr))
    for i in arr
        result += i
    end
    return result
end

baseline_sum (generic function with 1 method)

In [5]:
baseline_sum(a)

49941.07260774294

# @threads

In [2]:
function threadid_array()
    arr = Array{Int64}(undef, 10)
    @threads for i = 1:length(arr)
        arr[i] = Threads.threadid()
    end
    return arr
end

LoadError: UndefVarError: @threads not defined

In [90]:
threadid_array()

10-element Array{Int64,1}:
 1
 1
 1
 1
 1
 2
 2
 2
 2
 2

In [43]:
function threaded_sum(arr)
    results = zeros(eltype(arr), nthreads())
    @threads for id in 1:nthreads()             #@threads gera um closure       
        acc = zero(eltype(arr))                 #cada thread tem um acc localmente 
        len = div(length(arr), nthreads())      #cada thread trabalha 1/nthreads() do array
        domain = ((id-1)*len+1):id*len    
        @inbounds for i in domain          
            acc += arr[i]
        end
        results[id] = acc                       #cada soma parcial é posta em uma variável no escopo acima
    end
    sum(results)                                #soma os resultados
end

threaded_sum (generic function with 1 method)

In [None]:
threaded_sum(a) ≈ baseline_sum(a)

In [10]:
function naive_sum(arr)
    total = zero(eltype(arr))
    @threads for i in arr
        total += i
    end
    return total
end

naive_sum (generic function with 1 method)

In [77]:
naive_sum(a)

27208.474392480144

In [78]:
naive_sum(a) ≈ baseline_sum(a)

false

## Atomics

In [13]:
function naive_atomic_sum(arr)
    total = Atomic{eltype(arr)}(0)
    @threads for i in arr
        atomic_add!(total, i)
    end
    return total[]
end

naive_atomic_sum (generic function with 1 method)

In [79]:
naive_atomic_sum(a) ≈ baseline_sum(a)

true

In [70]:
@btime baseline_sum(a)

  129.398 μs (1 allocation: 16 bytes)


49941.07260774294

In [48]:
naive_atomic_sum(a)
@btime naive_atomic_sum(a)

  1.815 ms (15 allocations: 1.52 KiB)


49941.07260774359

In [17]:
function sum_thread_split(A)
   r = Atomic{eltype(A)}(zero(eltype(A)))
   len, rem = divrem(length(A), nthreads())
   #Split the array equally among the threads
   @threads for t in 1:nthreads()
      partial = zero(eltype(A))
      for i in (1:len) .+ (t-1)*len
         @inbounds partial += A[i]
      end
      atomic_add!(r, partial)
    end
   result = r[]
   #process up the remaining data
   for i in length(A)-rem+1:length(A)
      @inbounds result += A[i]
   end
   return result
end

sum_thread_split (generic function with 1 method)

In [None]:
sum_thread_split(a) ≈ baseline_sum(a)

In [53]:
sum_thread_split(a)
@btime sum_thread_split(a)

  72.586 μs (15 allocations: 1.53 KiB)


49941.07260774326

## Lock

In [20]:
function sum_thread_lock(A)
    result = zero(eltype(A))
    cond = ReentrantLock()
    len, rem = divrem(length(A), nthreads())
    @threads for t in 1:nthreads()
        partial = zero(eltype(A))
        for i in (1:len) .+ (t-1)*len
            @inbounds partial += A[i]
        end
        lock(cond)
        result += partial
        unlock(cond)
    end
    for i in length(A)-rem+1:length(A)
        @inbounds result += A[i]
    end
    return result
end

sum_thread_lock (generic function with 1 method)

In [80]:
sum_thread_lock(a) ≈ baseline_sum(a)

true

In [81]:
sum_thread_lock(a)
@btime sum_thread_lock(a)

  72.657 μs (22 allocations: 1.70 KiB)


49941.07260774326

In [23]:
println("baseline sum:")
@btime baseline_sum(a)
println("naive atomic sum:")
@btime naive_atomic_sum(a)
println("sum thread lock:")
@btime sum_thread_lock(a)
println("sum thread split")
@btime sum_thread_split(a)

baseline sum:
  129.418 μs (1 allocation: 16 bytes)
naive atomic sum:
  3.473 ms (15 allocations: 1.52 KiB)
sum thread lock:
  72.679 μs (22 allocations: 1.70 KiB)
sum thread split
  72.583 μs (15 allocations: 1.53 KiB)


49941.07260774326

In [None]:
sum_thread_split(a) == baseline_sum(a)

In [None]:
sum_thread_split(a) ≈ baseline_sum(a)

# @spawn

In [24]:
import Base.Threads.@spawn

In [82]:
function naive_spawn()
    @spawn for i= 1:10
          println(i," running on ", threadid())
    end
end
naive_spawn()

1 running on 

Task (runnable) @0x00007f227c60ee60

1
2 running on 1
3 running on 1
4 running on 1
5 running on 1
6 running on 1
7 running on 1
8 running on 1
9 running on 1
10 running on 1


In [88]:
function task_spawn()
    for i = 1:10
        @spawn println(threadid())
    end
end
task_spawn()

1
2
2
2
2
2
2
2
2
1


In [73]:
function spawn_sum!(A::Vector{Float64}, lo, hi)
    if lo == hi
        return A[lo]
    end
    mid = (hi + lo + 1) >> 1
    leftsum = @spawn spawn_sum!(A, lo, mid-1)
    rigthsum = @spawn spawn_sum!(A, mid, hi)
    return fetch(leftsum) + fetch(rigthsum)
end

spawn_sum! (generic function with 1 method)

In [55]:
@btime spawn_sum!(a, 1, length(a))

  265.830 ms (1568799 allocations: 149.06 MiB)


49941.07260774379

In [29]:
@btime baseline_sum(a)

  129.404 μs (1 allocation: 16 bytes)


49941.07260774294

# Comparando @threads e @spawn

In [30]:
function nestedloops_base(nx, ny, nz)
   state = ones(nx,ny,nz)
   for k = 1:nz
      for j = 1:ny
         for i = 1:nx
            state[i,j,k] *= sin(i*j*k)
         end
      end
   end
   return
end

nestedloops_base (generic function with 1 method)

In [31]:
function nestedloops_threads_outer_loop(nx, ny, nz)
   state = ones(nx,ny,nz)
   Threads.@threads for k = 1:nz
      for j = 1:ny
         for i = 1:nx
            state[i,j,k] *= sin(i*j*k)
         end
      end
   end
   return
end

nestedloops_threads_outer_loop (generic function with 1 method)

In [32]:
function nestedloops_threads_inner_loop(nx, ny, nz)
   state = ones(nx,ny,nz)
   Threads.@threads for k = 1:nx
      for j = 1:ny
         for i = 1:nz
            state[i,j,k] *= sin(i*j*k)
         end
      end
   end
   return
end

nestedloops_threads_inner_loop (generic function with 1 method)

In [33]:
function nestedloops_threads_3_loops(nx, ny, nz)
   state = ones(nx,ny,nz)
   @threads for k = 1:nz
      @threads for j = 1:ny
         @threads for i = 1:nx
            state[i,j,k] *= sin(i*j*k)
         end
      end
   end
   return
end

nestedloops_threads_3_loops (generic function with 1 method)

In [36]:
function nestedloops_spawn_3_loops(nx, ny, nz)
   state = ones(nx,ny,nz)
   @sync @spawn for k = 1:nz
      @sync @spawn for j = 1:ny
         @sync @spawn for i = 1:nx
            state[i,j,k] *= sin(i*j*k)
         end
      end
   end
   #println(state[2,2,2])
   return
end

nestedloops_spawn_3_loops (generic function with 1 method)

In [37]:
function nestedloops_spawn_outer_loop(nx, ny, nz)
   state = ones(nx,ny,nz)
   @sync @spawn for k = 1:nz
      for j = 1:ny
         for i = 1:nx
            state[i,j,k] *= sin(i*j*k)
         end
      end
   end
   #println(state[2,2,2])
   return
end

nestedloops_spawn_outer_loop (generic function with 1 method)

In [39]:
nx, ny, nk = 200, 200, 200
nestedloops_base(nx, ny, nk)
nestedloops_threads_outer_loop(nx, ny, nk)
nestedloops_threads_inner_loop(nx, ny, nk)

println("base line:")
@time nestedloops_base(nx, ny, nk)
println("@threads on the outer loop:")
@time nestedloops_threads_outer_loop(nx, ny, nk)
println("@threads on the inner loop:")
@time nestedloops_threads_inner_loop(nx, ny, nk)
println("@threads 3 loops:")
@time nestedloops_threads_3_loops(nx, ny, nk)
println("@spawn 3 loops")
@time nestedloops_spawn_3_loops(nx, ny, nk)
println("@spawn on the outer loop")
@time nestedloops_spawn_outer_loop(nx, ny, nk)

base line:
  0.217147 seconds (2 allocations: 61.035 MiB)
@threads on the outer loop:
  0.145858 seconds (161 allocations: 61.046 MiB, 5.47% gc time)
@threads on the inner loop:
  0.141237 seconds (163 allocations: 61.046 MiB)
@threads 3 loops:
  0.308548 seconds (191.83 k allocations: 77.680 MiB, 2.57% gc time)
@spawn 3 loops
  0.223603 seconds (13 allocations: 61.036 MiB, 1.68% gc time)
@spawn on the outer loop
  0.221564 seconds (13 allocations: 61.036 MiB)
