In [1]:
versioninfo()

Julia Version 1.8.5
Commit 17cfb8e65ea (2023-01-08 06:45 UTC)
Platform Info:
  OS: Linux (x86_64-linux-gnu)
  CPU: 12 × Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-13.0.1 (ORCJIT, skylake)
  Threads: 1 on 12 virtual cores


## 9-3. マルチプロセス

### 9-3-1. Julia のマルチプロセシング

#### コード9-52. プロセス数・ワーカープロセスの確認と追加(1)

In [2]:
using Distributed

In [3]:
nprocs()

1

In [4]:
workers()

1-element Vector{Int64}:
 1

In [5]:
addprocs(4)

4-element Vector{Int64}:
 2
 3
 4
 5

In [6]:
nprocs()

5

In [7]:
workers()

4-element Vector{Int64}:
 2
 3
 4
 5

#### コード9-53. プロセス数・ワーカープロセスの確認と追加(2)

```julia
julia> # 起動時に `julia -t 4 -p 4` としていた場合

julia> Threads.nthreads()
4

julia> using Distributed

julia> nprocs()
5

julia> workers()
4-element Vector{Int64}:
 2
 3
 4
 5
```

### 9-3-2. マルチプロセスの基本

#### `@everywhere`

#### コード9-54. `@everywhere` の使用例

In [8]:
using Distributed

In [9]:
@everywhere using Random

In [10]:
# Naive doubly-recursive Fibonacci number, used throughout this section as a
# deliberately CPU-heavy workload. `@everywhere` defines it on every process
# so that worker processes can call it too.
# Inputs `n ≤ 1` are returned unchanged.
@everywhere function fib(n)
    n ≤ 1 && return n
    return fib(n - 2) + fib(n - 1)
end

#### `remotecall()`

#### コード9-55. `remotecall` の使用例

In [11]:
workers()

4-element Vector{Int64}:
 2
 3
 4
 5

In [12]:
future = remotecall(randperm, 2, 10)

Future(2, 1, 22, ReentrantLock(nothing, 0x00000000, 0x00, Base.GenericCondition{Base.Threads.SpinLock}(Base.InvasiveLinkedList{Task}(nothing, nothing), Base.Threads.SpinLock(0)), (8, 140262933164800, 2)), nothing)

In [13]:
fetch(future)

10-element Vector{Int64}:
  3
  7
  5
  1
  8
  6
  2
 10
  4
  9

In [14]:
future = remotecall(n->(myid(), fib(n)), 3, 40);

In [15]:
fetch(future)

(3, 102334155)

#### コード9-56. `remotecall_fetch` の使用例

In [16]:
workers()

4-element Vector{Int64}:
 2
 3
 4
 5

In [17]:
remotecall_fetch(n->(myid(), fib(n)), 3, 40)  # `fetch(remotecall(～))` と同等

(3, 102334155)

#### `pmap()`

#### コード9-57. `pmap` の使用例

In [18]:
# `using Distributed`/`addprocs()` 等は実行済とする
workers()

4-element Vector{Int64}:
 2
 3
 4
 5

In [19]:
pmap(n->(myid(), fib(n)), 40:-1:21)

20-element Vector{Tuple{Int64, Int64}}:
 (2, 102334155)
 (5, 63245986)
 (4, 39088169)
 (3, 24157817)
 (3, 14930352)
 (3, 9227465)
 (3, 5702887)
 (3, 3524578)
 (3, 2178309)
 (3, 1346269)
 (3, 832040)
 (3, 514229)
 (3, 317811)
 (3, 196418)
 (3, 121393)
 (3, 75025)
 (3, 46368)
 (3, 28657)
 (3, 17711)
 (3, 10946)

#### コード9-43. `threaded_map()`（マルチスレッド版 `map()`）の実装例（再掲）

In [20]:
# Multithreaded counterpart of `map()`.
#
# Each call of `f` is started with `Threads.@spawn`, and the resulting tasks are
# passed through a buffered `Channel{Task}` whose capacity is
# `Threads.nthreads()`. The return value is a lazy generator that fetches the
# task results in input order; wrap it in `collect` to materialize a vector.

# No iterator given: forward to `map(f)`.
threaded_map(f) = map(f)

# Single iterator: one task per element of `itr`.
function threaded_map(f, itr)
    capacity = Threads.nthreads()
    pending = Channel{Task}(capacity; spawn=true) do ch
        for x in itr
            t = Threads.@spawn f(x)
            put!(ch, t)
        end
    end
    return (fetch(t) for t in pending)
end

# Several iterators: they are zipped, and `f` receives one element from each.
function threaded_map(f, itrs...)
    capacity = Threads.nthreads()
    pending = Channel{Task}(capacity; spawn=true) do ch
        for xs in zip(itrs...)
            t = Threads.@spawn f(xs...)
            put!(ch, t)
        end
    end
    return (fetch(t) for t in pending)
end

threaded_map (generic function with 3 methods)

#### コード9-58. `map()`/`pmap()`/`threaded_map()` の実行結果比較

In [21]:
# Intended to be run after starting Julia with e.g. `julia -t 4 -p 4`.
# NOTE(review): the output recorded below is 1, i.e. this particular session was
# started with a single thread, so the `threaded_map` timing further down cannot
# show a multithreading speed-up — re-run with `-t 4` to reproduce the intended comparison.
Threads.nthreads()

1

In [22]:
last.(map(n->(myid(), fib(n)), 40:-1:21)) ==
last.(pmap(n->(myid(), fib(n)), 40:-1:21)) == 
last.(collect(threaded_map(n->(myid(), fib(n)), 40:-1:21)))

true

In [23]:
using BenchmarkTools

In [24]:
@btime map(n->(myid(), fib(n)), 40:-1:21);

  1.038 s (1 allocation: 400 bytes)


In [25]:
@btime pmap(n->(myid(), fib(n)), 40:-1:21);

  408.577 ms (1193 allocations: 51.58 KiB)


In [26]:
@btime collect(threaded_map(n->(myid(), fib(n)), 40:-1:21));

  1.045 s (176 allocations: 12.55 KiB)


#### `RemoteChannel`

#### コード9-59. `RemoteChannel` の使用例

In [27]:
jobs = RemoteChannel(()->Channel{Int}(8))

RemoteChannel{Channel{Int64}}(1, 1, 655)

In [28]:
results = RemoteChannel(()->Channel{Tuple{Int,Int}}(8))

RemoteChannel{Channel{Tuple{Int64, Int64}}}(1, 1, 656)

In [29]:
# Worker loop, defined on every process via `@everywhere`.
# Repeatedly takes an integer from `jobs`, computes its Fibonacci number with
# `fib`, and puts `(process id, value)` onto `results`.
# The loop has no exit condition of its own: it keeps waiting on `jobs`.
@everywhere function do_work(jobs, results)
    while true
        answer = fib(take!(jobs))
        put!(results, (myid(), answer))
    end
end

In [30]:
# Feed the 20 job arguments (40 down to 21) into `jobs` from a background task.
# `jobs` was created with a buffer of 8, so `put!` blocks whenever the buffer is
# full; wrapping the loop in `@async` keeps this cell from blocking.
@async for n=40:-1:21
    put!(jobs, n)
end;

In [31]:
# Start `do_work` on every worker process. `remote_do` is fire-and-forget:
# it does not store the result, and there is no handle to wait on.
for p in workers()
    remote_do(do_work, p, jobs, results)
end

In [32]:
# Take exactly 20 results (one per job queued from 40:-1:21) and print them in
# the order they arrive on `results`, which need not match the input order.
for _=1:20
    (pid, value) = take!(results)
    println((pid, value))
end

(3, 24157817)
(5, 39088169)
(2, 102334155)
(4, 63245986)
(2, 5702887)
(2, 2178309)
(2, 1346269)
(4, 3524578)
(4, 514229)
(2, 832040)
(5, 9227465)
(2, 196418)
(4, 317811)
(5, 121393)
(4, 46368)
(2, 75025)
(5, 28657)
(4, 17711)
(2, 10946)
(3, 14930352)


#### `SharedArray`

#### コード9-60. `SharedArray` の使用例

In [33]:
@everywhere using SharedArrays

In [34]:
S = SharedArray{Tuple{Int,Int},1}(20)

20-element SharedVector{Tuple{Int64, Int64}}:
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)
 (0, 0)

In [35]:
inputs = RemoteChannel(()->Channel{Int}(8))

RemoteChannel{Channel{Int64}}(1, 1, 682)

In [36]:
@async for n=40:-1:21
    put!(inputs, n)
end;

In [37]:
# For every process that maps the shared array `S`, run the closure on that
# process and wait for it (`remotecall_wait`); `@sync`/`@async` lets all
# processes work concurrently while this cell blocks until all have finished.
@sync for p in procs(S)
    @async remotecall_wait(p, p, S, inputs) do p, S, inputs
        # Each process writes only to its own slice of `S`.
        for idx in localindices(S)
            # Arguments come from the shared `inputs` channel, so which `n`
            # ends up at which index depends on the order of the `take!` calls.
            n = take!(inputs)
            value = fib(n)
            S[idx] = (p, value)
        end
    end
end

In [38]:
S

20-element SharedVector{Tuple{Int64, Int64}}:
 (2, 63245986)
 (2, 317811)
 (2, 196418)
 (2, 121393)
 (2, 75025)
 (3, 39088169)
 (3, 9227465)
 (3, 2178309)
 (3, 832040)
 (3, 514229)
 (4, 102334155)
 (4, 46368)
 (4, 28657)
 (4, 17711)
 (4, 10946)
 (5, 24157817)
 (5, 14930352)
 (5, 5702887)
 (5, 3524578)
 (5, 1346269)

#### `@distributed`

#### コード9-61. `@distributed` の使用例(1)：単純な `for` ループのマルチプロセス化

In [39]:
@everywhere using SharedArrays

In [40]:
S = SharedArray{Tuple{Int,Int},1}(20);

In [41]:
inputs = RemoteChannel(()->Channel{Int}(8));

In [42]:
@async for n=40:-1:21
    put!(inputs, n)
end;

In [43]:
# `@distributed` splits the index range 1:20 across the worker processes.
# Without a reducer it returns a task immediately, so `@sync` is used to wait
# until every iteration has completed.
@sync @distributed for idx = 1:20
    p = myid()
    # Arguments are pulled from the shared `inputs` channel, so the value stored
    # at a given index depends on the order in which workers call `take!`.
    n = take!(inputs)
    value = fib(n)
    S[idx] = (p, value)
end

Task (done) @0x00007f910c326f80

In [44]:
S

20-element SharedVector{Tuple{Int64, Int64}}:
 (2, 39088169)
 (2, 9227465)
 (2, 3524578)
 (2, 832040)
 (2, 514229)
 (3, 24157817)
 (3, 14930352)
 (3, 5702887)
 (3, 2178309)
 (3, 1346269)
 (4, 63245986)
 (4, 317811)
 (4, 196418)
 (4, 121393)
 (4, 75025)
 (5, 102334155)
 (5, 46368)
 (5, 28657)
 (5, 17711)
 (5, 10946)

#### コード9-62. `@distributed` の使用例(2)：マルチプロセシング＋畳み込み演算

In [45]:
# `using Distributed` 等略
nprocs()

5

In [46]:
workers()

4-element Vector{Int64}:
 2
 3
 4
 5

In [47]:
# Single-process (single-threaded) Monte Carlo estimate of π.
# Draws `N` points uniformly from the unit square, counts those falling inside
# the quarter unit circle, and returns `hits / N * 4`.
function mcpi_st(N=10000)
    hits = sum(1:N) do _
        x, y = rand(), rand()
        Int(x^2 + y^2 ≤ 1.0)  # 1 when the point is inside the quarter circle, else 0
    end
    return hits / N * 4
end

mcpi_st (generic function with 2 methods)

In [48]:
mcpi_st()

3.1404

In [49]:
mcpi_st(100000)

3.13592

In [50]:
# Multi-process Monte Carlo estimate of π.
# Same sampling scheme as the single-process version, but the `N` trials are
# distributed over the worker processes with `@distributed`, and the per-trial
# 0/1 hits are combined with the `(+)` reducer.
function mcpi_mp(N=10000)
    hits = @distributed (+) for _ in 1:N
        x, y = rand(), rand()
        Int(x^2 + y^2 ≤ 1.0)  # 1 when the point is inside the quarter circle, else 0
    end
    return hits / N * 4
end

mcpi_mp (generic function with 2 methods)

In [51]:
mcpi_mp()

3.1356

In [52]:
mcpi_mp(100000)

3.14736

In [53]:
using BenchmarkTools

In [54]:
@btime mcpi_st(1_000_000);

  5.106 ms (0 allocations: 0 bytes)


In [55]:
@btime mcpi_mp(1_000_000);

  905.034 μs (306 allocations: 12.75 KiB)


### コラム. 分散処理

#### 仮想コード9-a. 分散処理のサンプル

```julia
julia> using Distributed

julia> addprocs(2)
2-element Vector{Int64}:
 2
 3

julia> addprocs([("username@xxxxxxxx.local", 2)]; 
       dir="/home/username/Documents", exename="/home/username/bin/julia")
2-element Vector{Int64}:
 4
 5

julia> addprocs([("username@yyyyyyyy.local", 2)]; 
       dir="/Users/username/Documents", exename="/opt/homebrew/bin/julia")
2-element Vector{Int64}:
 6
 7

julia> pmap(n->(myid(), Sys.MACHINE, n), 1:10)
10-element Vector{Tuple{Int64, String, Int64}}:
 (2, "x86_64-w64-mingw32", 1)
 (3, "x86_64-w64-mingw32", 2)
 (4, "x86_64-pc-linux-gnu", 3)
 (5, "x86_64-pc-linux-gnu", 4)
 (6, "arm64-apple-darwin21.2.0", 5)
 (7, "arm64-apple-darwin21.2.0", 6)
 (6, "arm64-apple-darwin21.2.0", 7)
 (7, "arm64-apple-darwin21.2.0", 8)
 (7, "arm64-apple-darwin21.2.0", 9)
 (6, "arm64-apple-darwin21.2.0", 10)

julia> @everywhere function fib(n)
           if n ≤ 1
               n
           else
               fib(n - 2) + fib(n - 1)
           end
       end

julia> pmap(n->(myid(), Sys.MACHINE, fib(n)), 40:-1:21)
20-element Vector{Tuple{Int64, String, Int64}}:
 (4, "x86_64-pc-linux-gnu", 102334155)
 (6, "arm64-apple-darwin21.2.0", 63245986)
 (7, "arm64-apple-darwin21.2.0", 39088169)
 (5, "x86_64-pc-linux-gnu", 24157817)
 (2, "x86_64-w64-mingw32", 14930352)
 (3, "x86_64-w64-mingw32", 9227465)
 (3, "x86_64-w64-mingw32", 5702887)
 (3, "x86_64-w64-mingw32", 3524578)
 (2, "x86_64-w64-mingw32", 2178309)
 (2, "x86_64-w64-mingw32", 1346269)
 (5, "x86_64-pc-linux-gnu", 832040)
 (3, "x86_64-w64-mingw32", 514229)
 (2, "x86_64-w64-mingw32", 317811)
 (3, "x86_64-w64-mingw32", 196418)
 (2, "x86_64-w64-mingw32", 121393)
 (2, "x86_64-w64-mingw32", 75025)
 (3, "x86_64-w64-mingw32", 46368)
 (3, "x86_64-w64-mingw32", 28657)
 (2, "x86_64-w64-mingw32", 17711)
 (3, "x86_64-w64-mingw32", 10946)

julia> # 本節で紹介したモンテカルロ法による円周率計算
       function mcpi_mp(N=10000)
           T = @distributed (+) for _=1:N
               x = rand()
               y = rand()
               Int(x^2 + y^2 ≤ 1.0)
           end
           T / N * 4
       end
mcpi_mp (generic function with 2 methods)

julia> using BenchmarkTools

julia> @btime mcpi_mp(6_000_000)
  7.260 ms (426 allocations: 18.78 KiB)
3.1428073333333333
```