# 1. ライブラリのインポート

In [1]:
!pip install gymnasium stable-baselines3 numpy

Collecting stable-baselines3
  Downloading stable_baselines3-2.7.0-py3-none-any.whl.metadata (4.8 kB)
Downloading stable_baselines3-2.7.0-py3-none-any.whl (187 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m187.2/187.2 kB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: stable-baselines3
Successfully installed stable-baselines3-2.7.0


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
workspace = '/content/drive/MyDrive/RL Summer 2025/最終課題/'

# 2. ヨットの1人プレイ用 Gymnasium 環境

In [4]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from typing import Optional

In [7]:
class YachtEnv(gym.Env):
    """
    ルールは任天堂の「世界のアソビ大全51」に基づく.
    """
    metadata = {'render.modes': ['human']}

    # --- 定数定義 ---
    NUM_DICE = 5
    NUM_SIDES = 6
    NUM_ROUNDS = 12

    # スコアカテゴリのインデックス
    ACE, DEUCE, TRAY, FOUR, FIVE, SIX = 0, 1, 2, 3, 4, 5
    CHOICE, FOUR_CARD, FULL_HOUSE, S_STRAIGHT, B_STRAIGHT, YACHT = 6, 7, 8, 9, 10, 11

    # カテゴリ名を保持
    CATEGORY_NAMES = {
        0: "エース", 1: "デュース", 2: "トレイズ", 3: "フォアーズ", 4: "ファイブズ", 5: "シックスズ",
        6: "チョイス", 7: "4カード", 8: "フルハウス", 9: "S.ストレート", 10: "B.ストレート", 11: "ヨット"
    }

    def __init__(self):
        super(YachtEnv, self).__init__()

        # ===== 状態空間の定義 ===== #
        self.observation_space = spaces.Dict({
            # サイコロの目: 5個のサイコロがそれぞれ 1 - 6 の値をとる.
            "dice": spaces.Box(low=1, high=self.NUM_SIDES, shape=(self.NUM_DICE,), dtype=np.int8),
            # 残りロール回数: 0, 1, 2
            "rolls_left": spaces.Discrete(3),
            # スコアシート: 12個のカテゴリが埋まっているか (1:空き, 0:済)
            "score_card_mask": spaces.MultiBinary(self.NUM_ROUNDS),
             # 現在のスコア: 各カテゴリの現在の点数
            "scores": spaces.Box(low=0, high=50, shape=(self.NUM_ROUNDS,), dtype=np.int8),
            # 上の段の合計点
            "upper_score_total": spaces.Box(low=0, high=150, shape=(1,), dtype=np.int16),
        })

        # --- 行動空間の定義 ---
        # 0-31: サイコロのキープ選択 (2^5 = 32通り)
        # 32-43: スコアリングする役の選択 (12通り)
        self.action_space = spaces.Discrete(32 + self.NUM_ROUNDS)


    def _calculate_score(self, dice: np.ndarray, category: int) -> int:
        """与えられたサイコロと役からスコアを計算する"""
        counts = np.bincount(dice, minlength=self.NUM_SIDES + 1)

        if self.ACE <= category <= self.SIX:
            return (category + 1) * counts[category + 1]
        elif category == self.CHOICE:
            return np.sum(dice)
        elif category == self.FOUR_CARD:
            return np.sum(dice) if np.any(counts >= 4) else 0
        elif category == self.FULL_HOUSE:
            return 25 if (3 in counts and 2 in counts) or (5 in counts) else 0
        elif category == self.S_STRAIGHT:
            unique_dice = sorted(list(set(dice)))
            # 4つ以上の連続を探す
            for i in range(len(unique_dice) - 3):
                if unique_dice[i+1] == unique_dice[i]+1 and \
                   unique_dice[i+2] == unique_dice[i]+2 and \
                   unique_dice[i+3] == unique_dice[i]+3:
                    return 30
            return 0
        elif category == self.B_STRAIGHT:
            unique_dice = sorted(list(set(dice)))
            return 40 if len(unique_dice) == 5 and (unique_dice[4] - unique_dice[0] == 4) else 0
        elif category == self.YACHT:
            return 50 if np.any(counts == 5) else 0
        return 0

    def _get_obs(self) -> dict:
        """現在の状態を観測用の辞書として返す"""
        return {
            "dice": self.dice.astype(np.int8),
            "rolls_left": self.rolls_left,
            "score_card_mask": self.score_card_mask,
            "scores": self.scores,
            "upper_score_total": np.array([self.upper_score_total], dtype=np.int16)
        }

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None) -> tuple[dict, dict]:
        """環境を初期化する"""
        super().reset(seed=seed)

        self.dice = self.np_random.integers(1, self.NUM_SIDES + 1, size=self.NUM_DICE).astype(np.int8)
        self.rolls_left = 2
        self.last_keep_mask = np.zeros(self.NUM_DICE, dtype=bool) # Initialize last_keep_mask

        self.score_card_mask = np.ones(self.NUM_ROUNDS, dtype=np.int8)
        self.scores = np.zeros(self.NUM_ROUNDS, dtype=np.int8)

        self.upper_score_total = 0
        self.bonus_achieved = False
        self.rounds_played = 0

        return self._get_obs(), {}

    def step(self, action: int) -> tuple[dict, float, bool, bool, dict]:
      """
      エージェントの行動に基づき、環境を1ステップ進める
      """
      terminated = False
      reward = 0.0

      # --- 行動の解釈 ---
      # 1. ロールフェーズの行動 (キープ選択)
      if action < 32:
          if self.rolls_left > 0:
              # actionを2進数に変換してキープマスクを作成 (例: 5 -> '00101')
              keep_mask = np.array([int(b) for b in format(action, '05b')], dtype=bool)
              self.last_keep_mask = keep_mask

              # キープしないサイコロの数だけ新たに振る
              num_to_roll = self.NUM_DICE - np.sum(keep_mask)
              new_rolls = self.np_random.integers(1, self.NUM_SIDES + 1, size=num_to_roll)

              # 新しいサイコロの状態を生成
              new_dice = self.dice.copy()
              new_dice[~keep_mask] = new_rolls
              self.dice = new_dice

              self.rolls_left -= 1

          ### <<< ここからが重要な変更点
          else:
              # ロール回数が残っていないのにロールしようとした（無効な行動）
              # => ヘビーペナルティを与え、強制的にラウンドを終了させる
              reward = -10.0 # ペナルティを大きくする

              # 空いている役の中から最初の一つを0点で埋める
              available_categories = np.where(self.score_card_mask == 1)[0]
              if len(available_categories) > 0:
                  category_to_bust = available_categories[0]
                  self.scores[category_to_bust] = 0
                  self.score_card_mask[category_to_bust] = 0

                  # ラウンド終了処理
                  self.rounds_played += 1
                  if self.rounds_played == self.NUM_ROUNDS:
                      terminated = True
                  else:
                      # 次のラウンドの準備
                      self.rolls_left = 2
                      self.dice = self.np_random.integers(1, self.NUM_SIDES + 1, size=self.NUM_DICE).astype(np.int8)
                      self.last_keep_mask = np.zeros(self.NUM_DICE, dtype=bool)
              else:
                  # この状況は基本発生しないが、念のため
                  terminated = True
          ### <<< 変更ここまで

      # 2. スコアリングフェーズの行動 (役選択)
      else:
          category = action - 32

          # 既に埋まっている役を選んだ場合（無効な行動）
          if self.score_card_mask[category] == 0:
              reward = -10.0 # より重いペナルティ
              # ### 無効な行動でもゲームが止まらないように、強制的にラウンドを進める
              # self.rounds_played += 1 # この処理を入れるかは設計次第ですが、今回は入れずにエージェントに学習させます
          else:
              # スコアを計算
              score = self._calculate_score(self.dice, category)
              reward = float(score)
              self.scores[category] = score
              self.score_card_mask[category] = 0

              # 上の段のボーナスチェック
              if self.ACE <= category <= self.SIX:
                  self.upper_score_total += score
                  if not self.bonus_achieved and self.upper_score_total >= 63:
                      reward += 35.0
                      self.bonus_achieved = True

              # ラウンド終了処理
              self.rounds_played += 1
              if self.rounds_played == self.NUM_ROUNDS:
                  terminated = True
              else:
                  # 次のラウンドの準備
                  self.rolls_left = 2
                  self.dice = self.np_random.integers(1, self.NUM_SIDES + 1, size=self.NUM_DICE).astype(np.int8)
                  self.last_keep_mask = np.zeros(self.NUM_DICE, dtype=bool)

      obs = self._get_obs()
      return obs, reward, terminated, False, {}

    def render(self, mode='human'):
        """現在の状態をコンソールに表示する"""
        print("-" * 30)
        print(f"ラウンド: {self.rounds_played + 1}/{self.NUM_ROUNDS}")
        print(f"残りロール回数: {self.rolls_left}")

        dice_str_parts = []
        # rolls_leftが2（ラウンド開始直後）は、まだ何もキープしていないのでロック表示しない
        should_show_lock = self.rolls_left < 2
        for i, die in enumerate(self.dice):
            if should_show_lock and self.last_keep_mask[i]:
                dice_str_parts.append(f"[{die}]") # 🔒ロックされたダイス
            else:
                dice_str_parts.append(f" {die} ") # 通常のダイス
        dice_display = " ".join(dice_str_parts)
        print(f"現在のサイコロ: {dice_display}")

        print("-" * 30)
        print("スコアシート:")

        upper_total = 0
        for i in range(6):
            if self.score_card_mask[i] == 0:
                print(f"  {self.CATEGORY_NAMES[i]:<12}: {self.scores[i]} 点")
                upper_total += self.scores[i]
            else:
                print(f"  {self.CATEGORY_NAMES[i]:<12}: (空き)")

        print(f"  上の段 合計: {upper_total} / 63")
        if upper_total >= 63:
            print("  ボーナス: 35 点")

        print("-" * 15)
        for i in range(6, 12):
            if self.score_card_mask[i] == 0:
                print(f"  {self.CATEGORY_NAMES[i]:<12}: {self.scores[i]} 点")
            else:
                print(f"  {self.CATEGORY_NAMES[i]:<12}: (空き)")

        print("-" * 30)
        total_score = np.sum(self.scores) + (35 if upper_total >= 63 else 0)
        print(f"現在の合計点: {total_score}")
        print("-" * 30)


# --- 実行とテスト ---
if __name__ == '__main__':
    # 環境がGymnasiumの仕様に準拠しているかチェック
    from stable_baselines3.common.env_checker import check_env

    print("--- 環境のチェックを開始 ---")
    env = YachtEnv()
    check_env(env)
    print("--- 環境のチェックが完了しました ---")

    # ランダムな行動で1ゲームプレイしてみるテスト
    print("\n--- ランダムプレイテスト ---")
    obs, info = env.reset()
    env.render()

    terminated = False
    total_reward = 0

    while not terminated:
        # 現在の状況で有効な行動をサンプリング
        if env.rolls_left > 0:
            # ロールフェーズ：キープ(0-31) or スコア(32-43)のどちらか
            action = env.action_space.sample()
        else:
            # スコアリングフェーズ：必ず役を選ぶ
            # 有効な役の中からランダムに選ぶ
            available_scores = np.where(obs["score_card_mask"] == 1)[0]
            action = env.np_random.choice(available_scores) + 32

        obs, reward, terminated, truncated, info = env.step(action)

        total_reward += reward

        print(f"\n実行した行動: {action}")
        env.render()
        print(f"このステップの報酬: {reward}")

    print(f"\nゲーム終了！最終スコア: {total_reward}")

--- 環境のチェックを開始 ---
--- 環境のチェックが完了しました ---

--- ランダムプレイテスト ---
------------------------------
ラウンド: 1/12
残りロール回数: 2
現在のサイコロ:  4   5   3   3   6 
------------------------------
スコアシート:
  エース         : (空き)
  デュース        : (空き)
  トレイズ        : (空き)
  フォアーズ       : (空き)
  ファイブズ       : (空き)
  シックスズ       : (空き)
  上の段 合計: 0 / 63
---------------
  チョイス        : (空き)
  4カード        : (空き)
  フルハウス       : (空き)
  S.ストレート     : (空き)
  B.ストレート     : (空き)
  ヨット         : (空き)
------------------------------
現在の合計点: 0
------------------------------

実行した行動: 36
------------------------------
ラウンド: 2/12
残りロール回数: 2
現在のサイコロ:  5   6   3   5   6 
------------------------------
スコアシート:
  エース         : (空き)
  デュース        : (空き)
  トレイズ        : (空き)
  フォアーズ       : (空き)
  ファイブズ       : 5 点
  シックスズ       : (空き)
  上の段 合計: 5 / 63
---------------
  チョイス        : (空き)
  4カード        : (空き)
  フルハウス       : (空き)
  S.ストレート     : (空き)
  B.ストレート     : (空き)
  ヨット         : (空き)
------------------------------
現在の合計点: 5
--

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.
  return datetime.utcnow().replace(tzinfo=utc)


# 3. 学習

In [8]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
import time

In [None]:
# --- 1. エージェントの作成 ---

# ベクトル化された環境を作成 (学習を効率化するため)
# 今回は1つの環境で学習させます
vec_env = make_vec_env(YachtEnv, n_envs=1)

# PPOモデル（エージェント）を作成します
# MlpPolicyは、Dict（辞書）形式の状態空間に対応した標準的なポリシーです
# verbose=1にすると学習の進捗が表示されます
model = PPO(
    "MultiInputPolicy",
    vec_env,
    verbose=1,
    tensorboard_log= workspace + "yacht_tensorboard/" # TensorBoard用のログ出力先
)


# --- 2. 学習の実行 ---
print("--- 学習を開始します ---")
# total_timestepsは学習ステップ数です。まずは10万〜20万ステップで試してみましょう。
# ColabのGPUを使えば数分で完了します。
TRAINING_STEPS = 200_000
start_time = time.time()

model.learn(total_timesteps=TRAINING_STEPS)

end_time = time.time()
print(f"--- 学習が完了しました (所要時間: {end_time - start_time:.2f}秒) ---")

# 学習したモデルを保存します（Google Driveへの保存を推奨）
model.save(workspace + "ppo_yacht_agent")
print("モデルを 'ppo_yacht_agent.zip' に保存しました。")


# --- 3. 学習済みエージェントの評価 ---
print("\n--- 学習済みエージェントの評価を開始します ---")

# 評価用に新しい環境を作成
eval_env = YachtEnv()
obs, info = eval_env.reset()

terminated = False
total_reward = 0
game_round = 0

while not terminated:
    game_round += 1
    # model.predictは、現在の状態(obs)から最適な行動(action)を予測します
    # deterministic=Trueにすると、最も確率の高い行動を選択します
    action, _states = model.predict(obs, deterministic=True)

    obs, reward, terminated, truncated, info = eval_env.step(int(action))

    total_reward += reward

    print(f"\n===== [評価] ラウンド {game_round} =====")
    if int(action) < 32:
        keep_mask = np.array([int(b) for b in format(int(action), '05b')])
        print(f"エージェントの行動: {int(action)} (キープ選択 {keep_mask})")
    else:
        category_name = YachtEnv.CATEGORY_NAMES[int(action) - 32]
        print(f"エージェントの行動: {int(action)} ({category_name} にスコア記録)")

    eval_env.render()
    print(f"このステップの報酬: {reward}")

print(f"\n🏆 評価ゲーム終了！ エージェントの最終スコア: {total_reward} 🏆")

Using cuda device
--- 学習を開始します ---


  return datetime.utcnow().replace(tzinfo=utc)


[1;30;43mストリーミング出力は最後の 5000 行に切り捨てられました。[0m
  ヨット         : 0 点
------------------------------
現在の合計点: 31
------------------------------
このステップの報酬: -10.0

===== [評価] ラウンド 84242 =====
エージェントの行動: 37 (シックスズ にスコア記録)
------------------------------
ラウンド: 12/12
残りロール回数: 0
現在のサイコロ: [4] [5]  5   6  [2]
------------------------------
スコアシート:
  エース         : (空き)
  デュース        : 2 点
  トレイズ        : 0 点
  フォアーズ       : 8 点
  ファイブズ       : 0 点
  シックスズ       : 0 点
  上の段 合計: 10 / 63
---------------
  チョイス        : 21 点
  4カード        : 0 点
  フルハウス       : 0 点
  S.ストレート     : 0 点
  B.ストレート     : 0 点
  ヨット         : 0 点
------------------------------
現在の合計点: 31
------------------------------
このステップの報酬: -10.0

===== [評価] ラウンド 84243 =====
エージェントの行動: 37 (シックスズ にスコア記録)
------------------------------
ラウンド: 12/12
残りロール回数: 0
現在のサイコロ: [4] [5]  5   6  [2]
------------------------------
スコアシート:
  エース         : (空き)
  デュース        : 2 点
  トレイズ        : 0 点
  フォアーズ       : 8 点
  ファイブズ       : 0 点
  シックスズ       : 0