<a href="https://www.kaggle.com/code/jollychappies/reinforcement-learning-with-connect-4-dqn?scriptVersionId=210886670" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import kagglehub

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Recursively walk /kaggle/input and print the full path of every file found;
# the directory-name list yielded by os.walk is ignored.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

import torch
import torch.nn as nn
import torch.optim as optim

!pip install kaggle-environments

from kaggle_environments import make
# Create the ConnectX environment once at module level; agent_play() below
# reads this global `env` rather than receiving it as an argument.
env = make("connectx", debug=True)
# Presumably the trailing ';' only suppresses the notebook's echo of the return value.
env.reset();

# State representation

In [None]:
def get_state(env):
    '''
    Return the board held in the observation of the first entry of env.state.
    '''
    first_agent_state = env.state[0]
    observation = first_agent_state['observation']
    return observation['board']

# Q-network

In [None]:
class NeuralNetwork(nn.Module):
    '''
    Fully connected Q-network: 42 input features in, 7 output values out.

    The layers are exposed as fc1..fc4, so the state_dict keys are
    fc1.weight, fc1.bias, ..., fc4.bias.
    '''
    def __init__(self):
        super().__init__()

        # Layer widths: 42 -> 256 -> 128 -> 64 -> 7
        self.fc1 = nn.Linear(in_features=42, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=128)
        self.fc3 = nn.Linear(in_features=128, out_features=64)
        self.fc4 = nn.Linear(in_features=64, out_features=7)

        # Shared activation, plus dropout (p=0.2) applied after the second layer only
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        '''Map input features to raw (un-activated) output values.'''
        hidden = self.fc1(x)
        hidden = self.relu(hidden)

        hidden = self.fc2(hidden)
        hidden = self.relu(hidden)
        hidden = self.dropout(hidden)

        hidden = self.fc3(hidden)
        hidden = self.relu(hidden)

        # The output layer is left linear
        return self.fc4(hidden)

# Replay buffer

In [None]:
from collections import deque
from random import sample

class ReplayBuffer():
    '''
    Fixed-capacity FIFO store of experience tuples.

    The buffer does not inspect the stored experiences. Once `capacity`
    items are held, each new append discards the oldest one.
    '''
    def __init__(self, capacity):
        '''
        Parameters:

        capacity - maximum number of experiences kept (a negative value raises ValueError)
        '''
        # A bounded deque evicts from the left automatically when full,
        # so append() needs no manual length check.
        self.buffer = deque(maxlen=capacity)
        self.capacity = capacity

    def __len__(self):
        '''Number of experiences currently stored.'''
        return len(self.buffer)

    def append(self, new_experience):
        '''Append new experience to buffer, dropping the oldest one if full.'''
        self.buffer.append(new_experience)

    def sample(self, batch_size):
        '''
        Sample a batch of experiences uniformly at random, without replacement.

        Raises ValueError (from random.sample) if batch_size exceeds len(self).
        '''
        return sample(self.buffer, batch_size)

# Agents

In [None]:
# This agent picks, uniformly at random, one of the columns that can still
# take a piece: a column c qualifies when observation.board[c] is 0.
def random_agent(observation, configuration):
    import random

    open_columns = []
    for column in range(configuration.columns):
        if observation.board[column] == 0:
            open_columns.append(column)

    return random.choice(open_columns)

In [None]:
import random

# This agent chooses the legal column with the highest Q-value predicted by
# the module-level `model`.
def my_agent(observation, configuration):
    """
    Greedy policy over the Q-network's output.

    Parameters:

    observation - mapping with a 'board' entry (flat list of cell values)
    configuration - object exposing `columns`, the number of columns

    Returns the chosen column index as a plain int.
    """
    board = observation['board']

    # A column is playable while its entry among the first `columns` cells is 0
    legal = torch.tensor([board[column] == 0 for column in range(configuration.columns)])

    state = torch.tensor(np.array(board, dtype='float32'))

    # Action selection is pure inference: switch dropout off and skip autograd,
    # then restore the caller's train/eval mode so training is unaffected.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            q_values = model(state)
    finally:
        model.train(was_training)

    # -inf guarantees an illegal column can never win the argmax, however
    # large the predicted Q-values grow.
    masked_q_values = q_values.masked_fill(~legal, float('-inf'))

    return torch.argmax(masked_q_values).item()

In [None]:
def agent_play(matches, rb, epsilon):
    """

    Function to get the agent playing in the environment for a fixed number of matches.
    Experiences are added to the replay buffer. Intended for model training.

    Uses the module-level `env`, with the agent in the first seat against the
    built-in "random" opponent.

    Parameters:

    matches - number of matches to play
    rb - replay buffer to store experiences to
    epsilon - probability (0..1) of playing a random legal column instead of my_agent's choice

    Returns a list with one entry per match: the reward returned by the last
    step of that match (None if the match ended without the agent moving).

    Each stored experience is the tuple
    (board, action, reward, next_board, mark, done). `done` is appended last
    so that consumers reading only the first five fields are unaffected.

    """
    trainer = env.train([None, "random"])

    trainer.reset()

    results = [] # save results

    for i in range(matches):

        # Reset per match so a match with no agent move cannot reuse a stale reward
        reward = None

        while not env.done:

            # get observation
            observation = env.state[0]['observation']

            # choose an action
            if random.uniform(0, 1) < epsilon: # explore
                action = random.choice([c for c in range(env.configuration.columns) if observation.board[c] == 0])
            else: # exploit
                action = my_agent(observation, env.configuration)

            # take action and get results
            new_observation, reward, done, _info = trainer.step(action)

            # capture experience, including whether this step ended the match
            experience = (observation['board'], action, reward, new_observation['board'], observation['mark'], done)

            # append experience to replay buffer
            rb.append(experience)

        if i == matches - 1: # last match
            print("Match {} complete".format(i))

        results.append(reward) # append results
        trainer.reset() # restart the game

    return results

# Model training

In [None]:
from torch.optim import Adam
# from torch.optim.lr_scheduler import ExponentialLR

gamma = 0.8 # discount factor

def train_model(model, optimizer, scheduler, rb, batch_size):
    """

    Function to train the model with one gradient step on a sampled batch.
    Parameters:

    model - PyTorch neural network
    optimizer - optimizer already bound to the model's parameters
    scheduler - unused (learning-rate scheduling is currently disabled)
    rb - replay buffer
    batch_size - the number of experiences to train the model with

    Returns (model, loss) where loss is the batch mean squared TD error as a float.

    Experiences are read positionally: 0 state, 1 action, 2 reward, 3 next state.
    If an experience carries a sixth field it is treated as a terminal flag.

    """

    # get sample of experiences
    batch = rb.sample(batch_size=batch_size)

    states = torch.tensor([experience[0] for experience in batch], dtype=torch.float32)
    actions = torch.tensor([experience[1] for experience in batch])
    rewards = torch.tensor([experience[2] for experience in batch])
    next_states = torch.tensor([experience[3] for experience in batch], dtype=torch.float32)

    # Optional terminal flag; experiences without one are treated as non-terminal
    terminal = torch.tensor(
        [bool(experience[5]) if len(experience) > 5 else False for experience in batch],
        dtype=torch.bool,
    )

    # generate more complex rewards
    rewards_extra = get_rewards(rewards)

    # NN predictions on current states: pick the Q-value of the action actually taken.
    # squeeze(1) keeps the batch dimension even when batch_size is 1.
    q_values = model(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    # NN prediction on next state; targets are constants, so no graph is built
    with torch.no_grad():
        q_values_next_state = model(next_states).max(dim=1)[0]

    # A finished game has no future return, so nothing is bootstrapped from it
    q_values_next_state = q_values_next_state.masked_fill(terminal, 0.0)

    # bellman equation
    q_targets = rewards_extra + gamma * q_values_next_state

    # loss
    loss = ((q_targets - q_values) ** 2).mean()
    print("Loss:", loss.item())

    optimizer.zero_grad()
    loss.backward() # compute gradients
    optimizer.step() # update weights

    return model, loss.item()

In [None]:
def get_rewards(rewards):
    """

    Function to get more sophisticated rewards from the default ones.
    Parameters:

    rewards - tensor of the existing rewards

    Returns a float32 tensor of the same shape with each reward mapped as:
    1 -> 2, 0 -> 1/42, anything else -> -5.

    """
    rewards = torch.as_tensor(rewards)

    # Start from the catch-all value (treated as "agent loses the match"),
    # then overwrite the two recognised cases. The dtype is fixed so that the
    # result is float32 even for a batch with no 1/42 entry.
    new_rewards = torch.full(rewards.shape, -5.0, dtype=torch.float32)
    new_rewards[rewards == 1] = 2.0       # agent wins
    new_rewards[rewards == 0] = 1 / 42    # agent makes a legal move

    return new_rewards

In [None]:
def main(model, rb, agent, episodes, matches, batch_size, epsilon, learning_rate):
    """
    Main function to get the agent to play multiple matches of Connect 4 and then train based on its experiences.
    Parameters:

    model - Neural Network
    rb - Replay Buffer
    agent - the agent to be trained (currently unused: agent_play always calls my_agent)
    episodes - how many training episodes to run
    matches - number of matches to play in each episode
    batch_size - number of experiences to sample from the replay buffer
    epsilon - the starting exploration rate
    learning_rate - the learning rate for the network's optimizer

    Returns (metric_wr, metric_loss): two dicts keyed by episode label holding
    the win rate (in percent) and the training loss of each episode.

    Checkpoints are written to the working directory: the untrained weights
    before the first episode, the mid-trained weights at the start of episode
    episodes // 2, and the fully trained weights after the last training step.

    """
    # amount to reduce epsilon by every episode, so that it decays linearly to 0
    epsilon_discount = epsilon / episodes if episodes else 0

    # Initialize optimizer
    optimizer = Adam(model.parameters(), lr=learning_rate)

    metric_wr = {}
    metric_loss = {}
    for i in range(episodes):

        # save model - untrained and mid-trained snapshots
        if i == 0:
            print("Saving untrained model", model.state_dict())
            torch.save(model.state_dict(), "connect_four_model_untrained_rdm.pth")

        elif i == episodes // 2: # integer midpoint, so odd episode counts also get a snapshot
            print("Saving mid-trained model", model.state_dict())
            torch.save(model.state_dict(), "connect_four_model_mid_trained_rdm.pth")

        # play
        results = agent_play(matches=matches, rb=rb, epsilon=epsilon)

        # learn
        model, loss = train_model(model, optimizer=optimizer, scheduler=None, rb=rb, batch_size=batch_size)

        # metrics
        wr = round(results.count(1) / len(results), 2)*100
        metric_wr["Episode {} WR".format(i)] = wr
        metric_loss["Loss {} WR".format(i)] = loss
        print("Episode {} WR".format(i), wr)

        # reduce epsilon for the next episode - increase exploitation
        epsilon = epsilon - epsilon_discount

    # Saved after the loop so the checkpoint includes the final training step
    # and is written even when episodes is 1 or 2.
    if episodes > 0:
        print("Saving fully trained model", model.state_dict())
        torch.save(model.state_dict(), "connect_four_model_fully_trained_rdm.pth")

    return metric_wr, metric_loss

In [None]:
def get_model(new=False, weights_path="/kaggle/input/derek/pytorch/random-trained/1/connect_four_model_fully_trained.pth"):
    """

    Function to get the neural network model.
    It can create one from scratch or retrieve the latest pre-trained version.

    Parameters:

    new - True if new model requested, False otherwise.
    weights_path - state_dict file to load when new is False; defaults to the
                   pre-trained weights under /kaggle/input.

    Returns a NeuralNetwork. A freshly created model is left in training mode;
    a loaded model is switched to eval mode.

    """
    model = NeuralNetwork()

    if new:
        return model

    # map_location keeps loading working on a CPU-only machine even if the
    # checkpoint was saved from a GPU; the model itself is created on the CPU.
    model.load_state_dict(torch.load(weights_path, map_location="cpu"))
    model.eval()

    return model

In [None]:
# Request the 'jollychappies/derek/PyTorch/random-trained/1' model through kagglehub.
# NOTE(review): the returned value is not read anywhere else in this notebook;
# get_model() points at a path under /kaggle/input instead - confirm the two agree.
derek_pytorch_random_trained_1_path = kagglehub.model_download('jollychappies/derek/PyTorch/random-trained/1')

In [None]:
# Start from a freshly initialised network (no weights loaded). `model` is a
# module-level global that my_agent() reads directly.
model = get_model(new=True)
# Replay buffer holding at most the 10000 most recent experiences.
rb = ReplayBuffer(capacity = 10000)

In [None]:
# Full training run: 3000 episodes of 20 matches each, one batch of 32
# experiences trained per episode, starting fully exploratory (epsilon=1).
metric_wr, metric_loss = main(model, rb, agent=my_agent, episodes=3000, matches=20, batch_size=32, epsilon=1, learning_rate=0.001)

In [None]:
import matplotlib.pyplot as plt

def evaluate(metrics):
    """
    Function to visualise learning.

    Plots a centred 10-episode rolling mean of the values in `metrics`
    against its keys.

    Parameters:

    metrics - the win rates from training episodes (dict: episode label -> win rate).

    Returns the matplotlib figure.

    """

    labels = np.array(list(metrics.keys()))
    win_rates = np.array(list(metrics.values()))
    rolling_avg = pd.DataFrame(win_rates, labels, columns = ['win_rate']).rolling(window=10, center=True).mean()

    figure, axes = plt.subplots(figsize=(10, 7))

    axes.plot(rolling_avg)
    axes.set_ylabel("Win rate %")
    axes.set_xlabel("Training episode")
    axes.tick_params(axis='x', rotation=70)

    # Roughly one tick per 50 episodes. The bin count must be a whole number
    # of at least 1, which true division does not give for short runs.
    axes.xaxis.set_major_locator(plt.MaxNLocator(max(1, len(labels) // 50)))

    return figure

In [None]:
# Plot the per-episode win rates returned by main() and write the chart to
# training.png in the current working directory.
figure = evaluate(metric_wr)
figure.savefig("training.png")