In [1]:
from fourInRowGame import Chip, FourInRowGame
import models
import numpy as np
import torch
import copy
import sys
import tqdm
import torch.nn as nn
import os

In [2]:
# Create a game
nrow = 6
ncol = 7
num_states = [nrow, ncol]
env = FourInRowGame(nrow, ncol)

In [3]:
def exploration_policy(eps):
    act = np.random.choice(['model','random'], p = [1 - eps, eps])
    return act


def get_action(policy_net, state, eps):
    #Get the action based on greedy epsilon policy
    act = exploration_policy(eps)
    #Get predictions
    preds = policy_net(state)
    weights = preds.clone().numpy(force=True)
    if act == 'model':
        action = np.argmax(weights)
    elif act == 'random':
        action = np.random.randint(0, 7)
    else:
        raise ValueError(f"act is {act}. Manno")
    return int(action), weights


def check_valid(action, weights, env):
    cont = True
    while cont:    
        if env.column_height(action) >= 6:
            if np.isfinite(weights).sum() != 0:
                weights[action] = np.NINF
                action = np.argmax(weights)
            else:
                raise ValueError(f"Someone managed to violate the rules. I can not move and therefore the game has already ended!")
        else:
            cont = False    
    return int(action)


def get_reward(winner, terminated):
    if not terminated:
        reward = 0.1
    else:
        if winner == 1:  # Model won
            reward = 50.
        elif winner == -1:  # Model lost
            reward = -100.
        else:  # Model drew
            reward = 10.
    return reward


def make_step(env, action, chip1, chip2, policy_net, mode="self"):
    terminated = False
    # Model is player 1 and makes a step
    env.drop(chip1, action)
    if env.check_for_victory():
        winner = 1
        terminated = True
    else:
        not_full_cols = 0
        for col in range(6):
           if env.column_height(col) <= 5:
               not_full_cols += 1
        if not_full_cols == 0:
            terminated = True
        winner = 0
    if not terminated:
        if "trans" in mode: 
            mode = np.random.choice(['self','rng'], p = [1 - trans_eps, trans_eps])
        if "self" in mode:
            with torch.no_grad():
                state = torch.tensor(env.get_simple_slots(), dtype=torch.float)
                preds = policy_net(state)
                weights = preds.clone().numpy(force=True)
                action = int(np.argmax(weights))
                action = check_valid(action, weights, env)
                env.drop(chip2, action)
        elif "rng" in mode:
            # Opponent is a random player and makes step
            pos_acts = []
            for action in range(0,6):
                if env.column_height(action) <= 5:
                    pos_acts.append(action)
            pos_acts = np.array(pos_acts)
            if pos_acts.size != 0:
                action2 = np.random.choice(pos_acts)
                env.drop(chip2, action2)
            else:
                raise ValueError("Choices Empty, your draw check sucks.")
        if env.check_for_victory():
            winner = -1
            terminated = True
        else:
            no_full_cols = 0
            for col in range(6):
               if env.column_height(col) <= 5:
                   no_full_cols += 1
            if no_full_cols == 0:
                terminated = True
            winner = 0
    return env, winner, terminated


def optimize_model(optimizer, states, acts, rewards, gamma, policy_net, target_net):
    size = len(states)
    poli_preds = policy_net(states)
    if poli_preds.ndim >= 3:
        poli_preds = poli_preds.squeeze()
    state_action_values = poli_preds.gather(1, acts.type(torch.int64))
    next_state_values = torch.zeros(size)
    with torch.no_grad():
        tar_preds = target_net(states[1:])
        if tar_preds.ndim >= 3:
            tar_preds = tar_preds.squeeze()
        next_state_values[:-1] = tar_preds.max(1).values
    expected_state_action_values = (next_state_values * gamma) + rewards
    criterion = nn.SmoothL1Loss()
    # print(f"state_action_values is {state_action_values}\n")
    # print(f"expected_state_action_values is {expected_state_action_values}\n")
    # print(f"in unsqueezed: {expected_state_action_values.unsqueeze(1)}")
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()
    return loss

In [4]:
def main(env, optimizer, policy_net, target_net, learn_rate, eps, eps_rate, eps_min, mode, tau, gamma, chip_model, chip_opponent, path="model_name"):
    counter = 0
    encounters = 100000
    pbar = tqdm.tqdm(range(encounters))
    for episode in pbar:
        # Reset the environment to an empty board
        env.reset()
        states = torch.ones((1,6,7))
        acts = torch.ones((1,1))
        rewards = torch.ones((1))
        eps = eps * eps_rate
        if eps <= eps_min:
            eps = eps_min
        terminated = False
        model_is_first = np.random.randint(2)
        if model_is_first == 0:
            action = np.random.randint(0, 7)
            env.drop(chip_opponent, action)
        while not terminated:
            states = torch.cat((states,(torch.tensor(env.get_simple_slots(), dtype=torch.float)).unsqueeze(0)))
            action, weights = get_action(policy_net, states[-1], eps)
            action = check_valid(action, weights, env)
            env, winner, terminated = make_step(env, action, chip_model, chip_opponent, policy_net, mode)
            reward = get_reward(winner, terminated)
            # action_ohe = torch.zeros((1,1))
            # action_ohe[:,action] = 1
            acts = torch.cat((acts, torch.tensor([[action]])))
            rewards = torch.cat((rewards, torch.tensor([reward])))
            if winner == 1:
                counter += 1
        loss = optimize_model(optimizer, states[1:], acts[1:], rewards[1:], gamma, policy_net, target_net)
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*tau + target_net_state_dict[key]*(1-tau)
        target_net.load_state_dict(target_net_state_dict)
        pbar.set_description(f"Loss: {loss}")
    torch.save(policy_net.state_dict(), path)
    print(f"The model won {counter/encounters}% of its encounters")
    return policy_net

In [5]:
gamma = 0.99
eps = 1
eps_rate = 0.995
eps_min = 0.1
learn_rate = 0.001
tau = 0.005
mode = "self"
chip_model = Chip.RED
chip_opponent = Chip.YELLOW
model_name = "FCmedium_1402_diff_ret"
policy_net = models.FCmedium(env.columns*env.rows, env.columns)
target_net = models.FCmedium(env.columns*env.rows, env.columns)
target_net.load_state_dict(policy_net.state_dict())
optimizer = torch.optim.Adam(policy_net.parameters(), lr=learn_rate)
#for i in range(11):
#    path = f"{model_name}/Iteration_{i}"
#    if not os.path.exists(model_name):
#        os.makedirs(model_name)
#    path_pre = f"{model_name}/Iteration_{i-1}"
#    if os.path.exists(path_pre):
#        target_net = models.CNNmedium(1, env.columns)
#        target_net.load_state_dict(torch.load(path_pre))
target_net = main(env, 
                      optimizer, 
                      policy_net, 
                      target_net, 
                      learn_rate, 
                      eps, 
                      eps_rate, 
                      eps_min, 
                      mode, 
                      tau, 
                      gamma, 
                      chip_model, 
                      chip_opponent, 
                      path = model_name)

Loss: 547243828445184.0: 100%|█████████████████████████████████████████████████| 100000/100000 [49:17<00:00, 33.81it/s]

The model won 0.53051% of its encounters



