In [None]:
# %% [code]
import os
import sys
import ast
import re
import numpy as np
import torch
import torch.optim as optim
import urllib3
import matplotlib.pyplot as plt
import random

# Add project root to path so we can import modules from the api and ai folders.
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

# Disable HTTPS warnings.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Import helper functions for building representations.
# from api.replay import parse_card, build_replay_experiences
# from experiment import build_action_rep_for_state, to_torch_input

# Import our (dummy) API functions needed for self-play simulation.
# (Note: We will not use NewHand or Act since we simulate self-play.)
from api.playSlumbot import get_street_name, parse_action_enhanced, ChooseActionAI, index_to_action_string, STACK_SIZE, SMALL_BLIND, BIG_BLIND

# Import our model and PPO utilities.
from siamese_net import PseudoSiameseNet, logits_to_probs, clone_model_weights, to_torch_input
from ppo_utils import a_gae, tc_loss_function, ratio, r_gamma, v_loss, make_model_value_function

# Import HandResult grouping functions.
from hand_result import create_hands_from_experiences, build_experiences_from_txt, build_replay_experiences

# Blind sizes, starting stack and street count used by this notebook.
# NOTE: these assignments rebind the STACK_SIZE / SMALL_BLIND / BIG_BLIND
# names that were imported from api.playSlumbot earlier in this cell.
SMALL_BLIND = 50
BIG_BLIND = 100
STACK_SIZE = 20000
NUM_STREETS = 4

# Pick the CUDA device when torch reports it as available, else the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")


In [None]:
# %% [code]
# Define an AgentEntry class and a global history pool.
# Rating assigned to an entry when no explicit Elo is supplied.
INITIAL_ELO = 1500


class AgentEntry:
    """History-pool record that pairs a policy network with an Elo rating."""

    def __init__(self, policy_net, elo=INITIAL_ELO):
        # policy_net is stored by reference (no copy is made here);
        # elo falls back to INITIAL_ELO when the caller omits it.
        self.policy_net, self.elo = policy_net, elo


# Module-level list of AgentEntry records; starts out empty.
history_pool = list()

# Elo update helper functions.
def expected_score(rating_A, rating_B):
    """
    Elo expected score of player A against player B.

    Computes 1 / (1 + 10 ** ((rating_B - rating_A) / 400)); equal ratings
    give 0.5, and the two players' expected scores sum to 1.
    """
    rating_gap = rating_B - rating_A
    odds_against = 10 ** (rating_gap / 400)
    return 1 / (1 + odds_against)

def update_elo(rating, expected, score, k_factor=32):
    """
    Return the rating after one result.

    The rating moves by k_factor times the gap between the actual score
    and the expected score.
    """
    surprise = score - expected
    adjustment = k_factor * surprise
    return rating + adjustment

# Dummy simulation of a single hand between two agents.
def simulate_hand(agentA, agentB):
    """
    Placeholder for a real hand simulation between agentA and agentB.

    Neither agent is consulted: the result is a single draw from
    np.random.randint over the inclusive range [-3000, 3000].
    A positive value means agentA wins chips.
    Replace this with your actual hand simulation.
    """
    max_swing = 3000
    # randint's upper bound is exclusive, hence the +1.
    return np.random.randint(-max_swing, max_swing + 1)

# Build a self-play hand that also returns a dummy replay experience.
def self_play_hand(agentA, agentB):
    """
    Play one (placeholder) hand between agentA and agentB.

    The chip differential comes from simulate_hand; the experience list comes
    from build_replay_experiences called on a fixed dummy hand. Every
    experience dict is stamped with the differential under 'winnings'.

    Returns:
        (chip differential, experiences)
    """
    chip_diff = simulate_hand(agentA, agentB)

    # Fixed dummy hand; a real system would take these from the simulated play.
    hand_steps, _, _, _ = build_replay_experiences(
        "b200c/kk/b200f",  # dummy replay string
        [],                # board: empty (preflop)
        ['As', 'Kd'],      # hole cards
        0,                 # client_pos: agentA is assumed to be hero
    )

    for step in hand_steps:
        step['winnings'] = chip_diff

    return chip_diff, hand_steps

def self_play_batch(num_hands, agentA, agentB):
    """
    Play num_hands hands via self_play_hand and aggregate the results.

    Returns:
        (summed chip differential from agentA's perspective,
         flat list of every hand's experiences in play order)
    """
    # Hands are played strictly in order; aggregation happens afterwards.
    outcomes = [self_play_hand(agentA, agentB) for _ in range(num_hands)]
    chip_total = sum(hand_diff for hand_diff, _ in outcomes)
    merged_experiences = [step for _, hand_steps in outcomes for step in hand_steps]
    return chip_total, merged_experiences


In [None]:
# %% [code]
def play_batch(num_hands, token, policy_net, replay_path="replay.txt"):
    """
    Simulate a batch of self-play hands and write one dummy line per hand
    to replay_path (any previous content of that file is discarded).

    Args:
        num_hands: number of hands to simulate; 0 is allowed.
        token: passed through untouched (unused in self-play).
        policy_net: the agent; it plays both seats via self_play_hand.
        replay_path: file that receives the replay lines.

    Returns:
        (token, total winnings, average per hand). The average is
        total_winnings / (num_hands * BIG_BLIND), i.e. expressed in big
        blinds per hand even though the log line labels it "mBB/hand";
        it is 0.0 when num_hands is not positive.
    """
    total_winnings = 0

    # "w" truncates the previous replay file; keeping one handle open for the
    # whole batch avoids reopening the file for every hand.
    with open(replay_path, "w") as replay_file:
        for h in range(num_hands):
            # Instead of PlayHand (API), we use our self-play simulation.
            # The per-hand experiences are not needed here, only the chip diff.
            diff, _ = self_play_hand(policy_net, policy_net)
            total_winnings += diff
            # Format: hand_index,final_action,board,hole_cards,client_pos,winnings
            replay_file.write(f"{h+1},dummy_replay,{[]},{['As','Kd']},{0},{diff}\n")

    # Guard the division so an empty batch reports 0 instead of raising.
    avg_bb = total_winnings / (num_hands * BIG_BLIND) if num_hands > 0 else 0.0
    print(f"\nBatch DONE. Total winnings: {total_winnings}, mBB/hand: {avg_bb:.3f}")
    return token, total_winnings, avg_bb

def run_one_iteration(iter_idx: int, rounds_array, old_policy_net, new_policy_net, optimizer):
    """
    Executes a single PPO update iteration on the given rounds_array.

    Each element of rounds_array is read through the keys "state", "reward",
    "deltas" and "action_taken"; state[0] is used as the card input and
    state[1] as the action input. One optimizer step is taken per round.

    Args:
        iter_idx: index used only in the log output.
        rounds_array: sequence of round dicts (may be empty).
        old_policy_net: network providing the reference action probabilities.
        new_policy_net: network being optimized.
        optimizer: optimizer stepping new_policy_net's parameters.

    Returns:
        (average policy loss, average value loss, number of steps);
        (0, 0, 0) when rounds_array is empty.
    """
    print(f"\n=== Training Iteration {iter_idx} ===")

    # An empty hand has nothing to train on; bail out before the zip(*...)
    # unpack below, which cannot produce two sequences from zero rounds.
    if len(rounds_array) == 0:
        print(f"=> Iteration {iter_idx} done. avg pol_loss={0:.3f}, avg val_loss={0:.3f}")
        return 0, 0, 0

    total_pol_loss = 0
    total_val_loss = 0
    steps_count = 0

    model_value_func = make_model_value_function(new_policy_net)
    states, rewards = zip(*[(r["state"], r["reward"]) for r in rounds_array])

    for i, round_ in enumerate(rounds_array):
        deltas = round_['deltas']
        action_taken = round_['action_taken']
        state = round_['state']

        if i < len(rounds_array) - 1:
            # Advantage is estimated from this round to the end of the hand.
            future_rewards = rewards[i:]
            future_states  = states[i:]
            advantage_t = a_gae(future_states, future_rewards, model_value_func, gamma=0.999, lambda_=0.99)
        else:
            # Last round: only its own reward remains and the advantage is fixed at 0.
            future_rewards = [round_["reward"]]
            advantage_t = torch.tensor(0.0)

        card_tensor = state[0]
        action_tensor = state[1]
        action_t, card_t = to_torch_input(card_tensor, action_tensor)

        # The old policy is only a fixed reference (its output is detached
        # right away), so skip building an autograd graph for it.
        with torch.no_grad():
            old_logits, _ = old_policy_net.forward(action_t, card_t)
        old_probs = logits_to_probs(old_logits)[0].detach().cpu().numpy()

        new_logits, new_value = new_policy_net.forward(action_t, card_t)
        new_probs_t = logits_to_probs(new_logits)[0]
        new_probs = new_probs_t.detach().cpu().numpy()

        # NOTE(review): ratio() receives detached NumPy arrays, so unless
        # ratio()/tc_loss_function() reconnect to new_policy_net internally the
        # policy term contributes no gradient here — confirm against ppo_utils.
        ratio_t = ratio(old_probs, new_probs, action_taken)
        pol_loss_tensor = tc_loss_function(ratio_t, advantage_t, epsilon=0.2)
        pol_loss_val = pol_loss_tensor.item()

        r_g = r_gamma(np.array(future_rewards), gamma=0.999)
        val_loss_tensor = v_loss(r_g, state, deltas, new_value)
        val_loss_val = val_loss_tensor.item()

        # For simplicity, combine losses linearly (policy term negated,
        # value term weighted by c).
        c = 1
        combined_loss = -pol_loss_tensor + c * val_loss_tensor

        optimizer.zero_grad()
        combined_loss.backward()
        optimizer.step()

        total_pol_loss += pol_loss_val
        total_val_loss += val_loss_val
        steps_count += 1
        print(f"  Iter {iter_idx} round {i}: pol_loss={pol_loss_val:.3f}, val_loss={val_loss_val:.3f}")

    avg_pol = total_pol_loss / steps_count if steps_count > 0 else 0
    avg_val = total_val_loss / steps_count if steps_count > 0 else 0
    print(f"=> Iteration {iter_idx} done. avg pol_loss={avg_pol:.3f}, avg val_loss={avg_val:.3f}")
    return avg_pol, avg_val, steps_count

def train_model(model, hand_results):
    """
    Run one PPO pass over hand_results, starting from `model`.

    Two new PseudoSiameseNet instances are created and clone_model_weights is
    called with `model` as the source for each; only the "new" one is handed
    to the Adam optimizer. run_one_iteration is invoked once per hand result
    using its `.rounds`.

    Returns:
        (trained network, step-weighted average policy loss,
         step-weighted average value loss); the averages are 0 when no
        steps were taken.
    """
    old_policy_net, new_policy_net = PseudoSiameseNet(), PseudoSiameseNet()
    for target_net in (old_policy_net, new_policy_net):
        clone_model_weights(model, target_net)
    optimizer = optim.Adam(new_policy_net.parameters(), lr=0.0001)

    pol_loss_sum = 0
    val_loss_sum = 0
    step_sum = 0
    for hand_idx, hand_result in enumerate(hand_results):
        hand_pol, hand_val, hand_steps = run_one_iteration(
            hand_idx, hand_result.rounds, old_policy_net, new_policy_net, optimizer
        )
        # Re-weight each hand's average by its step count so the overall
        # figure is a per-step average.
        pol_loss_sum += hand_pol * hand_steps
        val_loss_sum += hand_val * hand_steps
        step_sum += hand_steps

    # Refresh the reference network from the trained one, as the original flow does.
    clone_model_weights(new_policy_net, old_policy_net)

    if step_sum > 0:
        return new_policy_net, pol_loss_sum / step_sum, val_loss_sum / step_sum
    return new_policy_net, 0, 0


In [None]:
# %% [code]
# Self-play: use our dummy simulation to generate chip differentials and dummy replay experiences.
def simulate_selfplay_match(num_hands, agentA, agentB):
    """Play one match of num_hands hands; returns self_play_batch's result unchanged."""
    match_outcome = self_play_batch(num_hands, agentA, agentB)
    return match_outcome

# Function to select an opponent from the top K in the history pool.
def select_opponent(history_pool, K=5):
    """
    Pick a random opponent network among the K highest-rated pool entries.

    Entries are ranked by their `.elo` (highest first, ties keep pool order);
    the `.policy_net` of a uniformly chosen entry from the first K is returned.
    The pool itself is not modified.
    """
    ranked = list(history_pool)
    ranked.sort(key=lambda entry: entry.elo, reverse=True)
    candidates = ranked[:K]
    chosen = random.choice(candidates)
    return chosen.policy_net

from copy import deepcopy

# Initialize current agent and history pool.
current_agent = PseudoSiameseNet()
current_agent.elo = INITIAL_ELO
# Seed the pool with a frozen copy rather than the live agent: if the opponent
# were the very same object, its Elo update would overwrite the current
# agent's update a few lines later.
history_pool.append(AgentEntry(deepcopy(current_agent), elo=INITIAL_ELO))

# Continuous training loop.
num_cycles = 10
match_hands = 100       # number of hands per self-play match (for Elo update)
num_matches_per_cycle = 5  # number of matches per cycle (each match is 100 hands)
hands_per_cycle = 100   # number of hands for generating training replays
win_margin = 10000      # average chip differential needed to score a win / loss
replay_path = "replay.txt"  # scratch file shared by play_batch and the loader

# Metrics logging.
# NOTE: the "mBB/hand" figures are computed as chips / (BIG_BLIND * hands),
# i.e. they are in big blinds per hand; the labels are kept as-is.
avg_bb_list = []         # average mBB/hand from self-play matches
avg_pol_loss_list = []   # average policy loss per cycle
avg_val_loss_list = []   # average value loss per cycle
elo_history = []
cycle_numbers = []

for cycle in range(num_cycles):
    print(f"\n=== Cycle {cycle+1}/{num_cycles} ===")
    
    # 1. Self-play: select an opponent from history.
    opponent_agent = select_opponent(history_pool, K=5)
    # select_opponent hands back only the network; look up the pool entry that
    # owns it so the rating used for ranking can be read and updated.
    opponent_entry = next(entry for entry in history_pool if entry.policy_net is opponent_agent)
    
    # Run several self-play matches to determine chip differential.
    match_diffs = []
    for m in range(num_matches_per_cycle):
        diff, _ = simulate_selfplay_match(match_hands, current_agent, opponent_agent)
        match_diffs.append(diff)
        print(f"  Match {m+1}: chip differential = {diff}")
    avg_chip_diff = np.mean(match_diffs)
    mBB_per_hand = avg_chip_diff / (BIG_BLIND * match_hands)
    avg_bb_list.append(mBB_per_hand)
    print(f"Cycle {cycle+1}: Average mBB/hand = {mBB_per_hand:.3f}")
    
    # 2. Determine match result: win if chip diff >= +win_margin,
    #    loss if <= -win_margin, draw otherwise.
    if avg_chip_diff >= win_margin:
        result = 1.0
    elif avg_chip_diff <= -win_margin:
        result = 0.0
    else:
        result = 0.5
    
    R_current = current_agent.elo
    R_opponent = opponent_entry.elo
    exp_current = expected_score(R_current, R_opponent)
    exp_opponent = expected_score(R_opponent, R_current)
    new_R_current = update_elo(R_current, exp_current, result, k_factor=32)
    new_R_opponent = update_elo(R_opponent, exp_opponent, 1 - result, k_factor=32)
    current_agent.elo = new_R_current
    # The entry's elo is what select_opponent sorts by, so it must be updated;
    # the attribute on the network object is kept in sync as well.
    opponent_entry.elo = new_R_opponent
    opponent_agent.elo = new_R_opponent
    elo_history.append(new_R_current)
    print(f"Cycle {cycle+1}: Updated Elo: current = {new_R_current:.1f}, opponent = {new_R_opponent:.1f}")
    
    # 3. Generate training experiences from self-play.
    # Here we simulate a batch of self-play hands (which writes dummy replay data to file).
    token = None  # not used in self-play simulation.
    token, _, _ = play_batch(hands_per_cycle, token, current_agent, replay_path=replay_path)
    
    # 4. Build experiences and group into hand results.
    experiences = build_experiences_from_txt(replay_path)
    print(f"Cycle {cycle+1}: Loaded {len(experiences)} experiences.")
    hand_results = create_hands_from_experiences(experiences)
    print(f"Cycle {cycle+1}: Created {len(hand_results)} hand results.")
    
    # 5. Train on these experiences.
    policy_net, avg_pol, avg_val = train_model(current_agent, hand_results)
    avg_pol_loss_list.append(avg_pol)
    avg_val_loss_list.append(avg_val)
    cycle_numbers.append(cycle+1)
    
    # train_model returns a newly constructed network on which `.elo` has never
    # been set, so carry the rating over before swapping agents.
    policy_net.elo = current_agent.elo
    current_agent = policy_net  # update current agent.
    
    # 6. Add the new version of the current agent to the history pool.
    history_pool.append(AgentEntry(deepcopy(current_agent), elo=current_agent.elo))
    
    # 7. Clear replay file for next cycle.
    open(replay_path, "w").close()
    
    print(f"Cycle {cycle+1} complete: mBB/hand = {mBB_per_hand:.3f}, avg policy loss = {avg_pol:.3f}, avg value loss = {avg_val:.3f}")

# Plot metrics: one panel per tracked series on a 2x2 grid, all against cycle number.
plt.figure(figsize=(12, 6))

# (series, legend label, y-axis label, panel title) in panel order 1..4.
metric_panels = [
    (avg_bb_list, "mBB/hand", "mBB/hand", "Average mBB/hand Over Cycles"),
    (avg_pol_loss_list, "Avg Policy Loss", "Policy Loss", "Avg Policy Loss Over Cycles"),
    (avg_val_loss_list, "Avg Value Loss", "Value Loss", "Avg Value Loss Over Cycles"),
    (elo_history, "Current Agent Elo", "ELO", "Current Agent Elo Over Cycles"),
]

for panel_no, (series, legend_label, y_label, panel_title) in enumerate(metric_panels, start=1):
    plt.subplot(2, 2, panel_no)
    plt.plot(cycle_numbers, series, marker='o', label=legend_label)
    plt.xlabel("Cycle")
    plt.ylabel(y_label)
    plt.title(panel_title)
    plt.legend()

plt.tight_layout()
plt.show()
