# Table of Contents
 <p><div class="lev1 toc-item"><a href="#Parallel-Computing-in-Julia" data-toc-modified-id="Parallel-Computing-in-Julia-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Parallel Computing in Julia</a></div><div class="lev2 toc-item"><a href="#Start-Julia-with-multiple-workers/processes" data-toc-modified-id="Start-Julia-with-multiple-workers/processes-11"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Start Julia with multiple workers/processes</a></div><div class="lev2 toc-item"><a href="#addprocs(),--rmprocs(),-and-@everywhere" data-toc-modified-id="addprocs(),--rmprocs(),-and-@everywhere-12"><span class="toc-item-num">1.2&nbsp;&nbsp;</span><code>addprocs()</code>,  <code>rmprocs()</code>, and <code>@everywhere</code></a></div><div class="lev2 toc-item"><a href="#remotecall(),-@spawn" data-toc-modified-id="remotecall(),-@spawn-13"><span class="toc-item-num">1.3&nbsp;&nbsp;</span><code>remotecall()</code>, <code>@spawn</code></a></div><div class="lev2 toc-item"><a href="#Running-a-function-everywhere" data-toc-modified-id="Running-a-function-everywhere-14"><span class="toc-item-num">1.4&nbsp;&nbsp;</span>Running a function everywhere</a></div><div class="lev2 toc-item"><a href="#@parallel-and-pmap()" data-toc-modified-id="@parallel-and-pmap()-15"><span class="toc-item-num">1.5&nbsp;&nbsp;</span><code>@parallel</code> and <code>pmap()</code></a></div><div class="lev2 toc-item"><a href="#Benchmark:-find-pi" data-toc-modified-id="Benchmark:-find-pi-16"><span class="toc-item-num">1.6&nbsp;&nbsp;</span>Benchmark: find pi</a></div><div class="lev2 toc-item"><a href="#Using-pmap()-to-run-a-serial-program-on-multiple-processors-with-different-arguments-to-the-function" data-toc-modified-id="Using-pmap()-to-run-a-serial-program-on-multiple-processors-with-different-arguments-to-the-function-17"><span class="toc-item-num">1.7&nbsp;&nbsp;</span>Using <code>pmap()</code> to run a serial program on multiple processors with different arguments to the function</a></div><div 
class="lev2 toc-item"><a href="#When-to-use-pmap()" data-toc-modified-id="When-to-use-pmap()-18"><span class="toc-item-num">1.8&nbsp;&nbsp;</span>When to use <code>pmap()</code></a></div><div class="lev2 toc-item"><a href="#Shared-arrays" data-toc-modified-id="Shared-arrays-19"><span class="toc-item-num">1.9&nbsp;&nbsp;</span>Shared arrays</a></div><div class="lev2 toc-item"><a href="#Parallel-reduction" data-toc-modified-id="Parallel-reduction-110"><span class="toc-item-num">1.10&nbsp;&nbsp;</span>Parallel reduction</a></div><div class="lev2 toc-item"><a href="#Distributed-arrays" data-toc-modified-id="Distributed-arrays-111"><span class="toc-item-num">1.11&nbsp;&nbsp;</span>Distributed arrays</a></div>

# Parallel Computing in Julia

This lecture goes over some fundamentals of parallel computing in Julia.

Machine information:

In [1]:
versioninfo()

Julia Version 0.6.4
Commit 9d11f62bcb (2018-07-09 19:09 UTC)
Platform Info:
  OS: macOS (x86_64-apple-darwin14.5.0)
  CPU: Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
  WORD_SIZE: 64
  BLAS: libopenblas (USE64BITINT DYNAMIC_ARCH NO_AFFINITY Haswell MAX_THREADS=16)
  LAPACK: libopenblas64_
  LIBM: libopenlibm
  LLVM: libLLVM-3.9.1 (ORCJIT, skylake)


## Start Julia with multiple workers/processes

The command
```bash
julia -p 4
```
will start Julia with 4 worker processes on the local machine, in addition to the master process (process 1).

On a cluster, we can, for instance, create a file named `$HOME/machinefile2servers` that lists the server names, one line per worker process. To run two processes per server, repeat each server name twice:
```
server1
server1
server2
server2
```
Then the command
```bash
julia --machinefile $HOME/machinefile2servers
```
will start a job with 4 worker processes, 2 on server1 and 2 on server2, in addition to the master process on the machine where the command is run.
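Inside a running session, `addprocs(4)` has the same effect as starting with `julia -p 4`. The sketch below assumes Julia 1.x rather than the 0.6.4 used in this lecture. In 1.x the distributed-computing functions live in the `Distributed` standard library and must be loaded with `using Distributed`. The cluster variant appears only as a comment because it needs passwordless SSH to the hypothetical hosts `server1` and `server2`.

```julia
using Distributed   # Julia 1.x: addprocs, workers, nprocs, ... live here

# Same effect as starting the session with `julia -p 4`
new_ids = addprocs(4)

println("master id:  ", myid())        # the master process always has id 1
println("worker ids: ", workers())
println("nprocs = ", nprocs(), ", nworkers = ", nworkers())

# Cluster analogue of the machinefile above (requires passwordless SSH
# to the hypothetical hosts server1/server2, so it is not executed here):
# addprocs([("server1", 2), ("server2", 2)])
```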

## `addprocs()`,  `rmprocs()`, and `@everywhere`

In [2]:
run(`hostname`)

BS-HUAZHOU-LAP.local


In [3]:
workers()

1-element Array{Int64,1}:
 1

In [4]:
print(myid())

1

In [5]:
@everywhere print(myid())

1

In [6]:
addprocs(3)

3-element Array{Int64,1}:
 2
 3
 4

In [7]:
@everywhere println(myid())

1
	From worker 3:	3
	From worker 2:	2
	From worker 4:	4


In [8]:
@everywhere run(`hostname`)

BS-HUAZHOU-LAP.local
	From worker 2:	BS-HUAZHOU-LAP.local
	From worker 3:	BS-HUAZHOU-LAP.local
	From worker 4:	BS-HUAZHOU-LAP.local


In [10]:
rmprocs(2) # remove worker 2

Task (queued) @0x0000000128587850

In [11]:
@show nworkers(), nprocs(), Sys.CPU_CORES;

(nworkers(), nprocs(), Sys.CPU_CORES) = (2, 3, 8)


In [12]:
?nworkers

```
nworkers()
```

Get the number of available worker processes. This is one less than `nprocs()`. Equal to `nprocs()` if `nprocs() == 1`.



In [13]:
?nprocs

```
nprocs()
```

Get the number of available processes.
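The relationship `nprocs() == nworkers() + 1` holds whenever at least one worker exists, and you can check it directly. Below is a minimal sketch, again assuming Julia 1.x. There, `rmprocs` by default waits for the removed worker to exit, a behavior controlled by its `waitfor` keyword.

```julia
using Distributed

nprocs() == 1 && addprocs(3)    # make sure there are workers to remove

before = nworkers()
rmprocs(last(workers()))        # remove the last worker in workers() and wait for it to exit
after = nworkers()

println("workers before/after: ", (before, after))
println("nprocs == nworkers + 1: ", nprocs() == nworkers() + 1)
```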



## `remotecall()`, `@spawn`

In [14]:
@everywhere println(myid())

1
	From worker 3:	3
	From worker 4:	4


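`remotecall(f, id, args...)` calls the function `f` (the function itself, not its name) with arguments `args...` on the process with ID `id`. It returns a `Future` immediately, while the computation runs asynchronously on that process.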

In [15]:
# rand(4, 4) on worker 3
r = remotecall(rand, 3, 4, 4) # returns a Future immediately; the work runs asynchronously

Future(3, 1, 13, Nullable{Any}())

`fetch(r)` waits until the remote computation has finished and copies its value from worker 3 to the master process.

In [17]:
# bring to master
fetch(r)

4×4 Array{Float64,2}:
 0.337651  0.253399  0.815344   0.169948
 0.338217  0.972971  0.204617   0.277488
 0.319419  0.716606  0.130429   0.734783
 0.261039  0.602682  0.0303147  0.887243

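`remotecall_fetch` combines `remotecall` and `fetch`. Here it evaluates `getindex(r, 1, 1)` on worker 3 and returns entry (1, 1) of the matrix to the master: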

In [18]:
remotecall_fetch(getindex, 3, r, 1, 1)

0.3376511424739894
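Here is the same pattern as a minimal sketch, assuming Julia 1.x. `remotecall` returns a `Future` immediately and `fetch` copies the whole result back. `remotecall_fetch` can instead run a small function next to the data, so only one number travels to the master. The anonymous function used for that step is an illustrative choice. It runs before the master's `fetch(r)`, so at that point the value still lives on worker `w`.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

w = first(workers())                        # a concrete worker id

r = remotecall(rand, w, 4, 4)               # starts rand(4, 4) on worker w, returns a Future

# Index on worker w, where the value of r is stored, and send back only entry (1, 1)
a11 = remotecall_fetch(f -> fetch(f)[1, 1], w, r)

A = fetch(r)                                # waits for the result and copies the 4×4 matrix here

println(size(A), "  ", a11 == A[1, 1])
```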

`@spawn` is similar to `remotecall`, except that it takes an expression rather than a function and Julia chooses the worker automatically.

In [19]:
r = @spawn rand(2, 2)

Future(3, 1, 16, Nullable{Any}())

In [20]:
fetch(r)

2×2 Array{Float64,2}:
 0.238268  0.471547
 0.990627  0.282843

`@spawnat` lets us choose the process that evaluates the expression. Here we add 1 to each entry of the value of `r` on worker 3.

In [21]:
s = @spawnat 3 1 .+ fetch(r)

Future(3, 1, 18, Nullable{Any}())
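Below is a sketch of the same idea, assuming Julia 1.x. In 1.x, `@spawnat :any expr` lets Julia choose the worker, as `@spawn` does above. Both steps run on the same worker `w`, so the `fetch(r)` inside the second expression reads a value that already lives on that worker.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

w = last(workers())

r = @spawnat w rand(2, 2)        # evaluate rand(2, 2) on worker w
s = @spawnat w 1 .+ fetch(r)     # also on w, where the value of r already lives

R = fetch(r)
S = fetch(s)
println(S == R .+ 1)
```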

In [22]:
@everywhere p = 5  # forces the assignment of p = 5 on all processors

In [23]:
@everywhere println(@sprintf("ID %d: %f %d", myid(), rand(), p))

ID 1: 0.713622 5
	From worker 3:	ID 3: 0.681542 5
	From worker 4:	ID 4: 0.183807 5
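`@everywhere` evaluates its expression separately in every process, so each process ends up with its own global variable. In Julia 1.x (an assumption relative to this 0.6 session), `$` interpolates a value from the master into the expression, as in the sketch below. The globals are read back by evaluating their symbols in `Main` on each process with `Core.eval`.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

x = 5
@everywhere p = $x           # `$x` is replaced by the master's value of x before shipping
@everywhere q = myid()       # each process evaluates myid() itself

# Read each process's globals back
ps = [remotecall_fetch(Core.eval, w, Main, :p) for w in procs()]
qs = [remotecall_fetch(Core.eval, w, Main, :q) for w in procs()]
println(ps, "  ", qs)
```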


In [24]:
@everywhere run(`whoami`)

huazhou
	From worker 4:	huazhou
	From worker 3:	huazhou


In [25]:
@everywhere run(`hostname`)

	From worker 4:	BS-HUAZHOU-LAP.local
	From worker 3:	BS-HUAZHOU-LAP.local
BS-HUAZHOU-LAP.local


## Running a function everywhere

Let's define a function.

In [26]:
# purposefully left out @everywhere
function count_heads(n)
    println("My process id is $(myid())")
    c::Int = 0
    for i in 1:n
        c += rand(Bool)
    end
    c
end

count_heads (generic function with 1 method)

In [27]:
a = @spawn count_heads(100000000)

Future(4, 1, 27, Nullable{Any}())

In [28]:
b = @spawn count_heads(100000000)

Future(3, 1, 28, Nullable{Any}())

In [29]:
fetch(a) + fetch(b)

LoadError: On worker 4:
UndefVarError: #count_heads not defined
deserialize_datatype at ./serialize.jl:973
handle_deserialize at ./serialize.jl:677
deserialize at ./serialize.jl:637
handle_deserialize at ./serialize.jl:684
deserialize_global_from_main at ./distributed/clusterserialize.jl:154
foreach at ./abstractarray.jl:1733
deserialize at ./distributed/clusterserialize.jl:56
handle_deserialize at ./serialize.jl:726
deserialize at ./serialize.jl:637
handle_deserialize at ./serialize.jl:681
deserialize at ./serialize.jl:637
handle_deserialize at ./serialize.jl:684
deserialize_msg at ./distributed/messages.jl:98
message_handler_loop at ./distributed/process_messages.jl:161
process_tcp_streams at ./distributed/process_messages.jl:118
#99 at ./event.jl:73
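The error arises because `count_heads` was defined only on the master process, so the workers receive a call to a function they have never seen. Defining it with `@everywhere` makes it available on every process. Below is a sketch of the fix, assuming Julia 1.x and using a smaller `n` so it runs quickly.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# Define the function on the master *and* on every worker
@everywhere function count_heads(n)
    c = 0
    for _ in 1:n
        c += rand(Bool)   # adds 1 for heads, 0 for tails
    end
    return c
end

n = 1_000_000
a = @spawnat first(workers()) count_heads(n)
b = @spawnat last(workers()) count_heads(n)
total = fetch(a) + fetch(b)

println("fraction of heads: ", total / (2n))   # should be close to 0.5
```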

## `@parallel` and `pmap()`

In [30]:
@everywhere begin
    function parallel_func(idx)
        workernum = myid() - 1 
        sleep(workernum) # sleep for workernum seconds
        println("job $idx")
    end
end

In [31]:
# @parallel assigns each worker an equal, contiguous share of the iterations up front
@parallel for idx in 1:12
    parallel_func(idx)
end

2-element Array{Future,1}:
 Future(4, 1, 32, #NULL)
 Future(3, 1, 33, #NULL)

	From worker 3:	job 7
	From worker 4:	job 1
	From worker 3:	job 8
	From worker 3:	job 9
	From worker 4:	job 2
	From worker 3:	job 10
	From worker 4:	job 3
	From worker 3:	job 11
	From worker 3:	job 12
	From worker 4:	job 4
	From worker 4:	job 5
	From worker 4:	job 6


In [32]:
# pmap hands out jobs one at a time, so the faster worker ends up doing more of them
pmap(parallel_func, 1:12)

	From worker 3:	job 2
	From worker 4:	job 1
	From worker 3:	job 4
	From worker 4:	job 5
	From worker 3:	job 6
	From worker 3:	job 8
	From worker 4:	job 7
	From worker 3:	job 9
	From worker 4:	job 10
	From worker 3:	job 11
	From worker 3:	job 3


12-element Array{Void,1}:
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing
 nothing

	From worker 4:	job 12
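You can make the difference in scheduling visible by returning the worker id from each job. The sketch below assumes Julia 1.x, where `@parallel` was renamed `@distributed`. The hypothetical `slow_job` makes higher-numbered workers slower. `@distributed` still gives each worker a contiguous chunk of `1:12` decided up front. `pmap` instead hands the next job to whichever worker is free, so the faster worker usually ends up with more jobs.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# Hypothetical job: sleeps longer on higher-numbered workers, returns who ran it
@everywhere function slow_job(idx)
    sleep(0.05 * (myid() - 1))
    return myid()
end

# @distributed with a reducer: chunks assigned up front, results gathered by vcat
static_ids = @distributed vcat for idx in 1:12
    slow_job(idx)
end

# pmap: dynamic scheduling, each job goes to whichever worker is idle
dynamic_ids = pmap(slow_job, 1:12)

jobs_per_worker(ids) = Dict(w => count(x -> x == w, ids) for w in unique(ids))
println("@distributed: ", jobs_per_worker(static_ids))
println("pmap:         ", jobs_per_worker(dynamic_ids))
```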


## Benchmark: find pi

In [33]:
function findpi(n)
    inside = 0
    for i in 1:n
        x, y = rand(), rand()
        if x^2 + y^2 <= 1
            inside += 1
        end
    end
    4 * inside / n
end

findpi(10) # compile
@time findpi(1_000_000_000)

3.141656276

  7.068100 seconds (84 allocations: 6.107 KiB)


In [34]:
function parallel_findpi(n)
    inside = @parallel (+) for i in 1:n
        x, y = rand(), rand()
        x^2 + y^2 <= 1 ? 1 : 0
    end
    4 * inside / n
end

@time parallel_findpi(1_000_000_000)

3.141606868

  2.052556 seconds (104.31 k allocations: 5.497 MiB, 0.39% gc time)
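In `parallel_findpi`, each worker sums the loop body over its own chunk of `1:n`, and the master then adds the partial sums. Below is a Julia 1.x version of the same idea. It assumes `@distributed` in place of `@parallel` and uses a smaller `n` so the sketch runs quickly.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

function parallel_findpi(n)
    inside = @distributed (+) for i in 1:n
        x, y = rand(), rand()
        Int(x^2 + y^2 <= 1)       # 1 if the point falls inside the quarter circle
    end
    return 4 * inside / n
end

parallel_findpi(10)               # warm-up run to compile on all processes
est = @time parallel_findpi(10_000_000)
println(est)
```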


## Using `pmap()` to run a serial program on multiple processors with different arguments to the function

In [35]:
x_value = [3, 4, 5, 6]
y_value = [4, 5, 6, 7]

# named my_hypot so that it does not shadow Base.hypot
@everywhere function my_hypot(x, y)
    println("My process id is $(myid())")
    x, y = abs(x), abs(y)
    if x > y
        r = y / x
        return x * sqrt(1 + r * r)
    end
    if y == 0
        return zero(x)
    end
    r = x / y
    return y * sqrt(1 + r * r)
end

info("Serial")
Results = map(my_hypot, x_value, y_value)
println(Results)

info("Parallel")
Results = pmap(my_hypot, x_value, y_value)
println(Results)

INFO: Serial
My process id is 1
My process id is 1
My process id is 1
My process id is 1
[5.0, 6.40312, 7.81025, 9.21954]
INFO: Parallel
	From worker 3:	My process id is 3
	From worker 3:	My process id is 3
	From worker 3:	My process id is 3
	From worker 4:	My process id is 4
[5.0, 6.40312, 7.81025, 9.21954]
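Below is a Julia 1.x sketch of the same two-argument call (an assumption relative to this 0.6 session). Like `map`, `pmap` applies the function to the pairs `(xs[i], ys[i])`. A Base function such as `hypot` already exists on every process. A user-defined function, here the hypothetical `my_hypot` with the plain formula √(x² + y²), must first be defined everywhere.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

xs = [3, 4, 5, 6]
ys = [4, 5, 6, 7]

# Base.hypot is available on every process already, so no @everywhere is needed
serial   = map(hypot, xs, ys)
parallel = pmap(hypot, xs, ys)    # applied to the pairs (xs[i], ys[i]), like map

# A user-defined function must be defined on all processes first
@everywhere my_hypot(x, y) = sqrt(x^2 + y^2)   # hypothetical helper, plain formula
parallel2 = pmap(my_hypot, xs, ys)

println(parallel)
```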


## When to use `pmap()`

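This example shows when `pmap()` pays off. Every element handed to `pmap()` must be sent to a worker and its result sent back. If the function does little work per element, this communication overhead dominates and the serial `map()` is faster.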

In [36]:
@everywhere function NotMuchToDo(x::Int64)
    return x^2 + x + 1.0
end

@everywhere function LotToDo(x::Int64)
    a = 1.0
    for i in 1:1000
        for j in 1:5000
            a += asinh(i + j) + acosh(i + j)
        end
    end
    return a
end

In [37]:
info("Precompilation")
map(NotMuchToDo, 1:1000)
pmap(NotMuchToDo, 1:1000)
map(LotToDo, 1:100)
pmap(LotToDo, 1:100)
info("Timing LotToDo function")
@time map(LotToDo, 1:100)
@time pmap(LotToDo, 1:100)
info("Timing NotMuchToDo function")
@time map(NotMuchToDo, 1:1000)
@time pmap(NotMuchToDo, 1:1000);

INFO: Precompilation
INFO: Timing LotToDo function
 23.554665 seconds (9 allocations: 1.156 KiB)
 11.983971 seconds (9.47 k allocations: 326.984 KiB)
INFO: Timing NotMuchToDo function
  0.000015 seconds (9 allocations: 8.219 KiB)
  0.084692 seconds (92.21 k allocations: 2.765 MiB)
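When each call is cheap, most of `pmap`'s time goes into sending one message per element. Its `batch_size` keyword groups several elements into one message, which can reduce that overhead. The sketch below assumes Julia 1.x, and `not_much_to_do` is a renamed copy of `NotMuchToDo`. Timings depend on the machine, so only the results are checked.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

@everywhere not_much_to_do(x::Int) = x^2 + x + 1.0

xs = 1:10_000

r_map     = map(not_much_to_do, xs)
r_pmap    = pmap(not_much_to_do, xs)                      # one element per message
r_batched = pmap(not_much_to_do, xs; batch_size = 1_000)  # 1000 elements per message

# Second runs, after compilation, for a rough timing comparison
t_pmap    = @elapsed pmap(not_much_to_do, xs)
t_batched = @elapsed pmap(not_much_to_do, xs; batch_size = 1_000)
println("pmap: ", t_pmap, " s, batched pmap: ", t_batched, " s")
```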


## Shared arrays

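Updating an ordinary array inside `@parallel` leaves the master's `a` full of zeros. Each worker receives its own copy of `a` and modifies that copy, not the array in the master's memory. Also, without `@sync` the loop returns before the workers have finished.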

In [38]:
a = zeros(10)
@parallel for i in 1:10
    a[i] = i
end
a

10-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

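A `SharedArray` instead maps the same memory into every process on the machine, so the workers' writes are visible to the master. Adding `@sync` makes the master wait for the loop to finish before displaying `a`.

In [39]:
a = SharedArray{Float64}(10)
@sync @parallel for i in 1:10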
    a[i] = i
end
a

10-element SharedArray{Float64,1}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0

In [40]:
println(nprocs())

@everywhere function parallel_func(idx)
    println("job $idx")
    a = idx^2 + idx - 1
    return a
end

result = SharedArray{Int64}(12)
for idx in 1:12
    result[idx] = 0
end

info("Calling @parallel")
@sync @parallel for idx in 1:12
    result[idx] = parallel_func(idx)
end

for idx in 1:12
    print(result[idx], ' ')
end
println(" ");

info("Calling pmap()")
result = pmap(parallel_func, 1:12)
println(result)

3
INFO: Calling @parallel
	From worker 4:	job 1
	From worker 4:	job 2
	From worker 4:	job 3
	From worker 4:	job 4
	From worker 4:	job 5
	From worker 4:	job 6
	From worker 3:	job 7
	From worker 3:	job 8
	From worker 3:	job 9
	From worker 3:	job 10
	From worker 3:	job 11
	From worker 3:	job 12
1 5 11 19 29 41 55 71 89 109 131 155  
INFO: Calling pmap()
	From worker 4:	job 1
	From worker 3:	job 2
	From worker 4:	job 3
	From worker 3:	job 4
	From worker 4:	job 5
	From worker 3:	job 6
	From worker 4:	job 7
	From worker 3:	job 8
	From worker 4:	job 9
	From worker 3:	job 10
	From worker 4:	job 11
	From worker 3:	job 12
[1, 5, 11, 19, 29, 41, 55, 71, 89, 109, 131, 155]


## Parallel reduction

In [41]:
@everywhere f(x) = x^2 + 1
a = randn(1000)
@parallel (+) for i in 1:100000
    f(a[rand(1:end)])
end

193007.31449180908
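`@parallel (op) for` evaluates the loop body on the workers and combines each worker's values with `op`. It then combines the per-worker results on the master, so `op` should be associative. Any two-argument function works as the reducer, not just `+`. Below is a Julia 1.x sketch (with `@distributed` in place of `@parallel`) that uses both `+` and `max`.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

@everywhere f(x) = x^2 + 1
v = randn(1000)                   # copied to each worker along with the loop body

# Sum of f over 100_000 randomly chosen entries of v
total = @distributed (+) for i in 1:100_000
    f(v[rand(1:end)])
end

# Largest value of f over all entries of v
largest = @distributed max for i in 1:length(v)
    f(v[i])
end

println((total, largest))
```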

## Distributed arrays

In [42]:
@everywhere using DistributedArrays

dzeros(2, 2, 4)
dones(1, 100)
drand(2, 2, 4)
drandn(2, 2, 4)
dfill(2, 2, 4)

x = @DArray [@show x^2 for x in 1:10];

	From worker 3:	x ^ 2 = 1
	From worker 3:	x ^ 2 = 4
	From worker 3:	x ^ 2 = 9
	From worker 3:	x ^ 2 = 16
	From worker 3:	x ^ 2 = 25
	From worker 4:	x ^ 2 = 36
	From worker 4:	x ^ 2 = 49
	From worker 4:	x ^ 2 = 64
	From worker 4:	x ^ 2 = 81
	From worker 4:	x ^ 2 = 100


In [43]:
arr = rand(8, 8)
dist_arr = distribute(arr)

@show remotecall_fetch(localpart, 1, dist_arr)
@show remotecall_fetch(localpart, 3, dist_arr)
@show remotecall_fetch(localpart, 4, dist_arr);

remotecall_fetch(localpart, 1, dist_arr) = Array{Float64}(0,0)
remotecall_fetch(localpart, 3, dist_arr) = [0.0980812 0.284683 0.566077 0.596142; 0.459033 0.229338 0.546003 0.690822; 0.210434 0.651551 0.0159821 0.331547; 0.281235 0.175543 0.669213 0.18372; 0.470449 0.358349 0.0383332 0.0737577; 0.448361 0.76366 0.078924 0.394335; 0.88439 0.247884 0.218447 0.740899; 0.590145 0.424563 0.167459 0.931794]
remotecall_fetch(localpart, 4, dist_arr) = [0.373489 0.863888 0.0439293 0.418687; 0.517911 0.949794 0.924607 0.215549; 0.921827 0.853144 0.57837 0.37949; 0.266037 0.96486 0.268734 0.22419; 0.547293 0.142171 0.458552 0.348344; 0.793807 0.65928 0.995266 0.0301769; 0.133989 0.0444707 0.852576 0.939462; 0.326645 0.00111411 0.313047 0.324609]


In [44]:
@show remotecall_fetch(localindexes, 1, dist_arr)
@show remotecall_fetch(localindexes, 3, dist_arr)
@show remotecall_fetch(localindexes, 4, dist_arr);

remotecall_fetch(localindexes, 1, dist_arr) = (1:0, 1:0)
remotecall_fetch(localindexes, 3, dist_arr) = (1:8, 1:4)
remotecall_fetch(localindexes, 4, dist_arr) = (1:8, 5:8)
