強化学習の「GRPO」をCartPoleで実装しながら解説 のコードです
===================



**1. 概要**：

「DeepSeek-R1」で有名となった強化学習手法 GRPO（Group Relative Policy Optimization）を、CartPoleで実装しながら学びます





**2. 実装者**：小川雄太郎

**3. 実装日**：2025年02月08日

**4. 実行環境**：Google Colabratory、CPU

**5. 参考記事・スライド類**


ブログ: 「LLMチューニングのための強化学習：GRPO（Group Relative Policy Optimization）」 [[link](https://horomary.hatenablog.com/entry/2025/01/26/204545)]


**6. 実装のメイン参考**

ブログ: 「group relative policy optimization (GRPO)」 [[link](https://superb-makemake-3a4.notion.site/group-relative-policy-optimization-GRPO-18c41736f0fd806eb39dc35031758885)]


**7. その他参考にした記事**

論文

DeepSeek-R1: Incentivizing Reasoning Capability in LLMs via Reinforcement Learning [[link](https://arxiv.org/abs/2501.12948)]

DeepSeekMath: Pushing the Limits of Mathematical Reasoning in Open Language Models [[link](https://arxiv.org/abs/2402.03300)]


**8. その他**

注釈: 筆者はDeepSeekの思想問題、コンプライアンス問題などについては、肯定的立場でも否定的立場でもありません。中立的立場でもありません。そこにある技術的側面のみに関心を持ち、取り挙げます

---

# [0] 初期設定


## [0-1] GPUの初期設定を確認


In [1]:
!nvidia-smi

/bin/bash: line 1: nvidia-smi: command not found


## [0-2] package installの実施

In [2]:
# pass

## [0-3] importの実施

In [3]:
# 定番系
import os
import sys
import random
import math
import time
import re
import datetime
import json
import argparse
import numpy as np
import pandas as pd
from pandas.errors import EmptyDataError
from pandas.errors import ParserError
from PIL import Image
from tqdm import tqdm
import gc

# 描画系
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib import rc
from ipywidgets import interact
%matplotlib inline

# PyTorch関連
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
import torchvision
from torchvision import transforms

In [4]:
# warning を非表示にしてしまう
import warnings

warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

# stderr を無効化
sys.stderr = open('/dev/null', 'w')


## [0-4] 乱数シードの初期化


In [5]:
# 乱数シードの初期化
seed_val = 1234

print("使用するseedの値：", str(seed_val))
os.environ['PYTHONHASHSEED'] = str(seed_val)
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val) # PyTorchはこちら参考 https://pytorch.org/docs/stable/notes/randomness.html
torch.cuda.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)
# GPU高速化
torch.backends.cudnn.benchmark = True

# GPUの再現性まで確保したい場合（ただし、計算速度が低下してしまいます）
# torch.backends.cudnn.deterministic = True
# torch.backends.cudnn.benchmark = False


使用するseedの値： 1234


## [0-5] 実行環境の確認

In [6]:
# -----------------------------
# 環境情報の確認
# -----------------------------
def print_program_info():
    print("Pythonのバージョン：", sys.version)
    print("---------")
    print("PyTorchのバージョン：",
    torch.__version__)
    print("GPUの枚数：", torch.cuda.device_count())
    global device  # 変数deviceは以降もプログラム本体で使用するのでglobal変数にしています
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(device)


# 【実行】確認
print_program_info()

# 上記で変数deviceをglobalにしており、deviceでGPUを使用できます


Pythonのバージョン： 3.11.11 (main, Dec  4 2024, 08:55:07) [GCC 11.4.0]
---------
PyTorchのバージョン： 2.5.1+cu124
GPUの枚数： 0
cpu


## [0-6] Google Driveのマウント（任意）


In [7]:
# from google.colab import drive
# drive.mount('/content/gdrive')

---

# [1] ひとまず適当にCartPole環境を動かす

## [1-1] CartPole環境を作成

In [8]:
import gym

# version確認
print(gym.__version__)  # 0.25.2でした

# CartPoleのenvを用意
env = gym.make('CartPole-v0')


0.25.2


## [1-2] CartPoleが倒れるまで適当に動かす

In [9]:
# 適当に動かす
# ====================
# [0] 環境をリセット+初期変数の設定
state = env.reset()
done = False        # episodeの終わりを判定するフラグ
episode_reward = 0  # episodeの総報酬を格納する
frames = []         # 画像保存用リスト

# 倒れてdone=Trueになるまで続ける
while not done:
    # [1] 画面キャプチャ
    frames.append(env.render(mode="rgb_array"))  # ndarray (400, 600, 3)

    # [2] Actionを適当に 0 or 1 から選ぶ（左か右に押す）
    action = random.choice([0, 1])

    # [3] CartPoleの台車にActionを適用してstateを更新する
    state, reward, done, _ = env.step(action)
    episode_reward += reward

# 実行結果
print("episode_reward:", episode_reward)
env.close()

episode_reward: 12.0


## [1-3] 実行結果を動画で可視化

In [10]:
import matplotlib.animation as animation
from matplotlib import rc

def show_animation(imgs):
    """動画表示する関数。引数 imgs は ndarray (400, 600, 3) のリストを想定"""
    rc("animation", html="jshtml")

    fig, ax = plt.subplots(1, 1, figsize=(5, 3))
    frames = []

    # テキストを初期化（step 数）
    text = ax.text(10, 20, "", fontsize=12, color="black")

    for i, img in enumerate(imgs):
        frame = [ax.imshow(img, animated=True)]  # 画像を描画
        frame.append(ax.text(10, 20, f"Step: {i+1}", animated=True))  # Step数表示
        frames.append(frame)

    ax.axis("off")

    ani = animation.ArtistAnimation(fig, frames, interval=100, blit=True)

    # 動画保存
    ani.save("cartpole_grpo.mp4", writer="ffmpeg")
    ani.save("cartpole_grpo.gif", writer="pillow")

    plt.close(fig)  # ここで閉じる

    return ani


In [11]:
# 可視化の実行
show_animation(frames)


ひとまず、適当に動きました。ですがすぐに倒れ、終了してしまいます。

そこで、CartPoleの状態（state）に応じて、行動を決めるように、「Policy ネットワーク」を訓練（強化学習）して、倒れない制御を目指します。

この強化学習に今回はGRPOを利用します

# [2] GRPOに必要な関数群を定義する

## [2-1] Policy Networkを定義

PolicyのNeural Network は 現在のstateを入力とし、各Actionの取りやすさ（ = logits）を出力します

In [12]:
class PolicyNet(torch.nn.Module):
    def __init__(self):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(4, 64)
        self.fc2 = torch.nn.Linear(64, 2)  # 左か右に押すので、最後は2ノード

    def forward(self, state):
        x = torch.nn.functional.relu(self.fc1(state))
        logits = self.fc2(x)
        return logits

## [2-2] 1エピソードを実行し、各stepでのstateやactionなどを収集する関数を定義

GRPOの重要な点として、報酬はエピソードの各step tで密に与えられるのではなく、エピソードの終わりに総報酬のみが渡されるとする（報酬が疎）。


そのため、エピソード途中の各stepに対する各actionに対しての細かい評価は個別には行わない

In [13]:
def collect_trajectory(env, net):
    # [0] 環境をリセット+初期変数の設定
    state = env.reset()
    states, log_probs,chosen_actions = [],[],[]  # ここに溜めていきます
    episode_reward = 0
    done = False

    while not done:
        # [1] そのstepでの各情報を求めます
        states.append(state)
        logits = net(torch.from_numpy(state).float())
        probs = torch.nn.functional.softmax(logits, dim=0)  # logitsにsoftmaxで、行動確率を求める
        action = torch.multinomial(probs, 1).item()         # 実行するactionを行動確率に基づき選択する
        log_prob = torch.log(probs[action])                 # 選んだactionが選択される確率のlog値
        log_probs.append(log_prob.item())
        chosen_actions.append(action)

        # [2] CartPoleの台車にActionを適用してstateを更新する
        state, reward, done, _ = env.step(action)
        episode_reward += reward

    # [3] エピソードの総報酬を最大step数である200で正規化します
    normalized_reward = episode_reward / 200.0

    return states, log_probs, chosen_actions, normalized_reward

## [2-3] GRPOに基づくAdvantage計算を定義

ここが、GRPO（Group Relative Policy Optimization）の真髄です。

PPOやActor-Critic 等であれば、価値関数を求める「状態価値関数 V(state)」のNNを使用して、Advantage（≒そのstateでの平均価値と実際の行動の価値の差）を求めますが、GRPOは報酬のみからAdvantageを計算します。

そのため、状態価値（Value）を求めるNeural Networkが不要です

In [14]:
def calc_advantages_with_grpo(trajectories):
    """trajectoriesから報酬のみを取り出し、各エピソードの報酬を標準化します"""
    rewards = [r for o, l, a, r in trajectories]  # 最終報酬を取り出して、
    mean_reward = sum(rewards) / len(rewards)     # 平均値求めて、
    std_reward = np.std(rewards)  + 1e-8          # 標準偏差求めて（1e-8は0除算対策）、
    advantages = [(r - mean_reward) / std_reward for r in rewards]  # 最後に各エピソードについて標準化

    return advantages


## [2-4] GRPOに基づくPolicy Netの重み更新関数を定義

注意：GRPOはPPOの更新式とは少し異なり、本当はKLダイバージェンスで過度な更新を防ぐ項が入りますが、今回の実装ではその項は省略しています

In [15]:
def grpo_update(trajectories, net, optimizer, n_iterations=20, eps=0.2):

    # [1] [2-3]のGRPOの関数で、各エピソードの標準化されたAdvantageを求めます
    advantages = calc_advantages_with_grpo(trajectories)

    # [2] Policy NN を更新します。n_iterations回、更新をかけます
    for i_iter in range(n_iterations):
        loss = 0
        for traj, advantage in zip(trajectories, advantages):
            (states, log_probs, chosen_actions, _) = traj  # 1エピソードの蓄えられた内容を取り出します
            trajectory_loss = 0                            # 1エピソードの損失を0に初期化

            # [3] エピソード内の各ステップについて損失（loss）を計算
            for t in range(len(states)):
                # 以下はPPOと同じ手続きです
                # ====================================
                # [3-1] 更新されたPolicyNNで、実際に step t で選択された行動がどれくらい尤もらしいかを求めます
                new_policy_probs = torch.nn.functional.softmax(net(torch.from_numpy(states[t]).float()), dim=0)
                new_log_probs = torch.log(new_policy_probs)[chosen_actions[t]]

                # [3-2] 更新前のPolicyNNで選択されたactionを選ぶ確率と、更新後のNNの場合の比を求める（対数確率なので引き算してexpする）
                ratio = torch.exp(new_log_probs - log_probs[t])

                # [3-3] 更新後の選ばれる確率が非常に大きくなってしまうと更新しすぎなので、epsの範囲にとどめる
                clipped_ratio = torch.clamp(ratio, min=1 - eps, max=1 + eps)

                # [3-4] 報酬を最大化したいので、マイナスを掛け算して最小化問題に置き換える
                # そして平均より良かった施行を増やし、悪かったケースを減らすように更新
                trajectory_loss += -clipped_ratio * advantage

            # [4] エピソードのsteps数で正規化
            trajectory_loss /= len(states)
            loss += trajectory_loss

        # [5] エピソード数で正規化
        loss /= len(trajectories)

        # [6] Policy NN の重みを更新
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return None  # 特に何も返さない

# [3] GRPOでPolicy Netを更新しながら、CartPoleを実行する

GRPOを実行するための関数群が定義できたので、早速実行します

## [3-1] 初期化とハイパラ設定

In [16]:
# [1] 初期化と初期設定
env = gym.make('CartPole-v0')
net = PolicyNet()
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)

# [2] GRPO で PocicyNetを更新する際のTrajectoryを溜める回数の設定
trajectories_per_update = 5  # group size


## [3-2] 訓練開始（100エピソードを1トライアルとして、繰り返す）

In [17]:
# [3] エピソードのループ100回を、平均報酬が195を超えるまで実施
trial_num = 0  # 100回を何回tryしたかを示す

while(True):
    # [4] エピソードのループ100回を開始
    for i_episode in range(20):  # trajectories_per_updateが5なので、20で100回になります
        trajectories, episode_rewards = [], []  # episode_rewardsには実際の総報酬を格納

        # [5] GRPO で PocicyNetを更新する際のTrajectoryを溜める（エピソード5回分）
        for _ in range(trajectories_per_update):
            states, log_probs, chosen_actions, normalized_reward = collect_trajectory(env, net)
            trajectories.append((states, log_probs, chosen_actions, normalized_reward))
            episode_rewards.append(normalized_reward * 200)  # 実際の総報酬を格納

        # [6] GRPO で PociyNetの重みを更新
        grpo_update(trajectories, net, optimizer)

    # [7] 100エピソードの平均報酬を求める
    avg_reward = sum(episode_rewards) / len(episode_rewards)
    trial_num+=1

    # [8] 終了判定
    if avg_reward > 195:
        print('訓練完了です。トライアル回数: ', trial_num)
        break
    else:
        print(f'トライアル {trial_num}回目, avg reward: {avg_reward:.2f}')

env.close()


トライアル 1回目, avg reward: 34.00
トライアル 2回目, avg reward: 169.80
トライアル 3回目, avg reward: 193.40
訓練完了です。トライアル回数:  4


# [4] 訓練後のPolicy NetでCartPoleを制御する様子を可視化する

## [4-1] 1エピソードの実施

In [19]:
# [0] 環境をリセット+初期変数の設定
state = env.reset()
done = False        # episodeの終わりを判定するフラグ
episode_reward = 0  # episodeの総報酬を格納する
frames = []         # 画像保存用リスト

# 倒れてdone=Trueになるまで続ける
while not done:
    # [1] 画面キャプチャ
    frames.append(env.render(mode="rgb_array"))  # ndarray (400, 600, 3)

    # [2] ActionをPolicyNetから選ぶ（左か右に押す）
    logits = net(torch.from_numpy(state).float())
    probs = torch.nn.functional.softmax(logits, dim=0)  # logitsにsoftmaxで、行動確率を求める
    action = torch.multinomial(probs, 1).item()         # 実行するactionを確率的に選択する

    # [3] CartPoleの台車にActionを適用してstateを更新する
    state, reward, done, _ = env.step(action)
    episode_reward += reward

# 実行結果
print("episode_reward:", episode_reward)
env.close()

episode_reward: 200.0


## [4-2] 実行結果を動画で可視化

In [20]:
# 可視化
show_animation(frames)


# 以上