In [1]:
using Distributed, BenchmarkTools
addprocs(4)

4-element Array{Int64,1}:
 2
 3
 4
 5

In [2]:
Threads.nthreads()

4

### Ways of Sharing Data in Julia 
* distributed array dArray ( each process has its own chunk .. concrete array only)
* shared array (each process has access to all data in a concrete array)
* remote channel( channels where processes can read and write arrays of composite types or custom mutable structs
* remote object ( object on distributed processes)

### Ways of Doing Parallel Computations in Julia 
* @distributed for loop + shared array ( geared towards fast small calculations)
* pmap slower longer computatation
* @async remote call ( pmap is wrapper around remote task)
* multithreading Threads.@threads 
* @threadcall calling c code on a separate uv loop

https://codingclubuc3m.github.io/2018-06-06-Parallel-computing-Julia.html
https://docs.julialang.org/en/v1/manual/parallel-computing/index.html


In [3]:
my_array = rand(1000, 100)

1000×100 Array{Float64,2}:
 0.428827   0.450482   0.426665   …  0.432342   0.987073   0.6307  
 0.718034   0.525038   0.272792      0.448928   0.95974    0.648596
 0.0542647  0.285266   0.111917      0.301554   0.987424   0.646695
 0.20483    0.968826   0.0742951     0.77281    0.0647721  0.264271
 0.348856   0.448854   0.124551      0.250567   0.981254   0.153445
 0.274881   0.307092   0.927949   …  0.926867   0.885073   0.397878
 0.962869   0.771716   0.539125      0.897364   0.72481    0.793081
 0.871436   0.951466   0.942482      0.252878   0.326462   0.913954
 0.672501   0.504633   0.490393      0.188507   0.787239   0.231216
 0.657875   0.823363   0.698378      0.306274   0.544435   0.370133
 0.632989   0.698069   0.744494   …  0.104424   0.802693   0.119264
 0.83462    0.679551   0.783996      0.139717   0.62985    0.441126
 0.833116   0.803096   0.276715      0.860222   0.094476   0.837948
 ⋮                                ⋱                                
 0.190977   0.0275971

In [3]:
using SharedArrays
# my_shared_array = SharedArray{Float64}(my_array)
# my_shared_array = SharedArray{Float64}( rand(9, 4))

In [5]:
size(my_shared_array)[2]

4

In [6]:
# using Distributed, BenchmarkTools

In [7]:
# @btime
function f()
    my_shared_array = SharedArray{Float64}( rand(1000, 4))
    for i = 1:size(my_shared_array)[1]
        for j in 2:size(my_shared_array)[2]
            my_shared_array[i,j] = my_shared_array[i,j] * my_shared_array[i,(j -1)]
        end
    end
    @everywhere GC.gc()
end 
@btime f()

  81.404 ms (888 allocations: 66.39 KiB)


In [8]:
function f()
    my_shared_array = Array{Float64}( rand(1000, 4))
    for i = 1:size(my_shared_array)[1]
        for j in 2:size(my_shared_array)[2]
            my_shared_array[i,j] = my_shared_array[i,j] * my_shared_array[i,(j -1)]
        end
    end
    @everywhere GC.gc()
end 
@btime f()

  78.563 ms (408 allocations: 80.31 KiB)


In [12]:
function f()
    my_shared_array = Array{Float64}( rand(1000, 4))
    for i = 1:size(my_shared_array)[1]
        for j in 2:size(my_shared_array)[2]
            my_shared_array[i,j] = my_shared_array[i,j] * my_shared_array[i,(j -1)]
        end
    end
#     @everywhere GC.gc()
end 
@btime f()
@time f()

  7.411 μs (4 allocations: 62.66 KiB)
  0.000020 seconds (9 allocations: 62.891 KiB)


In [10]:
function f()
    my_shared_array = SharedArray{Float64}( rand(1000, 4))
    @distributed for i = 1:size(my_shared_array)[1]
        for j in 2:size(my_shared_array)[2]
            my_shared_array[i,j] = my_shared_array[i,j] * my_shared_array[i,(j -1)]
        end
    end
#     @everywhere GC.gc()
end 
@time f()

  0.023385 seconds (42.23 k allocations: 2.169 MiB)


Task (runnable) @0x0000000116a2c6d0

In [6]:
# @btime
function f()
    my_shared_array = SharedArray{Float64}( rand(1000, 4))
    @distributed for i = 1:size(my_shared_array)[1]
        for j in 2:size(my_shared_array)[2]
            my_shared_array[i,j] = my_shared_array[i,j] * my_shared_array[i,(j -1)]
        end
    end
    @everywhere GC.gc()
end 
@btime f()

  111.488 ms (1501 allocations: 94.72 KiB)


In [16]:
function f()
    my_shared_array = Array{Float64}( rand(1000, 4))
    Threads.@threads for i = 1:size(my_shared_array)[1]
        for j in 2:size(my_shared_array)[2]
            my_shared_array[i,j] = my_shared_array[i,j] * my_shared_array[i,(j -1)]
        end
    end
end
@btime f()

  6.301 μs (5 allocations: 62.69 KiB)


In [15]:
function f()
    my_shared_array = SharedArray{Float64}( rand(1000, 4))

    Threads.@threads for i = 1:size(my_shared_array)[1]
        for j in 2:size(my_shared_array)[2]
            my_shared_array[i,j] = my_shared_array[i,j] * my_shared_array[i,(j -1)]
        end
    end
    @everywhere GC.gc()
end
@btime f()
# Note if not using a predefined array you will need to use the atomic tab to prevent race conditions

  77.962 ms (887 allocations: 66.78 KiB)


In [None]:
using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end

a = randn(1000)
@distributed (+) for i = 1:100000
    f(a[rand(1:end)])
end

In [12]:
M = Matrix{Float64}[rand(1000,1000) for i = 1:10]
pmap(svdvals, M)

UndefVarError: UndefVarError: svdvals not defined

In [13]:
# addprocs(4) # add worker processes

const jobs = RemoteChannel(()->Channel{Int}(32))

const results = RemoteChannel(()->Channel{Tuple}(32))

@everywhere function do_work(jobs, results) # define work function everywhere
           while true
               job_id = take!(jobs)
               exec_time = rand()
               sleep(exec_time) # simulates elapsed time doing actual work
               put!(results, (job_id, exec_time, myid()))
           end
       end

function make_jobs(n)
   for i in 1:n
       put!(jobs, i)
   end
end

n = 12

@async make_jobs(n); # feed the 

## False Sharing / Numa 
* https://medium.com/@genchilu/whats-false-sharing-and-how-to-solve-it-using-golang-as-example-ef978a305e10
* https://discourse.julialang.org/t/two-questions-about-multithreading/14564/4'
* https://en.wikipedia.org/wiki/Non-uniform_memory_access
* https://github.com/JuliaBLAS/SIMDArrays.jl
solution is to either pad by cache line size or preferably use thread local variables 

In [14]:
function getrange(n)
    tid = Threads.threadid()
    nt = Threads.nthreads()
    d , r = divrem(n, nt)
    from = (tid - 1) * d + min(r, tid - 1) + 1
    to = from + d - 1 + (tid ≤ r ? 1 : 0)
    from:to
end

function good_f()
    test_size = 10^8
    a = zeros(Threads.nthreads())
    b = rand(test_size)
    calls = zeros(Threads.nthreads())
    Threads.@threads for k = 1 : Threads.nthreads()
        local_a = 0.0
        local_c = 0.0
        for i in getrange(test_size)
            for j in 1:10
                local_a += b[i]
                local_c += 1
            end
        end
        a[Threads.threadid()] = local_a
        calls[Threads.threadid()] = local_c
    end
    a, calls
end

f (generic function with 1 method)

In [15]:
 @btime good_f()

  574.041 ms (6 allocations: 762.94 MiB)


([1.25008e8, 1.2501e8, 1.25007e8, 1.24984e8], [2.5e8, 2.5e8, 2.5e8, 2.5e8])

In [16]:
function bad_f()
    test_size = 10^8
    a = zeros(Threads.nthreads()*100)
    b = rand(test_size)
    calls = zeros(Threads.nthreads()*100)
    Threads.@threads for i = 1 : test_size
        for j in 1:10
            a[Threads.threadid()*100] += b[i]
            calls[Threads.threadid()*100] += 1
        end
    end
    a, calls
end

f (generic function with 1 method)

In [17]:
@btime bad_f()

  900.799 ms (6 allocations: 762.95 MiB)


([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0  …  0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.25013e8], [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0  …  0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.5e8])

In [18]:
Threads.nthreads()

4

## simd 
https://github.com/eschnett/SIMD.jl
http://kristofferc.github.io/post/intrinsics/
https://discourse.julialang.org/t/how-to-improve-performance-of-simple-averaging-function/23303/4
https://docs.julialang.org/en/v1.0.0/manual/performance-tips/#man-performance-annotations-1
@inbounds/ @simd / @fastmath


In [19]:
using Statistics, BenchmarkTools

input = 100*rand(1000)
output = zeros(1000)
output1 = zeros(1000)

function my_function!( output::Array{Float64,1} , input::Array{Float64,1} , n::Int64 )
		@views @fastmath @inbounds for i=n:length(input)
			output[i] = Statistics.mean(input[i-n+1:i])
		end
	return output
end

function my_function1!( output::AbstractVector, input::AbstractVector , n::Integer )

		@boundscheck checkbounds(input, 1:n)
		@boundscheck checkbounds(output, n:length(input))

		# this puts in output[n] the value mean(input[1:n])
		s = zero(eltype(input))
		@inbounds @simd for i=1:n
			s += input[i]
		end
		output[n] = s/n
		
		# this puts in output[i] the value mean(input[i-n+1:i])
		@fastmath @inbounds for i=n:length(input)-1
			output[i+1] = output[i] - (input[i-n+1]-input[i+1])/n
		end
		
	return output
end


@btime output = my_function!( $output , $input , 300 )
@btime output1 = my_function1!( $output1 , $input , 300 )

maximum(abs.(output-output1))

  24.620 μs (701 allocations: 32.86 KiB)
  703.949 ns (0 allocations: 0 bytes)


5.684341886080802e-14