## Import libraries

In [5]:
import os
import gym
from gym import spaces
import numpy as np
import tensorflow as tf
import numpy as np
from collections import deque
import random

In [2]:
## Implementation

In [3]:
# Gym environment for a two-player four-in-a-row game on a 4x4 board
class ConnectFourEnv(gym.Env):
    """Two-player four-in-a-row game on a square board, as a Gym environment.

    The board is a ``board_size x board_size`` array holding ``0`` for an
    empty cell, ``1`` for player 1 and ``-1`` for player 2.  Each column is
    filled from row index 0 upward.  An action is the index of the column to
    drop a piece into.
    """

    def __init__(self):
        super().__init__()
        self.board_size = 4
        self.board = np.zeros((self.board_size, self.board_size))
        self.action_space = spaces.Discrete(self.board_size)
        self.observation_space = spaces.Box(low=-1, high=1, shape=(self.board_size, self.board_size), dtype=np.float32)
        self.current_player = 1
        self.winner = None
        self.done = False

    def step(self, action):
        """Drop a piece for the current player into column ``action``.

        Returns:
            ``(observation, reward, done, info)``.

            * Game already finished: reward ``0``, ``done`` is ``True``.
            * Column full: reward ``-10``; the board and the player to move
              are left unchanged and ``done`` is ``False``.
            * Move ends the game: reward ``10`` if player 1 won, ``-10`` for
              any other outcome (a win for player 2 or a draw).
            * Otherwise: reward ``2`` and the turn passes to the other player.
        """
        if self.done:
            return self.get_obs(), 0, True, {}

        row = self._get_next_row(action)
        if row is None:
            # Illegal move: penalise but do not advance the game.
            return self.get_obs(), -10, False, {'player': self.current_player}

        self.board[row, action] = self.current_player
        winner = self._check_for_winner()
        if winner is not None:
            reward = 10 if winner == 1 else -10
            self.done = True
            self.winner = winner
        else:
            reward = 2
            self.current_player = 1 if self.current_player == -1 else -1

        return self.get_obs(), reward, self.done, {'player': self.current_player}

    def reset(self):
        """Clear the board, give the move to player 1 and return the observation."""
        self.board = np.zeros((self.board_size, self.board_size))
        self.current_player = 1
        self.winner = None
        self.done = False
        return self.get_obs()

    def render(self, mode='human'):
        """Print the raw board array to stdout."""
        print(self.board)

    def get_obs(self):
        """Return the board multiplied by the current player's id.

        The sign flip means the pieces of the player to move are always
        ``+1`` in the observation.
        """
        return self.board * self.current_player

    def _get_next_row(self, action):
        """Return the lowest-index empty row in column ``action``, or ``None`` if full."""
        for row in range(self.board_size):
            if self.board[row, action] == 0:
                return row
        return None

    def _check_for_winner(self):
        """Return the winning player's id, ``0`` for a draw, or ``None`` if undecided."""
        for row in range(self.board_size):
            for col in range(self.board_size):
                player = self.board[row, col]
                if player == 0:
                    continue
                # Four along the row, starting at (row, col).
                if col + 3 < self.board_size and np.all(self.board[row, col:col+4] == player):
                    return player
                # Four along the column, starting at (row, col).
                if row + 3 < self.board_size and np.all(self.board[row:row+4, col] == player):
                    return player
                # Diagonal with row and column both increasing.
                if col + 3 < self.board_size and row + 3 < self.board_size and np.all(np.diagonal(self.board[row:row+4, col:col+4]) == player):
                    return player
                # Diagonal with row decreasing while column increases.
                if col + 3 < self.board_size and row - 3 >= 0 and np.all(np.diagonal(np.fliplr(self.board[row-3:row+1, col:col+4])) == player):
                    return player
        # No line found: a completely filled board is a draw.
        if np.count_nonzero(self.board) == self.board_size**2:
            return 0
        return None

    def get_valid_actions(self):
        """Return the list of columns that can still accept a piece.

        Columns fill from row 0 upward (see ``_get_next_row``), so a column
        is playable exactly when its last row is still empty.
        """
        top_row = self.board_size - 1
        return [col for col in range(self.board_size) if self.board[top_row, col] == 0]

In [4]:
class DQN:
    """Deep Q-learning agent with experience replay and epsilon-greedy exploration.

    Args:
        env: Environment exposing ``observation_space``, ``action_space``,
            ``reset()`` and ``step(action)``.
        learning_rate: Learning rate of the SGD optimizer.
        discount_factor: Discount applied to the bootstrapped next-state value.
        epsilon: Initial probability of taking a random action.
        epsilon_decay: Multiplicative decay applied to ``epsilon``.
        epsilon_min: Lower bound for ``epsilon``.
        batch_size: Number of transitions sampled per replay step.
        memory_size: Maximum number of transitions kept in the replay buffer.
    """

    def __init__(self, env, learning_rate=0.001, discount_factor=0.99, epsilon=0.5, epsilon_decay=0.99, epsilon_min=0.01, batch_size=64, memory_size=100000):
        self.env = env
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.batch_size = batch_size
        self.memory = deque(maxlen=memory_size)
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the fully connected Q-network.

        The input is the flattened board; the output has one linear unit per
        action.
        """
        obs_shape = self.env.observation_space.shape
        model = tf.keras.models.Sequential([
            tf.keras.layers.Dense(512, activation='relu', input_shape=(obs_shape[0] * obs_shape[1],)),
            tf.keras.layers.Dense(256, activation='relu'),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dense(16, activation='relu'),
            tf.keras.layers.Dense(8, activation='relu'),
            tf.keras.layers.Dense(self.env.action_space.n, activation='linear')
        ])
        # `lr` is a deprecated alias; `learning_rate` is the supported name.
        model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=self.learning_rate), loss='mse')
        return model

    def act(self, state):
        """Choose an action: random with probability ``epsilon``, else greedy."""
        if np.random.rand() < self.epsilon:
            return self.env.action_space.sample()
        q_values = self.model.predict(state.reshape(1, -1), verbose=0)[0]
        return int(np.argmax(q_values))

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def replay(self):
        """Fit the network on one random mini-batch from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size``
        transitions.
        """
        if len(self.memory) < self.batch_size:
            return

        # Unzip the sampled transitions column by column. Building a single
        # array from the ragged tuples is rejected by recent NumPy releases.
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        states = np.stack(states).reshape(self.batch_size, -1)
        next_states = np.stack(next_states).reshape(self.batch_size, -1)
        actions = np.asarray(actions, dtype=int)
        rewards = np.asarray(rewards, dtype=np.float32)
        not_done = 1.0 - np.asarray(dones, dtype=np.float32)

        targets = self.model.predict(states, verbose=0)
        q_values_next = self.model.predict(next_states, verbose=0)
        max_q_values_next = np.amax(q_values_next, axis=1)
        # Only the taken action's target changes; terminal transitions do not
        # bootstrap from the next state.
        targets[np.arange(self.batch_size), actions] = rewards + not_done * self.discount_factor * max_q_values_next
        self.model.fit(states, targets, batch_size=self.batch_size, verbose=0)

        # NOTE(review): epsilon is decayed here on every replay step and again
        # once per episode in train(); confirm the double decay is intended.
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def train(self, episodes):
        """Train by self-play until the target score is reached or episodes run out.

        Args:
            episodes: Maximum number of episodes to run. Pass ``None`` to keep
                training until the target score is reached.
        """
        target_score = 30
        episode = 0
        best_score = -np.inf
        while episodes is None or episode < episodes:
            episode += 1
            state = self.env.reset()
            done = False
            score = 0
            while not done:
                action = self.act(state)
                next_state, reward, done, _ = self.env.step(action)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                score += reward
                self.replay()
            if score > best_score:
                best_score = score
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
            print(f"Episode {episode} - Score: {score:.2f} - Best Score: {best_score:.2f} - Epsilon: {self.epsilon:.3f}")
            if best_score >= target_score:
                print(f"Target score of {target_score} reached after {episode} episodes")
                break

## Training

In [6]:
env = ConnectFourEnv()
agent = DQN(env)
if os.path.exists('connect4_model.h5'):
    # A model was saved by an earlier run: reuse it so `agent` is not left
    # with freshly initialised (untrained) weights.
    agent.model = tf.keras.models.load_model('connect4_model.h5')
else:
    agent.train(episodes=20)
    agent.model.save('connect4_model.h5')

  super(SGD, self).__init__(name, **kwargs)


## Testing

In [8]:
# Evaluate the saved model. The guard used to be inverted, which skipped the
# evaluation whenever the model file existed, i.e. always after training.
if os.path.exists('connect4_model.h5'):
    agent.model = tf.keras.models.load_model('connect4_model.h5')
    scores = []
    for i in range(20):
        print(f"Game {i+1}")
        state = env.reset()
        done = False
        score = 0
        while not done:
            action = agent.act(state)
            state, reward, done, _ = env.step(action)
            score += reward
        print(f"Score: {score} \n")
        scores.append(score)

    # Report the number of games actually played.
    print(f"Average score over {len(scores)} games: {np.mean(scores)}")

## Play with trained Agent

In [10]:
import pygame
from pygame.locals import *
from tensorflow.keras.models import load_model
import numpy as np
from connect_four import ConnectFourEnv
import time

# Trained networks for the two competing agents.
model1 = load_model('connect4_model1.h5')
model2 = load_model('connect4_model2.h5')

env = ConnectFourEnv()

# Pygame window and board geometry (all sizes in pixels).
pygame.init()
FPS = 30
WINDOW_WIDTH, WINDOW_HEIGHT = 640, 480
CELL_SIZE = 80
BOARD_HEIGHT, BOARD_WIDTH = env.observation_space.shape[:2]
# Offsets that centre the board inside the window.
BOARD_OFFSET_X = int((WINDOW_WIDTH - BOARD_WIDTH * CELL_SIZE) / 2)
BOARD_OFFSET_Y = int((WINDOW_HEIGHT - BOARD_HEIGHT * CELL_SIZE) / 2)
WINDOW_SURF = pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))
pygame.display.set_caption('Connect Four')

# RGB colours.
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
YELLOW = (255, 255, 0)

# Default pygame font used for all on-screen text.
FONT_SIZE = 32
FONT = pygame.font.Font(None, FONT_SIZE)

# Initial game state.
state = env.reset()
done = False
player_turn = 1
winner = None


def draw_board(board):
    """Draw the grid and all pieces onto WINDOW_SURF.

    Args:
        board: 2-D array indexed ``board[row][col]`` with ``1`` for a yellow
            piece, ``-1`` for a red piece and anything else for an empty cell.

    The environment defined in this file fills each column from row index 0
    upward, so row 0 is drawn at the bottom of the screen and pieces appear
    to stack up from the floor.
    """
    radius = int(CELL_SIZE / 2.5)
    for x in range(BOARD_WIDTH):
        for y in range(BOARD_HEIGHT):
            screen_row = BOARD_HEIGHT - 1 - y  # flip vertically: row 0 at the bottom
            left = BOARD_OFFSET_X + x * CELL_SIZE
            top = BOARD_OFFSET_Y + screen_row * CELL_SIZE
            pygame.draw.rect(WINDOW_SURF, WHITE, (left, top, CELL_SIZE, CELL_SIZE))

            cell = board[y][x]
            if cell == 1:
                color = YELLOW
            elif cell == -1:
                color = RED
            else:
                continue
            center = (int(left + 0.5 * CELL_SIZE), int(top + 0.5 * CELL_SIZE))
            pygame.draw.circle(WINDOW_SURF, color, center, radius)


def draw_text(text, x, y, color):
    """Render ``text`` in ``color`` with FONT and blit it onto WINDOW_SURF centred at (x, y)."""
    rendered = FONT.render(text, True, color)
    WINDOW_SURF.blit(rendered, rendered.get_rect(center=(x, y)))


# Run the agent-vs-agent demo when this file is executed as a script.
if __name__ == '__main__':
    # Keyed by the environment's player id (player 1 moves first). Each entry
    # is (network, caption shown after that player's move, caption colour).
    # NOTE(review): player 1 is driven by model2 and player 2 by model1, as
    # in the original script; confirm this pairing is intended.
    agents = {
        1: (model2, "Agent 1 turn", YELLOW),
        -1: (model1, "Agent 2 turn", RED),
    }

    while not done:
        # Service the event queue so the window stays responsive, and stop
        # the match if the user closes the window.
        if any(event.type == pygame.QUIT for event in pygame.event.get()):
            break

        model, turn_caption, turn_color = agents[int(env.current_player)]

        q_values = model.predict(state.reshape(1, -1))[0]
        env.state = state
        # Rule out full columns: a column is playable while it still has an
        # empty cell. Full columns get -inf so argmax can never select them.
        column_open = (env.board == 0).any(axis=0)
        action = int(np.argmax(np.where(column_open, q_values, -np.inf)))
        state, reward, done, info = env.step(action)
        if done:
            winner = env.winner

        WINDOW_SURF.fill(BLACK)
        # Draw the raw board rather than the observation: the observation is
        # sign-flipped for the player to move, which would swap the colours
        # every turn.
        draw_board(env.board)
        if done and winner in (1, -1):
            # Player 1 is agent 1 (yellow pieces); player -1 is agent 2 (red).
            draw_text("Agent {} wins!".format(1 if winner == 1 else 2),
                      int(WINDOW_WIDTH / 2), int(WINDOW_HEIGHT / 2),
                      YELLOW if winner == 1 else RED)
        elif done:
            draw_text("Draw!", int(WINDOW_WIDTH / 2), int(WINDOW_HEIGHT / 2), BLACK)
        else:
            draw_text(turn_caption, int(WINDOW_WIDTH / 2), int(CELL_SIZE / 2), turn_color)
        pygame.display.update()
        time.sleep(2)

    time.sleep(2)
    pygame.quit()

# See PyCharm help at https://www.jetbrains.com/help/pycharm/
