# Actor-criticによるポーカーの強化学習

### import

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from tqdm import tqdm
import math
from env.game import Game

### lossの記録

In [None]:
import wandb

# Start the Weights & Biases run that records this experiment.
wandb.init(
    project="actor_critic",  # W&B project name
    name="正規化",   # run (experiment) name, optional
    config={               # hyperparameters stored with the run, optional
        "pi_learning_rate": 0.0002,
        "v_learning_rate": 0.0005,
        "action_size": 6,
        # Was logged as "gammma": 0.1 -- a misspelled key whose value did not
        # match the discount factor the Agent class trains with (0.8).
        "gamma": 0.8,
    }
)

### 各関数の設定

#### 状態価値関数(V)

In [5]:
class ValueNet(nn.Module):
    """State-value network: one 100-unit ReLU hidden layer, one scalar output."""

    def __init__(self,state_size):
        """Create the layers; ``state_size`` is the number of input features."""
        super().__init__()
        hidden_units = 100
        self.l1 = nn.Linear(state_size, hidden_units)
        self.l2 = nn.Linear(hidden_units, 1)

    def forward(self, x):
        """Return the value estimate for ``x`` (last dimension ``state_size``)."""
        hidden = torch.relu(self.l1(x))
        return self.l2(hidden)

#### 方策(π)

In [6]:
class PolicyNet(nn.Module):
    """Policy network: one 100-unit ReLU hidden layer followed by a softmax."""

    def __init__(self, action_size,state_size):
        """Create the layers for ``state_size`` inputs and ``action_size`` outputs."""
        super().__init__()
        hidden_units = 100
        self.l1 = nn.Linear(state_size, hidden_units)
        self.l2 = nn.Linear(hidden_units, action_size)

    def forward(self, x):
        """Return action probabilities for a batched input.

        The softmax is taken over dimension 1, so ``x`` must be 2-D
        ``(batch, state_size)``.
        """
        logits = self.l2(torch.relu(self.l1(x)))
        return torch.softmax(logits, dim=1)

#### Agent

In [7]:
class Agent:
    """Actor-critic agent with one policy/value network pair per game phase.

    For every name in ``self.phases`` the agent owns a ``PolicyNet``, a
    ``ValueNet``, an Adam optimizer for each, and two lists collecting the
    losses produced by ``update``.
    """

    def __init__(self, gamma=0.8, lr_pi=0.0002, lr_v=0.0005, action_size=6):
        """Build the per-phase networks and optimizers.

        Args:
            gamma: discount factor applied to the bootstrapped next-state value.
            lr_pi: Adam learning rate for the policy networks.
            lr_v: Adam learning rate for the value networks.
            action_size: number of policy outputs.
        """
        self.gamma = gamma
        self.lr_pi = lr_pi
        self.lr_v = lr_v
        self.action_size = action_size
        # The six actions, by index:
        # fold, check, call, raise_2, raise_3, raise_5
        self.phases = ["preflop", "flop", "turn", "river","show down"]
        # Every phase currently uses the same 17-feature state vector.
        self.state_sizes = {phase: 17 for phase in self.phases}
        self.pis = {}
        self.vs = {}
        self.optimizer_pis = {}
        self.optimizer_vs = {}
        self.loss_v_lists = {}
        self.loss_pi_lists = {}
        for phase in self.phases:
            self.pis[phase] = PolicyNet(self.action_size ,self.state_sizes[phase])
            self.vs[phase] = ValueNet(self.state_sizes[phase])
            self.optimizer_pis[phase] = optim.Adam(self.pis[phase].parameters(), lr=self.lr_pi)
            self.optimizer_vs[phase] = optim.Adam(self.vs[phase].parameters(), lr=self.lr_v)
            self.loss_v_lists[phase] = []
            self.loss_pi_lists[phase] = []

    def get_action(self, state,mask,phase_index):
        """Sample an action from the masked policy of the given phase.

        Args:
            state: NumPy array of state features; a leading batch axis is
                added here before it is fed to the network.
            mask: per-action multipliers (0 disables an action).
            phase_index: index into ``self.phases``.

        Returns:
            ``(action, prob)`` where ``action`` is the sampled index and
            ``prob`` is a differentiable scalar tensor holding the probability
            with which that action was sampled.

        Raises:
            ValueError: if the mask leaves no action with positive probability.
        """
        state = torch.tensor(state[np.newaxis, :], dtype=torch.float32)
        mask = torch.tensor(mask,dtype=torch.float32)

        probs = self.pis[self.phases[phase_index]](state)[0]
        masked = probs * mask
        total = masked.sum()
        if not total.item() > 0:
            raise ValueError("mask leaves no action with positive probability")
        # Renormalise explicitly. Categorical would do so internally, but the
        # probability handed to update() must be the one actually sampled
        # from, otherwise the policy-gradient log-probability is wrong.
        probs = masked / total
        action = Categorical(probs).sample().item()
        return action, probs[action]

    def update(self, state, action_prob, reward, next_state, player_done,current_phase_idx,next_phase_idx):
        """Run one TD(0) actor-critic step for the current phase's networks.

        Args:
            state: NumPy array of state features the action was taken in.
            action_prob: probability tensor returned by ``get_action``; its
                autograd graph is consumed by this call.
            reward: scalar reward observed after the action.
            next_state: following state, ignored when ``player_done`` is true.
            player_done: when true the TD target is the reward alone.
            current_phase_idx: index into ``self.phases`` of the networks to train.
            next_phase_idx: index into ``self.phases`` of the value network
                used to bootstrap from ``next_state``.
        """
        state = torch.tensor(state[np.newaxis, :], dtype=torch.float32)

        current_phase = self.phases[current_phase_idx]
        next_phase = self.phases[next_phase_idx]

        v = self.vs[current_phase](state)

        # The target is a constant with respect to the parameters, so it is
        # built without recording an autograd graph.
        with torch.no_grad():
            if player_done:
                target = torch.tensor(reward, dtype=torch.float32)
            else:
                next_state = torch.tensor(next_state[np.newaxis, :], dtype=torch.float32)
                target = reward + self.gamma * self.vs[next_phase](next_state)
        # Match the prediction's shape so the loss does not rely on
        # broadcasting a 0-d terminal target against a (1, 1) value.
        target = target.reshape(v.shape)

        loss_v = F.mse_loss(v, target)

        # The TD error is used as a plain number: it scales the policy
        # gradient but must not propagate gradients into the critic.
        delta = (target - v).item()
        loss_pi = -torch.log(action_prob) * delta

        # Keep detached copies so the history does not retain autograd graphs.
        self.loss_v_lists[current_phase].append(loss_v.detach())
        self.loss_pi_lists[current_phase].append(loss_pi.detach())

        self.optimizer_vs[current_phase].zero_grad()
        self.optimizer_pis[current_phase].zero_grad()
        loss_v.backward()
        loss_pi.backward()
        self.optimizer_vs[current_phase].step()
        self.optimizer_pis[current_phase].step()

    def copy_from(self, other_agent):
        """Overwrite this agent's network and optimizer state with ``other_agent``'s."""
        for phase in self.phases:
            self.pis[phase].load_state_dict(other_agent.pis[phase].state_dict())
            self.vs[phase].load_state_dict(other_agent.vs[phase].state_dict())

            self.optimizer_pis[phase].load_state_dict(other_agent.optimizer_pis[phase].state_dict())
            self.optimizer_vs[phase].load_state_dict(other_agent.optimizer_vs[phase].state_dict())

In [8]:
def agent_action_from_a(game,agent_a,agent_b,state_a,train_mode=True):
    """Play one exchange of a round: agent_a acts, then agent_b replies.

    When ``train_mode`` is true, agent_a is updated exactly once per call
    using the reward returned by its own ``game.step``. agent_b is never
    updated here.

    Returns:
        end_flag: True when the round finished during this call.
        next_state: agent_a's next state, or None when the round finished.
        reward_a: the reward ``game.step`` returned for agent_a's action.
    """
    action_names = dict(enumerate(("f", "check", "call", "r_2", "r_3", "r_5")))

    def round_over(last_action):
        # The round ends on a fold or once the game reports phase index 4.
        return last_action == "f" or game.one_round.current_phase == 4

    def learn(next_state, done, phase_to):
        # Single place where agent_a learns; a no-op outside training.
        if train_mode:
            agent_a.update(state_a, prob_a, reward_a, next_state, done, phase_from, phase_to)

    # --- agent_a (player 0) acts ---
    legal_a = game.one_round.mask(game.one_round.current_index)
    # Read before the action is applied; only ever used for debugging output.
    player0_win_flag = game.one_round.winner_index_truth == 0
    choice_a, prob_a = agent_a.get_action(state_a, legal_a, game.one_round.current_phase)
    action_a = action_names[choice_a]

    reward_a, state_b, phase_from, _phase_after_a = game.step(action_a)

    if round_over(action_a):
        # No next state exists, so the update uses the reward alone; the
        # "next phase" argument is a placeholder.
        learn(None, True, phase_from)
        return True, None, reward_a

    # --- agent_b (player 1) replies ---
    legal_b = game.one_round.mask(game.one_round.current_index)
    choice_b, _prob_b = agent_b.get_action(state_b, legal_b, game.one_round.current_phase)
    action_b = action_names[choice_b]
    _reward_b, next_state_a, _phase_before_b, phase_to = game.step(action_b)

    if round_over(action_b):
        # NOTE(review): reward_a still comes from agent_a's own step and does
        # not include anything returned by agent_b's step -- confirm intended.
        learn(None, True, phase_from)
        return True, None, reward_a

    learn(next_state_a, False, phase_to)
    return False, next_state_a, reward_a

In [9]:
def agent_action_b(state_b,agent_b,game,train_mode=False):
    """Let agent_b take a single action and report whether the round ended.

    ``train_mode`` is accepted but unused: agent_b is never updated here.

    Returns:
        end_flag: True when agent_b folded or the game reports phase index 4.
        state_a: the state returned by ``game.step``, or None when the round
            ended.
    """
    legal = game.one_round.mask(game.one_round.current_index)
    choice, _prob = agent_b.get_action(state_b, legal, game.one_round.current_phase)

    action_names = dict(enumerate(("f", "check", "call", "r_2", "r_3", "r_5")))
    action_b = action_names[choice]
    _reward, state_a, _phase_before, _phase_after = game.step(action_b)

    round_finished = action_b == "f" or game.one_round.current_phase == 4
    if round_finished:
        return True, None
    return False, state_a

## 学習start!!!!!

In [None]:
# player 0 is agent_a (the agent being trained)
# player 1 is agent_b (the opponent)

agent_a = Agent()
agent_b = Agent()
episodes = 10000

for episode in tqdm(range(episodes)):
    # Set up a fresh two-player game for this episode.
    game = Game(2,100000,100,6,False)
    game.game_flag = False

    # Keep playing rounds until game.game_flag becomes true.
    while not game.game_flag:
        state_a = game.one_round.player_state(0)
        state_b = game.one_round.player_state(1)

        if game.one_round.current_index == 1:
            # Player 1 is the one to act, so agent_b moves first.
            end_flag, state_a = agent_action_b(state_b,agent_b,game)
        else:
            end_flag = False

        # From here the players alternate, agent_a first, until the round ends.
        while not end_flag:
            end_flag, state_a,reward_a= agent_action_from_a(game,agent_a,agent_b,state_a)

    # Every 50 episodes (including episode 0) the opponent is replaced by a
    # copy of the learner.
    if episode % 50 == 0:
        agent_b.copy_from(agent_a)

#### 学習したモデルの挙動を確認する

In [None]:
# Play a fixed number of games with learning switched off.
episodes =  100
for episode in range(episodes):
    # Set up a fresh two-player game for this episode.
    game = Game(2,100000,100,10,True)
    game.game_flag = False

    # Keep playing rounds until game.game_flag becomes true.
    while not game.game_flag:
        state_a = game.one_round.player_state(0)
        state_b = game.one_round.player_state(1)

        if game.one_round.current_index == 1:
            # Player 1 is the one to act, so agent_b moves first.
            end_flag, state_a = agent_action_b(state_b,agent_b,game,train_mode=False)
        else:
            end_flag = False

        # train_mode=False: agent_a only acts, it is not updated.
        while not end_flag:
            end_flag, state_a,reward_a= agent_action_from_a(game,agent_a,agent_b,state_a,train_mode=False)

#### 学習したモデルを保存する

"train/models/actor_critic/"に保存される

In [12]:
import os
from datetime import datetime

# Timestamp used as the output directory name. strftime zero-pads every field,
# so names are unambiguous and sort chronologically. Plain str() concatenation
# did not pad: Jan 12 and Nov 2 of 2025 both started with "2025112".
now = datetime.now().strftime("%Y%m%d%H%M%S")

save_dir_pi = "./train/models/actor_critic/"+ now +"/pi/pytorch/"
save_dir_v= "./train/models/actor_critic/" +now +"/v/pytorch/"

os.makedirs(save_dir_pi, exist_ok=True)
os.makedirs(save_dir_v, exist_ok=True)

# Save agent_a's policy and value weights for the four betting phases as
# <phase>.pth. The "show down" networks are not saved.
for phase in ("preflop", "flop", "turn", "river"):
    torch.save(agent_a.pis[phase].state_dict(), os.path.join(save_dir_pi, phase + ".pth"))
    torch.save(agent_a.vs[phase].state_dict(), os.path.join(save_dir_v, phase + ".pth"))

#### 保存してあるモデルでAgentを設定する

In [None]:
### Restore previously saved weights into two fresh agents

test_agent_a = Agent()
test_agent_b = Agent()

path = 'train/models/actor_critic/nonnormal/'

phases = ["preflop", "flop", "turn", "river"]
for phase in phases:
    # Read each checkpoint from disk once and reuse it for both agents;
    # load_state_dict copies the values into each network's own parameters.
    # map_location="cpu" lets the files load on a machine without a GPU.
    pi_state = torch.load(path + 'pi/pytorch/' + phase +'.pth', map_location="cpu")
    v_state = torch.load(path + 'v/pytorch/' + phase +'.pth', map_location="cpu")
    for restored_agent in (test_agent_a, test_agent_b):
        restored_agent.pis[phase].load_state_dict(pi_state)
        restored_agent.vs[phase].load_state_dict(v_state)

#### 復活させたモデルの挙動を確認する

In [None]:
for episode in range(100):
    # Set up a fresh two-player game for this episode.
    game = Game(2,100000,100,3,True)
    game.game_flag = False

    # Keep playing rounds until game.game_flag becomes true.
    while not game.game_flag:
        end_flag = False
        state_a = game.one_round.player_state(0)
        state_b = game.one_round.player_state(1)
        # agent_b plays as player 1, so it may only open the round when the
        # current index is 1. This previously tested `== 0`, which made
        # agent_b act on player 0's turn, unlike the other loops in this file.
        if game.one_round.current_index == 1:
            end_flag, state_a = agent_action_b(state_b,test_agent_b,game,False)
        while not end_flag:
            # train_mode=False: this cell only inspects behaviour. The default
            # (True) would keep training the restored weights.
            end_flag, state_a,reward_a= agent_action_from_a(game,test_agent_a,test_agent_b,state_a,False)

#### pytorchで作成したモデルをonnxに変換する

GPUがない環境でもモデルを動かせるようにするため。

In [None]:
# Convert the saved PyTorch weights to ONNX files.
import os

change_agent = Agent()

path = "train/models/actor_critic/2025112202714/"

# Output directories for the exported models.
output_pi_dir = path + 'pi/onnx/'
output_v_dir = path + 'v/onnx/'

# Create the directories if they do not exist yet.
os.makedirs(output_pi_dir, exist_ok=True)
os.makedirs(output_v_dir, exist_ok=True)

phases = ["preflop", "flop", "turn", "river"]
for phase in phases:
    # map_location="cpu" so the conversion also runs on a machine without a GPU.
    change_agent.pis[phase].load_state_dict(torch.load(path+'pi/pytorch/'+ phase +'.pth', map_location="cpu"))
    change_agent.vs[phase].load_state_dict(torch.load(path+'v/pytorch/'+ phase +'.pth', map_location="cpu"))

    change_agent.pis[phase].eval()
    change_agent.vs[phase].eval()

    # Example input used to trace the network: a batch of one state vector.
    dummy_input = torch.randn(1, change_agent.state_sizes[phase])

    # Write into the directories created above, so the created and the
    # written locations cannot drift apart.
    torch.onnx.export(change_agent.pis[phase], dummy_input, output_pi_dir + phase +'.onnx', export_params=True, opset_version=11)
    torch.onnx.export(change_agent.vs[phase], dummy_input, output_v_dir + phase +'.onnx', export_params=True, opset_version=11)