<a href="https://colab.research.google.com/github/amenoyoya/julia_ml-tuto/blob/master/01_tutorial/09_Julia_task.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

Google Colaboratory で本notebookを開く場合は、以下の手順を実行すること

1. 以下のコマンドを実行し、Julia 1.6.1 Kernel をインストール
2. 「ランタイム」>「ランタイムのタイプを変更」から `Julia 1.6.1` を選択して保存
3. ランタイムが一度切断され、再接続されると Julia 1.6.1 を実行可能になる
    - Google Colaboratory は、セッションの有効時間に制限があるため、実行に時間のかかるコードがある場合は、ローカル環境で環境構築して実行する方が良い

In [None]:
!curl -sSL "https://julialang-s3.julialang.org/bin/linux/x64/1.6/julia-1.6.1-linux-x86_64.tar.gz" -o julia.tar.gz
!tar -xzf julia.tar.gz -C /usr --strip-components 1
!rm -rf julia.tar.gz*
!julia -e 'using Pkg; pkg"add IJulia"'

# 非同期プログラミング

プログラムが外界とやり取りをするとき、プログラムの処理を前もって予測できない順序で実行しなければならないことがある

例えば、プログラムの途中でファイルをダウンロードする場合、ダウンロード操作を開始し、完了するのを待ちながら他の処理を行い、ファイルが利用可能になったらそれを使う処理を行いたいはずである

この種のシナリオをカバーするのが **非同期 (asynchronous) プログラミング** である

非同期プログラミングに対応するために、Julia は **タスク** を提供する

計算処理の一部分 (実際のコードでは関数) を Task として切り出すと、その部分の実行を途中で中断して他のタスクに切り替えられるようになる

中断した Task は後から再開でき、それをさらに中断することも可能である

一見するとタスクは通常の関数呼び出しに似ているが、重要な違いが二つある

1. タスクの切り替えは空間を一切使用しないので、タスクをいくら切り替えてもコールスタックは消費されない
2. 呼び出し側に制御を戻すには実行を終えるしかない通常の関数呼び出しとは対称的に、タスクの切り替えは任意のタイミングで行うことができる

## 基本的なタスク演算

Task はこれから実行される計算処理の一単位を指すハンドルと考えることができ、作成-開始-実行-終了というライフサイクルを持つ

In [1]:
"""
    Task(() -> begin
        sleep(5)
        println("done")
    end)

5秒間何もせずに待機してから "done" を出力するタスク
"""
task = @task begin
    sleep(5)
    println("done")
end

task

In [2]:
# Task は schedule 関数を呼び出すことで実行キューに追加され、実行可能な状態になる
schedule(task)

Task (runnable) @0x000000000bc507d0

In [3]:
# wait 関数でタスク呼び出しを待機することで、タスクが実行される
wait(task)

done


In [7]:
"""
タスクの作成とスケジューリングは同時に行われることが多いため @async マクロが提供されている

```julia
@async x === schedule(@task x) === schedule(Task(() -> x))
```
"""
# 5秒後に "done" を出力
task = @async begin
    sleep(5)
    println("done")
end
wait(task)

done


## チャンネルを使った通信

一部の問題では行うべきジョブの間で「呼び出し元」と「呼び出し先」が明らかでないために、関数の呼び出しでは必要になる処理を自然に定式化できない

こういった問題の例として「生産者-消費者問題」がある

- 生産者-消費者問題:
    - 消費者はものを得るために、生産者に申請を行う
    - 生産者は消費者からの申請に応じて、ものを生成して返す
    - ただし、生産者には生成すべきものがあり過ぎて、ものを返す準備が整っていない可能性がある

この問題を解決するために、Julia は `Channel` という機構を提供している

- **Channel**:
    - 待機可能な先入先出のキューで、複数のタスクの読み書きが可能

この仕組みを利用して「生産者-消費者問題」を解決する

- 生産者タスクを定義して `put!` を呼び出すと値を生産する
    - 値を消費するには、生産者を新しいタスクで実行するようにスケジュールする必要がある
    - 引数が1個の関数を引数として受けとる特殊な Channel コンストラクタを使用して、チャネルに接続されたタスクを実行することができる
- これにより `take!` を使って、チャネルオブジェクトから繰り返し値を取得可能
    - `take!` が消費者に相当する

In [8]:
"""
    Channel(channel::Channel -> ...)

- 生産者タスク: Channel を通じて以下の値を連続的に生産する
    1. "start"
    2. 2
    3. 4
    4. 6
    5. 8
    6. "stop"
"""
producer = Channel(channel::Channel -> begin
    put!(channel, "start")
    for n = 1:4
        put!(channel, 2n)
    end
    put!(channel, "stop")
end)

# 最初の take! で "start" が取得できる
take!(producer)

"start"

In [9]:
# その後 take! を呼び出すたびに 2, 4, 6, 8, "stop" が取り出される
for i = 1:5
    take!(producer) |> println
end

2
4
6
8
stop


In [10]:
# これ以上生産 (put!) されている値がなければ、消費 (take!) できなくなる
take!(producer)

LoadError: InvalidStateException("Channel is closed.", :closed)

In [11]:
# チャンネルが閉じられているかどうかは isopen 関数で判定可能
## isopen(channelTask) === false なら、それ以上 put!, take! できない
isopen(producer)

false

### 双方向チャンネル
チャンネルは書き込み口と読み込み口を持つパイプと考えることができる

- `put!` を呼び出して行うチャンネルへの書き込みは、異なるタスクに属する複数の書き込み手から一つのチャンネルに対して並列に行うことができる
- `take!` を呼び出して行うチャンネルからの読み込みは、異なるタスクに属する複数の読み込み手から一つのチャンネルに対して並列に行うことができる

```julia
# チャンネル c1 と c2 を作成する。
c1 = Channel(32)
c2 = Channel(32)

# 関数 foo: c1 からデータを読み込む → 処理を行う → c2 に結果を書き込む
function foo()
    while true
        data = take!(c1)
        [...]               # データを処理する。
        put!(c2, result)    # 結果を書き込む。
    end
end

# 実行を並行にするために、foo を実行するタスクを n 個作成・スケジュールする
for _ in 1:n
    @async foo()
end
```

- チャンネルはコンストラクタ `Channel{T}(sz)` で作成する
    - このチャンネルは `T` 型の値だけを保持でき、型を指定しないと任意の型のオブジェクトを保持するチャンネルが作成される
    - `sz` は任意の時点でチャンネルが保持できる最大の要素数を表す
        - 例えば `Channel(32)` は任意の型のオブジェクトを 32 個まで同時に保持できるチャンネルを作成し、`Channel{MyType}(64)` は MyType 型のオブジェクトを 64 個まで同時に保持できるチャンネルを作成する
        - 指定しない場合 or 0 を指定した場合 は無制限になる
- Channel が空だと、読み込み (`take!` の呼び出し) は、データが利用可能になるまでブロックされる
- Channel が満杯だと、書き込み (`put!` の呼び出し) は、空間が空くまでブロックされる
- `isready` を使うとチャンネルにオブジェクトが存在するかどうかを確認できる
- `wait` を使うとオブジェクトが利用可能になるまで待機できる
- Channel は最初開いている
    - これは `take!` と `put!` を使って自由にデータを読み書きできることを意味する
    - `put!` はチャンネルにデータを追加する
        - `Channel.sz_max` を超える数のデータは追加できないため、追加可能になるまでプログラムがブロックされる
    - `take!` はチャンネルからデータを取り出し、取り出したデータはチャンネルから削除される
        - また `take!` で取得可能なデータがない場合は、取得可能になるまでプログラムがブロックされる
- `close` を使うと Channel と閉じることができ、閉じられた Channel に対する `put!`, `take!` は失敗する

In [1]:
# String型の値を1つ保持可能なチャンネル
c_in = Channel{String}(1)

# 任意の値を1つ保持可能なチャンネル
c_out = Channel(1)

# チャンネルに保持されているデータが一杯かどうか判定する関数
isfull(c::Channel) = c.sz_max === 0 ? false : length(c.data) >= c.sz_max

# 非同期に実行されるループ処理: c_in, c_out チャンネルを通してデータのやり取りが可能
task = @async begin
    while true
        # take!() によるプログラムの停止を避けたい場合は、チャンネルにデータが登録されているか確認した方が良い
        # command = isempty(c_in.data) ? nothing : take!(c_in)
        command = isempty(c_in) ? nothing : take!(c_in)

        if command === "exit"
            break
        elseif command === "print"
            println("Hello")
            flush(stdout)
        elseif command !== nothing
            # put!() によるプログラムの停止を避けたい場合は、チャンネルに空きがあるか確認した方が良い
            if !isfull(c_out)
                put!(c_out, "get command: $command")
            end
        end

        # take!, put! によるプログラムブロックがない場合、Task 作成時にフリーズを起こすため、必ず sleep を入れる
        sleep(0.1)
    end
end

Task (runnable) @0x000000000bef07d0

In [2]:
# c_in チャンネルに "print" データを送信
## => println("Hello") が非同期で実行される
put!(c_in, "print")

"print"

Hello


In [3]:
# c_in チャンネルに "unknown" データを送信
## => c_out チャンネルに "get command: unknown" 文字列データが格納される
put!(c_in, "unknown")

# c_out チャンネルからデータを取り出す
take!(c_out)

"get command: unknown"

In [4]:
"""
タスクの状態取得

- current_task() ::Task : 現在実行中のタスクに対する参照を取得
- istaskstarted(::Task) ::Boolean : タスクが開始されているか判定
- istaskdone(::Task) ::Boolean : タスクが完了されているか判定
"""

println("current task: $(current_task())")
println("is task started: $(istaskstarted(task))")
println("is task done: $(istaskdone(task))")

current task: Task (runnable) @0x000000000e8b9d20
is task started: true
is task done: false


In [5]:
# c_in チャンネルに "exit" データを送信
## => タスクは終了するはず
put!(c_in, "exit")

# タスク終了までタイムラグがある可能性があるため 0.5 秒待機
sleep(0.5)

# タスクは終了しているはずのため istaskdone => true
istaskdone(task)

true

## タスクのエラー処理

タスクは並列実行されるため、エラーが発生しても分からないことが多い

そのため、ログファイルにエラーログを書き込むようにすると良い

Julia デフォルトの `SimpleLogger(io::IOStream)` を使っても良いが、ログファイルのローテーション機能が使える `LogRoller.jl` パッケージが便利である

In [1]:
using Pkg
Pkg.add("LogRoller")

[32m[1m    Updating[22m[39m registry at `C:\Users\user\.julia\registries\General.toml`
[32m[1m   Resolving[22m[39m package versions...
[32m[1m  No Changes[22m[39m to `C:\Users\user\.julia\environments\v1.7\Project.toml`
[32m[1m  No Changes[22m[39m to `C:\Users\user\.julia\environments\v1.7\Manifest.toml`


In [5]:
using Logging, LogRoller

"""
グローバルロガー: RollingLogger
- ファイル名: "./error.log"
- ファイル上限サイズ: 1MB (1,000,000 bytes) ※このサイズを超えるとローテーションされる
- バックアップするローテーション数: 3
- ログレベル: Error
"""
# global_logger(::Logger): グローバルロガーを設定する
global_logger(
    RollingLogger("./error.log", 1_000_000, 3, Logging.Error)
)

Base.CoreLogging.SimpleLogger(IJulia.IJuliaStdio{Base.PipeEndpoint}(IOContext(Base.PipeEndpoint(Base.Libc.WindowsRawSocket(0x00000000000003e8) open, 0 bytes waiting))), Info, Dict{Any, Int64}())

タスクのエラー処理は基本的に以下のような形で記述しておけば問題ない

```julia
task = @async try
    # some process.
catch err
    # @error マクロの第2引数に `exception=(err, catch_backtrace())`
    #   を指定するとエラーメッセージをすべてログに残すことができる
    @error "Error occurred" exception=(err, catch_backtrace())

    # rethrow() で現在のエラーを再度投げてプロセスを終了させる
    rethrow()
end
```

In [7]:
# エラーが発生するタスク
task = @async try
    for i = 1:10
        printxyz(i) # <= error: undef function
        sleep(10)
    end
catch err
    @error "Error:" exception=(err, catch_backtrace())
    rethrow()
end

Task (failed) @0x000000000b37b650
UndefVarError: printxyz not defined
Stacktrace:
 [1] [0m[1mmacro expansion[22m
[90m   @ [39m[90m.\[39m[90m[4mIn[7]:4[24m[39m[90m [inlined][39m
 [2] [0m[1m(::var"#1#2")[22m[0m[1m([22m[0m[1m)[22m
[90m   @ [39m[35mMain[39m [90m.\[39m[90m[4mtask.jl:423[24m[39m

In [8]:
istaskdone(task)

true