In [50]:
# Needed if running on Colab, comment out if in local environment!
!pip3 install open-spiel
!pip3 install torch


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [51]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from go_search_problem import GoProblem, GoState
from heuristic_go_problems import GoProblemLearnedHeuristic, GoProblemSimpleHeuristic
from agents import GreedyAgent, RandomAgent, AlphaBetaAgent
import matplotlib.pyplot as plt
from tqdm import tqdm
from game_runner import GameRunner
import pickle

# Make float32 the default floating-point dtype for newly created tensors.
# torch.set_default_tensor_type() is deprecated (since PyTorch 2.1) in favour of
# torch.set_default_dtype(), so the non-deprecated call is used here.
torch.set_default_dtype(torch.float32)

In [52]:
def load_dataset(path: str):
    """
    Read a pickled dataset from disk.

    Note: pickle can execute arbitrary code while loading, so only use this
    on dataset files that come from a trusted source.

    Input:
        path: location of the pickle file
    Output:
        dataset: whatever object was stored in the pickle file
    """
    with open(path, 'rb') as handle:
        return pickle.load(handle)

# We've provided a dataset with pyspiel and without (i.e., pygo).
# Load the 5x5 dataset once; the path is relative to the current working directory.
# Later cells index each entry as (state, action, result).
dataset_5x5 = load_dataset('dataset_5x5.pkl')

In [53]:
def save_model(path: str, model):
    """
    Write a model's weights to disk.

    Only the state dict is stored (under the key 'model_state_dict'), so the
    architecture has to be rebuilt by the caller before loading.

    Input:
        path: file to write the checkpoint to
        model: Pytorch model whose weights are saved
    """
    checkpoint = {'model_state_dict': model.state_dict()}
    torch.save(checkpoint, path)

def load_model(path: str, model):
    """
    Load model weights from a checkpoint file written by save_model().

    Note: you still need to provide a model (with the same architecture as the saved model).

    Input:
        path: path to load model from
        model: Pytorch model to load the weights into (modified in place)
    Output:
        model: the same Pytorch model object, now holding the saved weights
    """
    # weights_only=True restricts unpickling to tensors and plain containers, which is
    # all save_model() stores; it also avoids the FutureWarning that a bare torch.load()
    # emits on recent PyTorch versions.
    # map_location="cpu" lets a checkpoint saved on a GPU be read on a CPU-only machine;
    # load_state_dict() then copies the values into the model's existing parameters.
    checkpoint = torch.load(path, map_location="cpu", weights_only=True)
    model.load_state_dict(checkpoint['model_state_dict'])
    return model

# Task 1: Convert GameState to Features

In [54]:
def get_features(game_state: GoState):
    """
    Map a game state to a list of features.

    The encoding consists of four n x n planes, flattened in this order:
        1. pieces of player 0 (BLACK)
        2. pieces of player 1 (WHITE)
        3. empty points
        4. side to move (all 1.0 when player 0 is to move, all 0.0 otherwise)

    Some useful functions from game_state include:
        game_state.size: size of the board
        get_pieces_coordinates(player_index): get coordinates of all pieces of a player (0 or 1)
        get_pieces_array(player_index): get a 2D array of pieces of a player (0 or 1)

        get_board(): get a 2D array of the board with 4 channels (player 0, player 1, empty, and player to move). 4 channels means the array will be of size 4 x n x n

        Descriptions of these methods can be found in the GoState

    Input:
        game_state: GoState to encode into a fixed size list of features
    Output:
        features: flat list of 4 * n * n floats
    """
    board_size = game_state.size

    # get_pieces_array(player_index) -> 2D array (n x n) with 1/0
    black = np.array(game_state.get_pieces_array(0), dtype=float)
    white = np.array(game_state.get_pieces_array(1), dtype=float)

    # A point is empty when neither player occupies it; clip guards against
    # negative values should the two piece arrays ever overlap.
    empty = np.clip(1.0 - (black + white), 0.0, 1.0)

    # player_to_move may be exposed as a plain attribute or as a method.
    # Comparing a bound method with 0 is always False, which would silently
    # turn this plane into a constant, so call it first when it is callable.
    mover = game_state.player_to_move
    if callable(mover):
        mover = mover()
    to_move = np.full((board_size, board_size), 1.0 if mover == 0 else 0.0, dtype=float)

    # Stack channels in order: black, white, empty, to_move
    features = np.stack([black, white, empty, to_move], axis=0)

    return features.flatten().tolist()

In [55]:
# Print information about first data point
# Each dataset entry is indexed positionally: [0] state, [1] action, [2] game result.
data_point = dataset_5x5[0]
# `features` is kept as a notebook-level name; the value-network syntax check below reuses it.
features = get_features(data_point[0])
action = data_point[1]
result = data_point[2]
# Printing the state shows its text rendering of the board.
print(data_point[0])
print("features", features)
print("Action #", action)
print("Game Result", result)

GoState(komi=0.5, to_play=W, history.size()=23)

 5 +XXX+
 4 OOOXX
 3 +OXX+
 2 OOXX+
 1 +X+X+
   ABCDE

features [0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
Action # 20
Game Result 1.0


# Task 2: Supervised Learning of a Value Network

In [56]:
class ValueNetwork(nn.Module):
    """
    Small fully connected network mapping an encoded state to one scalar value.

    Architecture: input_size -> 256 -> 128 -> 1, with a ReLU after each hidden layer.
    """

    def __init__(self, input_size):
        """
        Build the layers.

        Input:
            input_size: number of features in one encoded state
        """
        super().__init__()
        # The single output is the predicted game outcome (from BLACK's perspective).
        widths = [input_size, 256, 128]
        layers = []
        for fan_in, fan_out in zip(widths, widths[1:]):
            layers.append(nn.Linear(fan_in, fan_out))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(widths[-1], 1))
        # Kept under the attribute name `net` so state-dict keys stay 'net.<index>.*'.
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """
        Run forward pass of network

        Input:
            x: a tensor (or anything torch.tensor() accepts); either one feature
               vector or a batch of feature vectors
        Output:
            0-d tensor for a single feature vector, otherwise a tensor with one
            value per row of the batch
        """
        if not isinstance(x, torch.Tensor):
            x = torch.tensor(x, dtype=torch.float32)

        # A lone example gets a temporary batch dimension that is removed again below.
        single_example = x.dim() == 1
        batch = x.unsqueeze(0) if single_example else x

        values = self.net(batch).squeeze(-1)  # (batch, 1) -> (batch,)
        return values.squeeze(0) if single_example else values

In [57]:
# This will not produce meaningful outputs until trained, but you can test for syntax errors
# Uses `features` from the first-data-point cell above; the network here is freshly
# initialised (untrained), so the printed value only confirms the forward pass runs.
features_tensor = torch.Tensor(features)
value_net = ValueNetwork(len(features))
print("predicted Value", value_net(features_tensor))

predicted Value tensor(-0.0498, grad_fn=<SqueezeBackward1>)


In [58]:
def train_value_network(dataset, num_epochs, learning_rate):
    """
    Train a value network on the provided dataset.

    Every state is encoded with get_features() and the network is fitted to the
    recorded game result with a squared-error loss, using Adam on mini-batches
    of 32 examples.

    The caller's dataset is not reordered: shuffling happens on a copy.

    Input:
        dataset: list of (state, action, result) tuples
        num_epochs: number of epochs to train for
        learning_rate: learning rate for gradient descent
    Output:
        model: trained model
    Raises:
        ValueError: if dataset is empty
    """
    if len(dataset) == 0:
        raise ValueError("dataset must contain at least one (state, action, result) tuple")

    # Shuffle a shallow copy so the list passed in keeps its order for other cells.
    examples = list(dataset)
    random.shuffle(examples)
    # You may find it useful to create train/test sets to better track performance/overfit/underfit

    # Encode every state once up front instead of once per epoch.
    inputs = torch.tensor([get_features(example[0]) for example in examples], dtype=torch.float32)
    # The desired output of the value network is the recorded game result.
    targets = torch.tensor([float(example[2]) for example in examples], dtype=torch.float32)

    model = ValueNetwork(input_size=inputs.shape[1])

    # reduction='sum' adds up the per-example squared errors of a batch, i.e. the same
    # quantity as accumulating one squared error per data point over the batch.
    loss_function = nn.MSELoss(reduction='sum')

    # You can use Adam, which is stochastic gradient descent with ADAptive Momentum
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    batch_size = 32

    model.train()
    for epoch in range(num_epochs):
        # Stepping by batch_size also covers the final, shorter batch, so no
        # example is left out when len(dataset) is not a multiple of batch_size.
        for start in range(0, len(examples), batch_size):
            batch_inputs = inputs[start:start + batch_size]
            batch_targets = targets[start:start + batch_size]

            # Reset gradients left over from the previous batch
            optimizer.zero_grad()

            # Predictions have shape (batch,), matching batch_targets exactly,
            # so the loss never relies on broadcasting.
            predictions = model(batch_inputs)
            batch_loss = loss_function(predictions, batch_targets)

            # Call backward to run backward pass and compute gradients
            batch_loss.backward()

            # Run gradient descent step with optimizer
            optimizer.step()

    return model

# Train for 10 epochs with learning rate 1e-4, then write the weights to
# "value_model.pt" — the file create_value_agent_from_model() reads back.
value_model = train_value_network(dataset_5x5, 10, 1e-4)
save_model("value_model.pt", value_model)

## Comparing Learned Value function against other Agents

In [59]:
class GoProblemLearnedHeuristic(GoProblem):
    """
    GoProblem whose heuristic is the output of a value network.
    """

    def __init__(self, model=None, state=None):
        """
        Input:
            model: Pytorch model used to score states (may be None)
            state: forwarded unchanged to GoProblem.__init__
        """
        super().__init__(state=state)
        self.model = model

    def encoding(self, state):
        """
        Get encoding of state (convert state to features)
        Delegates to get_features() from Task 1.

        Input:
            state: GoState to encode into a fixed size list of features
        Output:
            features: list of features
        """
        return get_features(state)

    def heuristic(self, state, player_index):
        """
        Return heuristic (value) of current state

        Input:
            state: GoState to encode into a fixed size list of features
            player_index: index of player to evaluate heuristic for
                (accepted but not used by this implementation)
        Output:
            value: the model's prediction for the state as a Python float,
                or 0.0 when no model has been set
        """
        if self.model is None:
            return 0.0

        encoded = np.array(self.encoding(state), dtype=np.float32)
        batch = torch.tensor(encoded, dtype=torch.float32)

        # Inference only: no gradients needed, and the model is switched to eval mode.
        with torch.no_grad():
            self.model.eval()
            if batch.dim() == 1:
                batch = batch.unsqueeze(0)
            prediction = self.model(batch).squeeze().item()

        # Note: your agent may perform better if you force it not to pass
        # (i.e., don't select action #25 on a 5x5 board unless necessary)
        return float(prediction)

    def __str__(self) -> str:
        return "Learned Heuristic"

def create_value_agent_from_model():
    """
    Create agent object from saved model. This (or other methods like this) will be how your agents will be created in gradescope and in the final tournament.

    Reads the weights from "value_model.pt" and relies on the notebook-level
    `dataset_5x5` to determine the network's input size.

    Output:
        learned_agent: GreedyAgent driven by the learned heuristic
    """
    # The input size must match the encoding the saved model was trained with,
    # so measure it on one real state rather than hard-coding it.
    input_size = len(get_features(dataset_5x5[0][0]))
    network = load_model("value_model.pt", ValueNetwork(input_size))

    # Other search agents (IDS/AB/Minimax) can be swapped in here.
    return GreedyAgent(GoProblemLearnedHeuristic(network))

# Create agents and run a quick tournament (ensure value_model.pt exists and matches feature size)
# learned_agent: greedy agent scored by the trained value network.
learned_agent = create_value_agent_from_model()
# agent2: baseline greedy agent using the simple heuristic.
agent2 = GreedyAgent(GoProblemSimpleHeuristic())
print("Greedy Agent", agent2)
print("Learned Agent", learned_agent)

game_runner = GameRunner()
# As the last expression of the cell, the returned tournament stats are also displayed.
game_runner.play_tournament(learned_agent, agent2, num_games=100)

  checkpoint = torch.load(path)


Greedy Agent GreedyAgent + Simple Heuristic
Learned Agent GreedyAgent + Learned Heuristic


Playing tournament: 100%|██████████| 50/50 [00:01<00:00, 28.67it/s]

Tournament Results
Games played: 100
GreedyAgent + Learned Heuristic wins: 64 (64.0%)
GreedyAgent + Simple Heuristic wins: 36 (36.0%)
GreedyAgent + Learned Heuristic wins as BLACK: 39
GreedyAgent + Simple Heuristic wins as BLACK: 25
GreedyAgent + Learned Heuristic avg move time: 0.001s
GreedyAgent + Simple Heuristic avg move time: 0.000s
GreedyAgent + Learned Heuristic min time remaining: 26.0s
GreedyAgent + Simple Heuristic min time remaining: 26.0s





TournamentStats(player1_wins=64, player2_wins=36, player1_wins_as_black=39, player2_wins_as_black=25, player1_total_time=np.float64(0.07014208925310815), player2_total_time=np.float64(0.01858210253868624), player1_min_time_remaining=25.98891830444336, player2_min_time_remaining=25.997186183929443, player1_max_move_time=np.float64(0.0051670074462890625), player2_max_move_time=np.float64(0.0010113716125488281), games_played=100)