# Rustの言語機能

## スレッド安全性

Rustはマルチスレッドプログラミングをサポートしており、スレッド安全性も担保されている

以下で詳しく見ていく

### スレッドの作成
スレッドを扱うには標準ライブラリの `thread` を使う

`thread::spawn` はクロージャを受け取り、それを新しいスレッドで実行する関数である

In [2]:
use std::thread;

fn main() {
    // "Hello, world!" を表示するスレッドを作成し、そのハンドルを受け取る
    let handle = thread::spawn(|| println!("Hello, world!"));
    
    // handle.join() メソッド: スレッドの終了を待機
    // ※ これを呼び出さないと、プログラムがスレッドの終了を待たずに終了してしまう可能性があり、不確定な結果となってしまう
    dbg!(handle.join());
}
main();

Hello, world!


[src/lib.rs:9] handle.join() = Ok(
    (),
)


クロージャはスコープ内の変数を自動的にキャプチャするため、変数をスレッドに渡すことができる

例えば、以下のような記述が可能である

In [4]:
use std::thread;

fn main() {
    // 各スレッドのハンドルを保持するためのベクタ
    let mut handles = Vec::new();
    
    // 1 ~ 10 のスレッドを新規作成
    for x in 1..=10 {
        handles.push(
            thread::spawn(|| println!("Thread: {}", x))
        );
    }
    // 全てのスレッドの終了を待機
    for handle in handles {
        handle.join();
    }
}
main();

Error: closure may outlive the current function, but it borrows `x`, which is owned by the current function

上記コードはコンパイルエラーとなるが、これは変数 `x` の参照が、以下のように寿命より長生きしてしまっているためである

```rust
for x in 1..=10 {                                   // | x のライフタイム
    handles.push(                                   // |
        thread::spawn(|| println!("Thread: {}", x)) // | | x の参照
    );                                              // | | スレッドの生存期間は不明のためライフタイム不定
}                                                   // | |
for handle in handles {                             //   | <= 存在しない x を参照してしまう可能性がある
    handle.join();                                  //   |
}                                                   //   |
```

これを解決するためには、`move` キーワードを用いて、キャプチャする変数の所有権をスレッドに渡してしまうのが良い

In [5]:
use std::thread;

fn main() {
    // 各スレッドのハンドルを保持するためのベクタ
    let mut handles = Vec::new();
    
    // 1 ~ 10 のスレッドを新規作成
    for x in 1..=10 {
        handles.push(
            thread::spawn(move || println!("Thread: {}", x)) // move で x の所有権を渡す
        );
        // => このスコープではもう x を使うことはできない (x の所有権がスレッドクロージャに移っているため)
    }
    // 全てのスレッドの終了を待機
    for handle in handles {
        handle.join();
    }
}
main();

Thread: 1
Thread: 2
Thread: 3
Thread: 4
Thread: 5
Thread: 6
Thread: 7
Thread: 8
Thread: 9
Thread: 10


なお、マルチスレッドプログラミングにおいて、スレッドの実行順序は不定である

そのため、上記コードは必ずしも 1～10 が順序よく実行されるわけではないことに注意が必要である

### スレッド間の情報共有
前述したマルチスレッドコードでは、一度動き出したスレッドは完了するまで待つことしかできなかった

しかし実際のシステムでは、各スレッドは実行中も様々な情報を共有しながら作業を進行させることが一般的である

Rustでは、スレッド間の情報共有の手段として以下の2つの方法が提供されている

- 共有メモリ
- メッセージパッシング

#### 共有メモリ
共有メモリは、複数のスレッドで同じメモリ領域を共有することで情報を共有する方法である

ここでは、前述のコードを改善して、各スレッドが共有のデータ `data` にアクセスするようにしていく

まずは、パッと思いつく方法として、単純に `data` 変数をキャプチャする方法を試してみる

In [6]:
use std::thread;

fn main() {
    let mut handles = Vec::new();
    let mut data = vec![0; 10]; // スレッド間共有データ
    
    for x in 1..=10 {
        handles.push(
            thread::spawn(move || data[x - 1] += x)
        );
    }
    for handle in handles {
        handle.join();
    }
    dbg!(data);
}
main();

Error: use of moved value: `data`

上記コードは上手く行かない

ムーブセマンティクスにより、1番目のスレッド作成の時点で、`data` 変数への所有権が1番目のスレッドに移ってしまい、2番目以降のスレッドでは所有権を取れなくなっているためである

この問題を解決するためには、各スレッドで所有権を共有する必要がある

Rustでは、所有権を共有可能な型として `Rc` 型が用意されている

Rc型 は、C++ における `std::shared_ptr` のような仕組みとなっており、所有権を保持している所有者の数をカウントしている（**参照カウンタ**）

そして、所有者が 0 になったタイミングでメモリを解放する、という仕組みである

In [7]:
use std::thread;
use std::rc::Rc; // Rc型をスコープに導入

fn main() {
    let mut handles = Vec::new();
    let mut data = Rc::new(vec![0; 10]); // スレッド間共有データ: 所有権複数共有型
    
    for x in 1..=10 {
        let data_ref = data.clone(); // data の所有権をコピー (参照カウンタを1つ増やす)
        handles.push(
            thread::spawn(move || data_ref[x - 1] += x)
        );
    }
    for handle in handles {
        handle.join();
    }
    dbg!(data);
}
main();

Error: `Rc<Vec<usize>>` cannot be sent between threads safely

今度は別のエラーが発生してしまっている

これは「Rc型はスレッド間を安全に渡せない」というメッセージである

実のところ Rc型 が持つ参照カウンタは、複数のスレッドが同時にアクセスした場合に壊れてしまう可能性があるのである

そのため、Rc型 のマルチスレッド版として `Arc` (Atomically Reference Counter) 型が用意されている

なぜ `Rc` と `Arc` が別々に提供されているのかと言うと、Arc型 の参照カウンタ更新には Rc型 よりも余分なコストがかかるためである

そのため、シングルスレッドでは Rc型 を、マルチスレッドでは Arc型 を使うようになっている

In [8]:
use std::thread;
use std::sync::Arc; // Arc型をスコープに導入

fn main() {
    let mut handles = Vec::new();
    let mut data = Arc::new(vec![0; 10]); // スレッド間共有データ: 所有権複数共有型 (マルチスレッド版)
    
    for x in 1..=10 {
        let data_ref = data.clone(); // data の所有権をコピー (参照カウンタを1つ増やす)
        handles.push(
            thread::spawn(move || data_ref[x - 1] += x)
        );
    }
    for handle in handles {
        handle.join();
    }
    dbg!(data);
}
main();

Error: cannot borrow data in an `Arc` as mutable

これでもまだコンパイルエラーが発生してしまう

これは Arc型 が書き換え不能であることを示している

今回のケースでは、各スレッドが `data` の別々の要素を操作することが分かっているが、一般的には必ずしもそうであるとは限らない

例えば `data_ref[(x - 1) / 2] += x` のようにすると スレッド1 と スレッド2 が同じ領域 (`data_ref[0]`) にアクセスすることになる

そのため、Arc型 では書き換えを行うことができず、別途排他制御を行う構造が必要になる

このような用途のために用意されているのが `Mutex` 型である

In [11]:
use std::thread;
use std::sync::{Arc, Mutex}; // Arc型, Mutex型をスコープに導入

fn main() {
    let mut handles = Vec::new();
    let mut data = Arc::new(Mutex::new(vec![0; 10])); // スレッド間共有データ: 排他制御機構つき所有権複数共有型
    
    for x in 1..=10 {
        let data_ref = data.clone(); // data の所有権をコピー (参照カウンタを1つ増やす)
        handles.push(
            thread::spawn(move || {
                let mut data = data_ref.lock().unwrap(); // Mutex::lock() メソッドで排他制御を開始
                data[x - 1] += x;
            })
        );
    }
    for handle in handles {
        handle.join();
    }
    dbg!(data);
}
main();

[src/lib.rs:20] data = Mutex {
    data: [
        1,
        2,
        3,
        4,
        5,
        6,
        7,
        8,
        9,
        10,
    ],
    poisoned: false,
    ..
}


上記のように、Mutex型 は `lock` というメソッドを提供しており、あるスレッドを lock を実行して data への参照を得ると、それ以外のスレッドは lock が完了しなくなる

lock 中のスレッドが data へのアクセスを完了して参照を破棄したタイミングで、他のスレッドの内の1つが lock を完了し、再び同じ処理が繰り返されることになる

このように、共有データへの参照を得るのが常に1つのスレッドだけに制限される機構を **排他制御** と呼ぶ

ここまで行って初めて、共有メモリによるスレッド間の安全な情報共有が可能となる

#### メッセージパッシング
各スレッドがメッセージをやり取りしながら（コミュニケーションをとりながら）動作する仕組みを **メッセージパッシング** と呼ぶ

Rustは、メッセージパッシングに対応したスレッド間通信用のチャンネルとして `mpsc` (Messeage Passing Channel) を持っており、このチャンネルを使うことでスレッド間情報共有が可能となる

In [12]:
use std::thread;
use std::sync::mpsc; // Message Passing Channel を使用可能に

fn main() {
    let (tx, rx) = mpsc::channel(); // mpsc::channel(): 送信・受信チャンネルのタプルを返す
    let handle = thread::spawn(move || {
        let message = rx.recv().unwrap(); // Message Passing Channel でメッセージ受信
        println!("{}", message); // => 以下のコードで送信している "Message Passing" メッセージが表示されるはず
    });
    
    // Message Passing Channel でメッセージを送信
    tx.send("Message Passing");
    
    // スレッドの終了を待機
    handle.join();
}
main();

Message Passing


上記のように Message Passing Channel は `mpsc::channel` 関数で作成できる

その戻り値は送信・受信チャンネルのタプルとなっている

メッセージは送信チャンネルから受信チャンネルへの単方向にしか通信できないため、上記の場合は main スレッドから作成したスレッドに対してしか送信できない

これを解決するためには、逆方向のチャンネルも作ってしまい、双方向通信を実現する必要がある

なお、チャンネルはメッセージをキューに蓄えるため、例えばスレッド作成前にメッセージを送信していたとしても何ら問題ない

受信チャンネルはキューに蓄えられたメッセージを順に受信していくためである

ここまでの話を織り込んだ上で、メッセージパッシングにより複雑なスレッド処理を実現してみる

In [13]:
use std::thread;
use std::sync::mpsc;

fn main() {
    let mut handles = Vec::new();
    let mut data = vec![0; 10]; // スレッド間で共有したいデータ
    let mut snd_channels = Vec::new(); // main => 各スレッドへの送信チャンネル
    let mut rcv_channels = Vec::new(); // 各スレッド => mainへの受信チャンネル
    
    for x in 1..=10 {
        // mainスレッドから各スレッドへのチャンネル作成
        let (snd_tx, snd_rx) = mpsc::channel();
        
        // 各スレッドからmainスレッドへのチャンネル作成
        let (rcv_tx, rcv_rx) = mpsc::channel();
        
        snd_channels.push(snd_tx);
        rcv_channels.push(rcv_rx);
        
        // スレッド作成
        handles.push(
            thread::spawn(move || {
                // main => 各スレッドへのメッセージを受信
                // メッセージとしてスレッド間で共有したいデータが渡されていれば安全にデータを処理できる
                let mut data = snd_rx.recv().unwrap();
                data += x;
                
                // 各スレッド => main へ、メッセージとして処理後のデータを送信して返す
                // => これにより実質的にスレッド内で排他制御を実現することができる
                rcv_tx.send(data);
            })
        );
    }
    
    // 各スレッドにスレッド間で共有したいデータをメッセージとして送信
    for x in 0..10 {
        snd_channels[x].send(data[x]);
    }
    
    // 各スレッドからのメッセージをスレッド間共有データとして受信
    for x in 0..10 {
        data[x] = rcv_channels[x].recv().unwrap();
    }
    
    for handle in handles {
        handle.join();
    }
    
    dbg!(data);
}
main();

[src/lib.rs:50] data = [
    1,
    2,
    3,
    4,
    5,
    6,
    7,
    8,
    9,
    10,
]


### Send, Sync
上記までの通り、Rustでマルチスレッドプログラミングを行おうとする場合、スレッド安全性を保証するために様々なコンパイルチェックが行われる

このスレッド安全性を保証する仕組みが `Send`, `Sync` というマーカトレイトである

- Send マーカトレイト
    - これを実装した型は、その所有権をスレッドをまたいで転送可能であることを示す
    - Send はほとんど全ての型に実装されているが、Rc など、スレッド安全性を脅かす可能性のある型には実装されていない
- Sync マーカトレイト
    - これを実装した型は、複数のスレッドから安全にアクセス可能であることを示す
    - 例えば Mutex などは lock メソッドによる排他制御によって複数のスレッドから安全にアクセスできるため、Sync を実装している

自作の型については、基本的に自分で Send や Sync を実装するべきではない

その型を構成する全ての型が Send や Sync であれば、自作型についても自動的に Send や Sync が実装されるためである

手動で実装したい場合は unsafe なコードとなるため、注意深く設計しなければならない

Rustの標準ライブラリは必ずしも充実したマルチスレッド機能を提供しているわけではないが、Send, Sync という基本的な仕組みを提供することで、サードパーティ製のライブラリに対しても、標準ライブラリと同等のスレッド安全性を提供することに成功しているのである