In [None]:
from re import VERBOSE
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from collections import deque
import random
from tqdm import tqdm
import copy

In [None]:
class Connect4:
    """Connect Four environment on a 6x7 board for two players labelled 1 and 2.

    Board cells hold 0 (empty), 1 or 2. Row 0 is the top of the board: a piece
    dropped into a column settles in the highest-index empty row. Rewards are
    signed from player 1's point of view (positive favours player 1, negative
    favours player 2).

    Args:
        rew: Magnitude of the reward returned when a move wins the game.
        inv: Magnitude of the penalty returned when a move is invalid.
    """

    def __init__(self, rew, inv):
        self.rows = 6
        self.cols = 7
        self.board = np.zeros((self.rows, self.cols))
        self.player = 1  # player whose turn it is (1 or 2)
        self.winning_length = 4
        self.inv = inv
        self.rew = rew

    def reset(self):
        """Clear the board, give the move to player 1 and return a copy of the board."""
        self.board = np.zeros((self.rows, self.cols))
        self.player = 1
        return self.board.copy()

    def is_valid_move(self, col):
        """Return True when `col` is an existing column whose top cell is empty.

        Out-of-range columns are reported as invalid instead of raising
        IndexError (col >= cols) or wrapping around to another column
        (negative col), which is what plain NumPy indexing would do.
        """
        return bool(0 <= col < self.cols and self.board[0][col] == 0)

    def make_move(self, col):
        """Drop the current player's piece into `col`.

        Returns:
            (row, col) of the cell that was filled, or None when the column
            has no empty cell. `step` checks `is_valid_move` first, so it never
            sees the None case.
        """
        for row in range(self.rows - 1, -1, -1):
            if self.board[row][col] == 0:
                self.board[row][col] = self.player
                return row, col
        return None

    def _run_length(self, row, col, dr, dc):
        """Count consecutive current-player cells starting one step away from (row, col)."""
        length = 0
        r, c = row + dr, col + dc
        while 0 <= r < self.rows and 0 <= c < self.cols and self.board[r][c] == self.player:
            length += 1
            r += dr
            c += dc
        return length

    def check_winner(self, row, col):
        """Return True when the piece at (row, col) completes a line for the current player.

        The cell itself is counted as 1 and the run is extended in both
        directions along the horizontal, vertical and both diagonal axes.
        """
        for dr, dc in ((0, 1), (1, 0), (1, 1), (1, -1)):
            count = 1 + self._run_length(row, col, dr, dc) + self._run_length(row, col, -dr, -dc)
            if count >= self.winning_length:
                return True
        return False

    def is_board_full(self):
        """Return True when no cell is empty."""
        return bool(np.all(self.board != 0))

    def step(self, action):
        """Play `action` (a column index) for the current player.

        Returns:
            (board_copy, reward, done) where reward is
            * -inv for an invalid move by player 1, +inv for one by player 2,
            * +rew when player 1 wins, -rew when player 2 wins,
            * 0 for a draw or a non-terminal move.
            The turn passes to the other player only after a non-terminal move.
        """
        win = 1 if self.player == 1 else -1
        if not self.is_valid_move(action):
            return self.board.copy(), -self.inv * win, True
        row, col = self.make_move(action)
        if self.check_winner(row, col):
            return self.board.copy(), self.rew * win, True
        if self.is_board_full():
            return self.board.copy(), 0, True
        self.player = 3 - self.player  # Switch player (1 <-> 2)
        return self.board.copy(), 0, False

    def render(self):
        """Print the raw board array."""
        print(self.board)

    @property
    def observation_space(self):
        """Shape of the board array, (rows, cols)."""
        return self.board.shape

    @property
    def action_space(self):
        """Number of columns, i.e. the number of distinct actions."""
        return self.cols


In [None]:
# Fully connected Q-network: flattens the (6, 7) board and outputs one Q-value
# per column.
FCNN = models.Sequential([
    layers.Input(shape=(6, 7)),
    layers.Flatten(),
    layers.Dense(256, activation='relu'),
    layers.Dense(128, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(7)  # Output layer for Q-values
])
FCNN.compile(loss='mean_squared_error', optimizer=optimizers.Adam())

# Convolutional Q-network: the board is reshaped to a single-channel image.
# The input is declared with an explicit Input layer (same as FCNN) rather than
# the `input_shape=` keyword on the first layer, which newer Keras versions
# warn about.
CNN = models.Sequential([
    layers.Input(shape=(6, 7)),
    layers.Reshape((6, 7, 1)),
    layers.Conv2D(256, (3, 3), activation='relu', padding='valid'),
    layers.Conv2D(128, (3, 3), activation='relu', padding='valid'),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dense(7)  # Output layer for Q-values
])
CNN.compile(loss='mean_squared_error', optimizer=optimizers.Adam())

In [None]:
import copy

class DQNAgent:
    """Epsilon-greedy DQN agent with a replay memory and a target network.

    The agent works on 6x7 boards in which its own pieces are 1 and the
    opponent's pieces are 2 (recoded to -1 before they reach the network).

    Args:
        model: Compiled Keras model mapping a (1, 6, 7) board to 7 Q-values.
            It is deep-copied twice (online and target network).
        state_size: Optional board shape; derived from `model.input_shape`
            when omitted.
        action_size: Optional number of actions; derived from
            `model.output_shape` when omitted.
    """

    ROWS = 6
    COLS = 7

    def __init__(self, model, state_size=None, action_size=None):
        # Derive the sizes from the model instead of reading notebook globals,
        # so the class does not depend on cell execution order.
        self.state_size = tuple(model.input_shape[1:]) if state_size is None else state_size
        self.action_size = model.output_shape[-1] if action_size is None else action_size
        self.memory = deque(maxlen=1000)  # Replay memory
        self.gamma = 0.95  # Discount factor
        self.epsilon = 0.2  # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.999
        self.batch_size = 32
        self.online_network = copy.deepcopy(model)
        self.target_network = copy.deepcopy(model)
        self.update_target_network()

    def valid(self, state):
        """Return a length-7 mask with 1 for full columns and 0 for playable ones.

        Accepts a board of shape (6, 7) or a batched (1, 6, 7) board; only the
        top row decides whether a column is full. Using the absolute value
        makes the mask independent of the 2 / -1 opponent encoding.
        """
        top_row = np.asarray(state).reshape(-1, self.COLS)[0]
        return np.minimum(np.abs(top_row), 1)

    def change(self, state):
        """Return a copy of `state` with opponent pieces recoded from 2 to -1.

        The input array is left untouched so callers can keep using it.
        """
        recoded = np.array(state, copy=True)
        recoded[recoded == 2] = -1
        return recoded

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_network.set_weights(self.online_network.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """Store one transition; both boards are recoded and shaped (1, 6, 7)."""
        board_shape = (1, self.ROWS, self.COLS)
        self.memory.append((
            self.change(state).reshape(board_shape),
            action,
            reward,
            self.change(next_state).reshape(board_shape),
            done,
        ))

    def act(self, state):
        """Pick a column for `state` with an epsilon-greedy policy.

        With probability epsilon a random playable column is returned;
        otherwise the playable column with the highest predicted Q-value.
        """
        full_columns = self.valid(state)
        if np.random.rand() <= self.epsilon:
            return np.random.choice(np.where(full_columns == 0)[0])
        net_input = self.change(state).reshape(1, self.ROWS, self.COLS)
        q_values = self.online_network.predict(net_input, verbose=0)
        # A large penalty pushes full columns below every playable one.
        return np.argmax(q_values[0] - full_columns * 10000)

    def replay(self):
        """Fit the online network on one random minibatch and decay epsilon.

        Does nothing until the memory holds at least `batch_size` transitions.
        Each sampled transition is fitted individually.
        """
        if len(self.memory) < self.batch_size:
            return
        minibatch = random.sample(self.memory, self.batch_size)
        for state, action, reward, next_state, done in minibatch:
            full_columns = self.valid(state)
            target = self.online_network.predict(state, verbose=0)
            if done:
                target[0][action] = reward
            else:
                next_q = self.target_network.predict(next_state, verbose=0)[0]
                target[0][action] = reward + self.gamma * np.amax(next_q)
                # Lower the target of every full column by 1 (non-terminal
                # transitions only, as in the original training rule).
                target[0] = target[0] - full_columns
            self.online_network.fit(state, target, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

def WDL_string(WDL):
    """Format the outcome tally as 'a | b | c | d | e'.

    The counts are read from `WDL` in the key order +rew, 0, -rew, -inv, +inv,
    where `rew` and `inv` are the module-level reward constants.
    """
    key_order = (rew, 0, -rew, -inv, inv)
    return ' | '.join(str(WDL[key]) for key in key_order)

def transform(state):
    """Return a new board with the player labels swapped (1 <-> 2).

    Empty cells (0) stay 0; any other value x becomes 3 - x. The input array
    is not modified, so the caller's board keeps its original encoding.
    """
    state = np.asarray(state)
    return np.where(state == 0, 0, 3 - state)

# Initialize the Connect4 environment and the agent
# rew: magnitude of the terminal reward for a win (env.step returns +rew when
#      player 1 wins and -rew when player 2 wins).
# inv: magnitude of the penalty for an invalid move (env.step returns -inv when
#      player 1 makes it and +inv when player 2 makes it).
rew = 5
inv = 7
env = Connect4(rew, inv)
# Board shape and number of columns, exposed as module-level names.
# NOTE(review): keep these assigned before DQNAgent(...) is constructed in case
# its constructor looks them up as globals.
state_size = env.observation_space
action_size = env.action_space
agent = DQNAgent(CNN)
# Tally of episode outcomes, keyed by the final reward returned by env.step.
WDL = {rew:0, 0:0, -rew:0, -inv: 0, inv: 0}

# Train the agent
# Self-play: the same agent chooses the moves for both players.
for episode in range(1000):
    state = env.reset()
    state = np.expand_dims(state, axis = 0)
    # Per-episode history: the board each mover saw (from its own point of
    # view) and the column it chose.
    states = []
    actions = []
    # A 6x7 board is full after 42 moves, so 50 is a safe upper bound.
    for time_step in range(50):
        # Even steps are player 1's moves, odd steps player 2's. On odd steps
        # the labels 1 and 2 are swapped so the agent always sees its own
        # pieces as 1.
        if time_step%2 == 0:
          action = agent.act(state)
        else:
          action = agent.act(transform(state))
        next_state, reward, done = env.step(action)
        # Record the position in the mover's perspective (swapped on odd steps).
        if time_step%2:
          states.append(transform(state.copy()))
        else:
          states.append(state.copy())
        actions.append(action)
        state = next_state
        if done:
            WDL[reward] += 1
            # The '/1000' in the message is hard-coded and must match range(1000) above.
            print('Episode:', str(episode+1) + '/1000', 'Reward:', reward, 'steps:', time_step, 'WDL:', '    ', WDL_string(WDL))
            break

    n = len(states)
    # Non-terminal transitions: states[i] and states[i+2] are consecutive
    # positions seen by the same player, with reward 0 and done = 0.
    for i in range(n-2):
      agent.remember(states[i], actions[i], 0, states[i+2], 0)
    if np.abs(reward) == rew:
      # Somebody won: the last mover (index n-1) gets +rew and the previous
      # mover (index n-2) gets -rew. Both terminal transitions are stored five
      # times.
      for i in range(5):
        agent.remember(states[n-2], actions[n-2], -rew, states[n-1], 1)
        agent.remember(states[n-1], actions[n-1], rew, states[n-1], 1)
    elif reward == 0:
      # Draw: both players' final moves are stored as terminal with reward 0.
      agent.remember(states[n-2], actions[n-2], 0, states[n-1], 1)
      agent.remember(states[n-1], actions[n-1], 0, states[n-1], 1)
    else:
      # Remaining case (reward is +inv or -inv): the last move was invalid, so
      # only that move is stored, with penalty -inv.
      agent.remember(states[n-1], actions[n-1], -inv, states[n-1], 1)
    # Sync the target network with the online network every 10 episodes.
    if episode % 10 == 0:
        agent.update_target_network()

    # One replay pass per episode.
    agent.replay()

# Evaluate the agent
# Implement code to play Connect4 using the trained agent



Episode: 1/1000 Reward: -100 steps: 7 WDL:      0 | 0 | 1 | 0 | 0
Episode: 2/1000 Reward: 100 steps: 10 WDL:      1 | 0 | 1 | 0 | 0
Episode: 3/1000 Reward: 100 steps: 10 WDL:      2 | 0 | 1 | 0 | 0
Episode: 4/1000 Reward: -100 steps: 9 WDL:      2 | 0 | 2 | 0 | 0
Episode: 5/1000 Reward: 1000 steps: 7 WDL:      2 | 0 | 2 | 0 | 1
Episode: 6/1000 Reward: -100 steps: 7 WDL:      2 | 0 | 3 | 0 | 1
Episode: 7/1000 Reward: 1000 steps: 7 WDL:      2 | 0 | 3 | 0 | 2
Episode: 8/1000 Reward: 100 steps: 8 WDL:      3 | 0 | 3 | 0 | 2
Episode: 9/1000 Reward: 1000 steps: 9 WDL:      3 | 0 | 3 | 0 | 3
Episode: 10/1000 Reward: 100 steps: 6 WDL:      4 | 0 | 3 | 0 | 3
Episode: 11/1000 Reward: 100 steps: 10 WDL:      5 | 0 | 3 | 0 | 3
Episode: 12/1000 Reward: -1000 steps: 6 WDL:      5 | 0 | 3 | 1 | 3
Episode: 13/1000 Reward: 1000 steps: 7 WDL:      5 | 0 | 3 | 1 | 4
Episode: 14/1000 Reward: -100 steps: 7 WDL:      5 | 0 | 4 | 1 | 4
Episode: 15/1000 Reward: 100 steps: 6 WDL:      6 | 0 | 4 | 1 | 4
Episod

In [None]:
# Define the random model (opponent)
import random

class RandomPlayer:
    """Opponent that plays a uniformly random legal column.

    Args:
        action_space: Number of columns to choose from.
    """

    def __init__(self, action_space):
        self.action_space = action_space

    def choose_action(self, state):
        """Return a random column accepted by the global `env`, or None if there is none.

        `state` is not consulted; legality is checked against the module-level
        `env` object via `env.is_valid_move`.
        """
        candidates = list(filter(env.is_valid_move, range(self.action_space)))
        if not candidates:
            return None
        return random.choice(candidates)

def run_against_random(agent, num_games=100):
    """Play `agent` (as player 1) against a RandomPlayer and print the outcome rates.

    The games are played on the module-level `env`. The result of a game is
    taken from the sign of the final reward, which the environment reports from
    player 1's point of view: positive means the agent won (including an
    invalid move by the opponent), negative means the random player won
    (including an invalid move by the agent), zero is a draw.

    Note that `agent.act` is called unchanged, so whatever exploration rate the
    agent currently has still applies during evaluation.

    Args:
        agent: Object with an `act(state)` method taking a (1, rows, cols) board.
        num_games: Number of games to play (default 100).

    Raises:
        ValueError: If `num_games` is not positive.
    """
    if num_games <= 0:
        raise ValueError("num_games must be positive")

    # Instantiate the random model
    random_model = RandomPlayer(env.action_space)

    win_count_dqn = 0
    win_count_random = 0
    draw_count = 0

    # Simulate games
    for _ in range(num_games):
        state = env.reset()
        done = False
        dqn_to_move = True  # the agent always opens the game as player 1
        while not done:
            if dqn_to_move:
                action = agent.act(np.expand_dims(state, axis=0))
            else:
                action = random_model.choose_action(state)
            # Always carry the freshest board forward so both players see the
            # position after the previous move.
            state, reward, done = env.step(action)
            dqn_to_move = not dqn_to_move

        if reward > 0:
            win_count_dqn += 1
        elif reward < 0:
            win_count_random += 1
        else:
            draw_count += 1

    # Calculate win rates
    win_rate_dqn = win_count_dqn / num_games
    win_rate_random = win_count_random / num_games
    draw_rate = draw_count / num_games

    print("DQN Agent Win Rate:", win_rate_dqn)
    print("Random Model Win Rate:", win_rate_random)
    print("Draw Rate:", draw_rate)

# Play the evaluation games against the random opponent and print the rates.
run_against_random(agent)

DQN Agent Win Rate: 0.89
Random Model Win Rate: 0.0
Draw Rate: 0.11


In [None]:
import pickle

# Serialize the trained online network to disk with pickle.
# NOTE(review): the file name says FCNN, but the agent in this notebook is
# constructed from CNN - confirm which network this file is meant to hold.
model_path = 'FCNN_self.pkl'
with open(model_path, 'wb') as model_file:
    pickle.dump(agent.online_network, model_file)

# google.colab is only importable inside a Colab runtime; files.download is
# presumably what hands the saved file to the browser - verify outside Colab.
from google.colab import files
files.download(model_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>