# 「非同期実行を用いたバッチRPC処理の実装」

【原題】Implementing Batch RPC Processing Using Asynchronous Executions

【原著】[Shen Li](https://mrshenli.github.io/)

【元URL】https://pytorch.org/tutorials/intermediate/rpc_async_execution.html

【翻訳】電通国際情報サービスISID HCM事業部　櫻井 亮佑

【日付】2020年12月05日

【チュトーリアル概要】

前提知識:
- [PyTorch Distributedの概要](https://pytorch.org/tutorials/beginner/dist_overview.html)（日本語版6_1）
- [分散RPCフレームワーク入門](https://pytorch.org/tutorials/intermediate/rpc_tutorial.html)（日本語版6_5）
- [分散RPCフレームワークを用いたパラメーターサーバーの実装](https://pytorch.org/tutorials/intermediate/rpc_param_server_tutorial.html)（日本語版6_6）
- [RPCの非同期実行デコレーター](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.functions.async_execution)

<br>

本チュートリアルでは、ブロックされているRPCスレッドの数を減らしつつ、呼び出し先でCUDAの操作を統合することで訓練の高速化を補助する [@rpc.functions.async_execution](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.functions.async_execution) を使用して、バッチ処理を行うRPCアプリケーションを構築する方法を解説します。

なお、この方法は、[TorchServe
でのバッチ推論](https://pytorch.org/serve/batch_inference_with_ts.html) と同じ流れで構築されています。


**注意**<br>
本チュートリアルにはPyTorch v1.6.0以上が必要です。

## 基本的な内容

前のチュートリアル（日本語版6_7）では、[torch.distributed.rpc](https://pytorch.org/docs/stable/rpc.html) を使って分散訓練アプリケーションを構築する手順を示しましたが、RPCのリクエストを処理する際の呼び出し先については深く追求しませんでした。


PyTorch v1.5において、各RPCは、リクエスト内の関数が値を返すまで、呼び出し先で一つのスレッドをブロックした状態で関数を実行します。

この仕組みは多くのケースで通用しますが、注意点が一つあります。



IO上でユーザー関数がブロックされている場合、例えば、ネストされたRPCの実施や信号の送信をしているケース、あるいは異なるRPCのリクエストのブロックの解除を待機しているケースでは、呼び出し先のRPCスレッドは、IOが終了するか信号を送信するイベントが発生するまでは、アイドル状態で待機する必要があります。

結果として、RPCの呼び出し先は必要以上に多くのスレッドを使用するようになります。

この問題の原因は、RPCがユーザー関数をブラックボックスに取り扱っており、関数内で行われる処理をほとんど認識していない点にあります。

ユーザー関数がRPCスレッドを生成、解放できるようにするには、RPCシステムにより多くのヒントとなる情報を提供してあげる必要があります。

v1.6.0より、PyTorchでは2つの新しい概念を導入し、この問題に対処しています。
- 非同期実行をカプセル化し、コールバック関数のインストールもサポートしている [torch.futures.Future](https://pytorch.org/docs/master/futures.html)型
- 対象の関数でfutureを返し、実行中に一時停止と複数回の生成ができることを、アプリケーションが呼び出し先に伝えるための [rpc.functions.async_execution](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.functions.async_execution)デコレーター

上記の2つのツールにより、アプリケーションコードはユーザー関数を複数のより小さな関数に細分化することが可能になり、`Future`オブジェクト上でコールバックとしてそれらの関数をつなぎ合わせ、最終的な結果を含む `Future` を返せるようになります。



呼び出し先では、`Future` オブジェクトを取得する際、後続のRPCのレスポンスの準備とコールバックとしての通信をインストールしますが、これらの処理は最終的な結果の準備ができたときにトリガーされます。

このようにすることで、呼び出し先はスレッドをブロックする必要がなくなり、最終的な返り値の準備が出来るまで待機する必要もなくなります。

単純なサンプル例については、[@rpc.functions.async_execution](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.functions.async_execution) のAPIドキュメントを参照ください。

呼び出し先でのアイドル状態のスレッド数を減らす他に、これらのツールはバッチRPC処理のさらなる簡易化、高速化に貢献します。


本チュートリアル内で紹介する2つのセクションでは、[@rpc.functions.async_execution](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.functions.async_execution)デコレーターを使用して、分散バッチ更新パラメーターサーバーと、バッチ処理強化学習アプリケーションを構築する方法を解説します。

## バッチ更新パラメーターサーバー

一つのパラメーターサーバー（PS）と複数のトレーナーを備えた、同期型パラメーターサーバー訓練アプリケーションについて解説します。

このアプリケーションでは、PSがパラメーターを保持し、すべてのトレーナーが報告する勾配を待機します。

また、毎イテレーションにおいて、すべてのトレーナーから勾配を受け取るまで待機し、勾配を受け取った後ですべてのパラメーターを一度に更新します。



下記のコードは、PSクラスの実装を示しています。

`update_and_fetch_model` メソッドは、`@rpc.functions.async_execution` で修飾されており、トレーナーによって呼び出されるメソッドです。

そして、`update_and_fetch_model` メソッドは呼び出しごとに、更新されたモデルが格納される `Future` オブジェクトを返します。

ほとんどのトレーナーから起動される呼び出しは、`.grad` フィールドに勾配を蓄積して即時に値を返し、PS上でRPCスレッドを生成するだけです。

そして、最後に到着するトレーナーは、オプティマイザーステップをトリガーし、それまでに報告された勾配をすべて処理します。

処理後、更新されたモデルを `future_model` に設定し、`Future` オブジェクトを通じて他のトレーナーからの以前のリクエストを通知して、すべてのトレーナーに更新されたモデルを送信します。

In [None]:
import threading
import torchvision
import torch
import torch.distributed.rpc as rpc
from torch import optim

num_classes, batch_update_size = 30, 5

class BatchUpdateParameterServer(object):
    def __init__(self, batch_update_size=batch_update_size):
        self.model = torchvision.models.resnet50(num_classes=num_classes)
        self.lock = threading.Lock()
        self.future_model = torch.futures.Future()
        self.batch_update_size = batch_update_size
        self.curr_update_size = 0
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        for p in self.model.parameters():
            p.grad = torch.zeros_like(p)

    def get_model(self):
        return self.model

    @staticmethod
    @rpc.functions.async_execution
    def update_and_fetch_model(ps_rref, grads):
        # RRefを使用してローカルのPSインスタンスを取得
        self = ps_rref.local_value()
        with self.lock:
            self.curr_update_size += 1
            # 勾配を累積して .grad フィールドへ
            for p, g in zip(self.model.parameters(), grads):
                p.grad += g
            
            # 現在の future_model を保存し、返り値として返し、
            # このスレッドが値を返す前に別のスレッドが future_model に手を加えたとしても
            # 返す Future オブジェクトが正しいモデルを保持するようにする。
            fut = self.future_model

            if self.curr_update_size >= self.batch_update_size:
                # モデルを更新
                for p in self.model.parameters():
                    p.grad /= self.batch_update_size
                self.curr_update_size = 0
                self.optimizer.step()
                self.optimizer.zero_grad()
                # 結果を Future オブジェクト上に設定し、
                # この更新されたモデルを求めているすべての過去のリクエストに通知され、
                # それらのリクエストに応じてレスポンスが送信されます。
                fut.set_result(self.model)
                self.future_model = torch.futures.Future()

        return fut

トレーナーについては、PSから得られる同じパラメーターのセットを使用してすべて初期化されます。

そして、各イテレーションにおいてそれぞれのトレーナーは、最初にフォワードパスとバックワードパスを実行し、ローカルに勾配を生成します。

その後、各トレーナーはRPCを用いて勾配をPSに報告し、同一のRPCリクエストの返り値を通して更新されたパラメーターを受け取ります。



なお、トレーナーの実装において、目的の関数が `@rpc.functions.async_execution` で修飾されているか否かで違いは生じません。

トレーナーはシンプルであり、更新されたモデルが返ってくるまでトレーナー上で処理をブロックする `rpc_sync` を使用して、`update_and_fetch_model` を呼び出すだけとなります。

In [None]:
batch_size, image_w, image_h  = 20, 64, 64

class Trainer(object):
    def __init__(self, ps_rref):
        self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss()
        self.one_hot_indices = torch.LongTensor(batch_size) \
                                    .random_(0, num_classes) \
                                    .view(batch_size, 1)

    def get_next_batch(self):
        for _ in range(6):
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                        .scatter_(1, self.one_hot_indices, 1)
            yield inputs.cuda(), labels.cuda()

    def train(self):
        name = rpc.get_worker_info().name
        # モデルパラメーターの初期値を取得
        m = self.ps_rref.rpc_sync().get_model().cuda()
        # 訓練開始
        for inputs, labels in self.get_next_batch():
            self.loss_fn(m(inputs), labels).backward()
            m = rpc.rpc_sync(
                self.ps_rref.owner(),
                BatchUpdateParameterServer.update_and_fetch_model,
                args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
            ).cuda()

本チュートリアルでは、マルチプロセスを起動するコードを省略しており、実装の全内容については、[サンプルコード](https://github.com/pytorch/examples/tree/master/distributed/rpc) のリポジトリを参照してください。


なお、[@rpc.functions.async_execution](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.functions.async_execution)デコレーターが無くても、バッチ処理の実装が可能である点には留意してください。

しかし、[@rpc.functions.async_execution](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.functions.async_execution)デコレーター無しで実装する場合、PS上でより多くのRPCスレッドをブロックするか、更新されたモデルを受け取る別のRPCを使用する必要が生じます。

さらに後者の場合においては、よりコードが複雑になり、通信のオーバーヘッドも増加します。

本セクションでは、単純なパラメーターサーバー訓練の例を扱い、[@rpc.functions.async_execution](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.functions.async_execution)デコレーターを用いてバッチRPCアプリケーションを実装する方法を示しました。

次のセクションでは、以前のチュートリアル [分散RPCフレームワーク入門](https://pytorch.org/tutorials/intermediate/rpc_tutorial.html) （日本語版6_5）で扱った強化学習の例を、バッチ処理を用いて再実装し、バッチ処理が訓練スピードに与える影響について解説します。

## バッチ処理カートポールソルバー

本セクションでは、[OpenAI Gym](https://gym.openai.com/) よりCartPole-v1を例として使用し、バッチ処理RPCのパフォーマンス面での影響をお見せします。

なお、最良のカートポールソルバーを構築することや最難関のRLの課題を解くことが目標ではなく、[@rpc.functions.async_execution](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.functions.async_execution) の使用方法を解説することが目標である点に留意してください。



下記に示すように、以前のチュートリアルで使用したものと同様の `Policy` モデルを使います。

以前のチュートリアルと比較した際の違いは、コンストラクターにおいて、`F.softmax` の `dim` パラメーターを制御するために、 引数`batch` を追加している点です。


これは、バッチ処理を行う際、`forward` 関数内の引数 `x` が複数のオブザーバーから得た状態を含んでいるため、次元が適切に変更される必要があるためです。

他の部分はすべてそのままです。

In [None]:
import argparse
import torch.nn as nn
import torch.nn.functional as F

parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
parser.add_argument('--gamma', type=float, default=1.0, metavar='G',
                    help='discount factor (default: 1.0)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--num-episode', type=int, default=10, metavar='E',
                    help='number of episodes (default: 10)')
args = parser.parse_args()

torch.manual_seed(args.seed)

class Policy(nn.Module):
    def __init__(self, batch=True):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)
        self.dim = 2 if batch else 1

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=self.dim)

`Observer` のコンストラクターも同様に修正を行います。

すなわち、引数`batch`を取り、アクションを選択するために使用する `Agent` の関数を決定します。

そしてObserberは、バッチモードの場合、この後実装する `Agent` 上で `select_action_batch` を呼び出しますが、この関数が [@rpc.functions.async_execution](https://pytorch.org/docs/master/rpc.html#torch.distributed.rpc.functions.async_execution) で修飾されます。

In [None]:
import gym
import torch.distributed.rpc as rpc

class Observer:
    def __init__(self, batch=True):
        self.id = rpc.get_worker_info().id - 1
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)
        self.select_action = Agent.select_action_batch if batch else Agent.select_action

前のチュートリアル [分散RPCフレームワーク入門](https://pytorch.org/tutorials/intermediate/rpc_tutorial.html) （日本語版6_5）と比較すると、オブザーバーは少し異なった振る舞いをします。

オブザーバーは、環境が止まった際に離脱する代わりに、各エピソード毎で常に `n_steps` 回イテレーションを実行します。

そして、環境が返ってくる際に、単にオブザーバーは環境をリセットし、また最初からやり直します。



この設計では、エージェントは各オブザーバーから一定数の状態を受け取るため、それらの状態を固定長のテンソルへと詰め込むことができます。

各ステップでは、`Observer` がRPCを使用して `Agent` に状態を送信し、そして返り値を通してアクションを受け取ります。

そして、各エピソードの終了時には、すべてのステップの報酬を `Agent` に返します。

なお、この `run_episode` 関数は、RPCを用いて `Agent` によって呼び出される点に留意してください。

そのため、この関数内での `rpc_sync` の呼び出しは、ネストされたRPCの使用になります。

ちなみに、この関数を `@rpc.functions.async_execution` でマークすることで、`Observer`上で発生するスレッドのブロックを避けることも可能です。

しかし、`Observer` ではなく `Agent` がボトルネックであるため、`Observer` のプロセス上のスレッドをブロックしてしまうことは、問題ないはずです。

In [None]:
import torch

class Observer:
    # ...

    def run_episode(self, agent_rref, n_steps):
        state, ep_reward = self.env.reset(), NUM_STEPS
        rewards = torch.zeros(n_steps)
        start_step = 0
        for step in range(n_steps):
            state = torch.from_numpy(state).float().unsqueeze(0)
            # 状態をエージェントに送信し、アクションを取得
            action = rpc.rpc_sync(
                agent_rref.owner(),
                self.select_action,
                args=(agent_rref, self.id, state)
            )

            # アクションを環境に適用し、報酬を取得
            state, reward, done, _ = self.env.step(action)
            rewards[step] = reward

            if done or step + 1 >= n_steps:
                curr_rewards = rewards[start_step:(step + 1)]
                R = 0
                for i in range(curr_rewards.numel() -1, -1, -1):
                    R = curr_rewards[i] + args.gamma * R
                    curr_rewards[i] = R
                state = self.env.reset()
                if start_step == 0:
                    ep_reward = min(ep_reward, step - start_step + 1)
                start_step = step + 1

        return [rewards, ep_reward]

`Agent` のコンストラクターも 引数`batch`を取ります。これはアクションの確率値がバッチ処理されるかを制御するのに使用されます。

バッチモードの場合、`saved_log_probs` はテンソルのリストを含んでおり、さらに各テンソルは、あるステップ内のすべてのオブザーバーから取得したアクションの確率値を含んでいます。

一方でバッチ化を行わない場合、`saved_log_probs` は、keyがオブザーバーid、valueが対象のオブザーバーにおけるアクションの確率値である辞書型のオブジェクトになります。

In [None]:
import threading
from torch.distributed.rpc import RRef

class Agent:
    def __init__(self, world_size, batch=True):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.policy = Policy(batch).cuda()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.running_reward = 0

        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(rpc.remote(ob_info, Observer, args=(batch,)))
            self.rewards[ob_info.id] = []

        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
        self.batch = batch
        self.saved_log_probs = [] if batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.future_actions = torch.futures.Future()
        self.lock = threading.Lock()
        self.pending_states = len(self.ob_rrefs)

バッチ化を行わない `select_action` は、単にPolicy経由で状態を実行し、アクションの確率値を保存して、すぐにオブザーバーにアクションを返します。

In [None]:
from torch.distributions import Categorical

class Agent:
    # ...

    @staticmethod
    def select_action(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        probs = self.policy(state.cuda())
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

一方でバッチ処理を行うselect_action_batchの場合、状態は、オブザーバーidを行idとして使い、2次元テンソルである self.states に格納されます。

これは、オブザーバーidによってインデックスされた特定の行に存在する、バッチ生成された `self.future_actions` Future オブジェクトにコールバック関数をインストールすることで Future をつなぎ合わせます。

そして、最後に到着するオブザーバーは、ポリシーを通してバッチ化されたすべての状態を一度に実行し、同時に self.future_actions を設定します。

これが行われた際、self.future_actions上にインストールされたすべてのコールバック関数のトリガーが発動し、コールバック関数の返り値がつなぎ合わされたFutureオブジェ
クトを配置するために使用されます。

それに伴い、Agentに対して準備するように通知し、他のオブザーバーからの過去のRPCリクエストのすべてに対してレスポンスを返します。

In [None]:
class Agent:
    # ...

    @staticmethod
    @rpc.functions.async_execution
    def select_action_batch(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        self.states[ob_id].copy_(state)
        future_action = self.future_actions.then(
            lambda future_actions: future_actions.wait()[ob_id].item()
        )

        with self.lock:
            self.pending_states -= 1
            if self.pending_states == 0:
                self.pending_states = len(self.ob_rrefs)
                probs = self.policy(self.states.cuda())
                m = Categorical(probs)
                actions = m.sample()
                self.saved_log_probs.append(m.log_prob(actions).t()[0])
                future_actions = self.future_actions
                self.future_actions = torch.futures.Future()
                future_actions.set_result(actions.cpu())
        return future_action

それでは、異なるRPC関数がどのように結合されるのか、定義してみましょう。

`Agent` は、各エピソードの実行を制御します。

初めに `rpc_async` を使用し、全オブザーバー上でエピソードを開始し、オブザーバーの報酬を含んだ返り値であるfutureをブロックします。

なお、下記のコードでは、RRefを補助する `ob_rref.rpc_async()` を使用し、ob_rref RRefを所有しているワーカー上で、与えられた引数と共に `run_episode` 関数を起動している点に留意してください。

そして、保存されたアクションの確率値と返されたオブザーバーの報酬を所定のデータフォーマットに変換し、訓練ステップを起動します。

最後に、すべての状態をリセットし、現在のエピソードの報酬を返します。

この関数は、あるエピソードを実行する際のエントリーポイントになります。

In [None]:
class Agent:
    # ...

    def run_episode(self, n_steps=0):
        futs = []
        for ob_rref in self.ob_rrefs:
            # 非同期RPCを作り、すべてのオブザーバー上でエピソードを開始
            futs.append(ob_rref.rpc_async().run_episode(self.agent_rref, n_steps))

        # すべてのオブザーバーがこのエピソードを完了するまで待機
        wait until all obervers have finished this episode
        rets = torch.futures.wait_all(futs)
        rewards = torch.stack([ret[0] for ret in rets]).cuda().t()
        ep_rewards = sum([ret[1] for ret in rets]) / len(rets)

        # 保存された確率値を一つのテンソルにstack
        if self.batch:
            probs = torch.stack(self.saved_log_probs)
        else:
            probs = [torch.stack(self.saved_log_probs[i]) for i in range(len(rets))]
            probs = torch.stack(probs)

        policy_loss = -probs * rewards / len(rets)
        policy_loss.sum().backward()
        self.optimizer.step()
        self.optimizer.zero_grad()

        # 変数のリセット
        self.saved_log_probs = [] if self.batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)

        # 実行中の報酬を計算
        self.running_reward = 0.5 * ep_rewards + 0.5 * self.running_reward
        return ep_rewards, self.running_reward

残りのコードは、他のRPCのチュートリアルと同様の、起動とロギングの通常の処理です。

本チュートリアルでは、すべてのオブザーバーはエージェントからの指令を受動的に待機しています。

実装の全内容については、[サンプルコード](https://github.com/pytorch/examples/tree/master/distributed/rpc)のリポジトリを参照してください。

In [None]:
def run_worker(rank, world_size, n_episode, batch, print_log=True):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # ランク0はエージェント
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size, batch)
        for i_episode in range(n_episode):
            last_reward, running_reward = agent.run_episode(n_steps=NUM_STEPS)

            if print_log:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                    i_episode, last_reward, running_reward))
    else:
        # その他のランクはオブザーバー
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # オブザーバーは、エージェントからの支持を受動的に待機
    rpc.shutdown()


def main():
    for world_size in range(2, 12):
        delays = []
        for batch in [True, False]:
            tik = time.time()
            mp.spawn(
                run_worker,
                args=(world_size, args.num_episode, batch),
                nprocs=world_size,
                join=True
            )
            tok = time.time()
            delays.append(tok - tik)

        print(f"{world_size}, {delays[0]}, {delays[1]}")


if __name__ == '__main__':
    main()

バッチRPCは、アクションの推論をより少ないCUDAの操作に統合する上で役立ち、その結果、オーバーヘッドを減らすことができます。

上記の `main` 関数は、1から10までの異なる数のオブザーバーを使用し、バッチモードと非バッチモードの両方で同一のコードを実行します。

下の図は、デフォルトの引数の値を使用し、異なるワールドサイズ（実行されるobserverの数）にしたときの実行時間をプロットしたものです。

バッチ処理は訓練の高速化に役立つ、という期待通りの結果が確認できます。
<img src="https://pytorch.org/tutorials/_images/batch.png">

## さらに学習するための資料集

- [バッチ更新パラメーターサーバーのソースコード](https://github.com/pytorch/examples/blob/master/distributed/rpc/batch/parameter_server.py)
- [バッチ処理カートポールソルバー](https://github.com/pytorch/examples/blob/master/distributed/rpc/batch/reinforce.py)
- [分散自動微分](https://pytorch.org/docs/master/rpc.html#distributed-autograd-framework)
- [分散パイプライン並列化](https://pytorch.org/tutorials/intermediate/dist_pipeline_parallel_tutorial.html)（日本語版6_7）