### Imports

In [1]:
import numpy as np
import pygame
import random

pygame 2.4.0 (SDL 2.26.4, Python 3.10.4)
Hello from the pygame community. https://www.pygame.org/contribute.html


### Constants

In [2]:
# --- Board geometry -------------------------------------------------------
BOARD_WIDTH = 7    # number of columns
BOARD_HEIGHT = 6   # number of rows
GRID_SIZE = 80     # edge length of one board cell, in pixels
PADDING = 10       # gap between a disc and the edge of its cell, in pixels

# --- Window size (pixels) -------------------------------------------------
WINDOW_WIDTH = BOARD_WIDTH * GRID_SIZE   # 560: exactly one cell per column
WINDOW_HEIGHT = 530

# --- Piece identifiers stored in the board array (0 means empty) ----------
PLAYER_PIECE, AI_PIECE = 1, 2

# --- RGB colours ----------------------------------------------------------
BACKGROUND_COLOR = (0, 0, 255)
EMPTY_COLOR = (0, 0, 0)
PLAYER_COLOR = (255, 0, 0)
AI_COLOR = (255, 200, 0)


### Connect4 Model


In [3]:

class ConnectFourModel:
    """Board state and move rules for a two-player Connect Four game.

    ``board`` is a ``BOARD_HEIGHT x BOARD_WIDTH`` int8 array: 0 is an empty
    cell, 1 and 2 are the players' pieces. Pieces stack upwards from row 0, so
    row ``BOARD_HEIGHT - 1`` is the last cell of a column to be filled.
    """

    def __init__(self):
        self.board = np.zeros((BOARD_HEIGHT, BOARD_WIDTH), dtype=np.int8)
        self.current_player = 1

    def reset(self):
        """Empty the board and hand the first move back to player 1."""
        self.current_player = 1
        self.board = np.zeros((BOARD_HEIGHT, BOARD_WIDTH), dtype=np.int8)

    def get_board(self):
        """Return the live board array.

        This is NOT a copy: later moves mutate the returned array, so callers
        that need a snapshot must call ``.copy()`` on it.
        """
        return self.board

    def get_current_player(self):
        """Return the piece (1 or 2) of the player whose turn it is."""
        return self.current_player

    def get_valid_moves(self):
        """Return the columns that still have room, in ascending order."""
        top_row = self.board[BOARD_HEIGHT - 1]
        return [col for col in range(BOARD_WIDTH) if top_row[col] == 0]

    def make_move(self, column):
        """Drop the current player's piece into ``column``.

        Returns:
            True if the piece was placed (the turn then passes to the other
            player); False if the column is full or outside the board.
        """
        # Reject out-of-range columns explicitly: a negative index would
        # otherwise wrap around and silently play on the other side of the board.
        if not 0 <= column < BOARD_WIDTH:
            return False

        for row in range(BOARD_HEIGHT):
            if self.board[row][column] == 0:
                self.board[row][column] = self.current_player
                self.current_player = 3 - self.current_player  # 1 <-> 2
                return True
        return False

    def undo_move(self, column):
        """Remove the topmost piece of ``column`` and give the turn back.

        Does nothing when the column is empty.
        """
        for row in range(BOARD_HEIGHT - 1, -1, -1):
            if self.board[row][column] != 0:
                self.board[row][column] = 0
                self.current_player = 3 - self.current_player
                return

    def is_terminal(self):
        """Return True when the board is full or either player has won."""
        return len(self.get_valid_moves()) == 0 or self.check_winner() is not None

    def check_winner(self):
        """Return the piece that has four in a row, or None.

        ``PLAYER_PIECE`` is tested before ``AI_PIECE``.
        """
        # The windows do not depend on the player, so build them only once.
        lines = self.get_lines(PLAYER_PIECE)
        for piece in (PLAYER_PIECE, AI_PIECE):
            if any(self.is_winning_line(line, piece) for line in lines):
                return piece
        return None

    def get_lines(self, player):
        """Return every horizontal, vertical and diagonal 4-cell window.

        ``player`` is accepted for interface compatibility but is not used:
        the same windows are returned whichever piece is passed.
        """
        lines = []

        # Horizontal lines
        for row in range(BOARD_HEIGHT):
            for col in range(BOARD_WIDTH - 3):
                lines.append(self.board[row, col:col + 4])

        # Vertical lines
        for col in range(BOARD_WIDTH):
            for row in range(BOARD_HEIGHT - 3):
                lines.append(self.board[row:row + 4, col])

        # Diagonal lines (both directions) for each band of four rows
        for row in range(BOARD_HEIGHT - 3):
            for col in range(BOARD_WIDTH - 3):
                lines.append(self.board[row:row + 4, col:col + 4].diagonal())

            for col in range(3, BOARD_WIDTH):
                lines.append(np.fliplr(self.board[row:row + 4, col - 3:col + 1]).diagonal())

        return lines

    def is_winning_line(self, line, piece):
        """Return True when ``line`` consists of exactly four ``piece`` cells."""
        return np.array_equal(line, [piece] * 4)

### Q-Learning Agent

In [4]:

class QLearningAgent:
    """Tabular Q-learning player.

    ``q_table`` maps ``(state_string, column)`` to a learned value; pairs that
    were never updated are treated as 0.
    """

    def __init__(self, learning_rate=0.2, discount_factor=0.8, exploration_rate=0.05):
        self.q_table = {}
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate

    def choose_move(self, model):
        """Pick a column for the model's current position.

        With probability ``exploration_rate`` a random valid column is
        returned; otherwise one of the valid columns with the highest Q-value
        is returned (ties are broken at random).
        """
        board_key = self.convert_state_to_string(model.get_board())
        candidates = model.get_valid_moves()

        exploring = random.uniform(0, 1) < self.exploration_rate
        if exploring:
            return random.choice(candidates)

        scores = {move: self.q_table.get((board_key, move), 0) for move in candidates}
        top_score = max(scores.values())
        greedy_moves = [move for move in candidates if scores[move] == top_score]
        return random.choice(greedy_moves)

    def update_q_table(self, old_state, action, new_state, reward):
        """Apply one Q-learning update to the entry ``(old_state, action)``.

        The bootstrap term is the largest stored value over all
        ``BOARD_WIDTH`` columns of ``new_state``.
        """
        entry = (old_state, action)
        current_value = self.q_table.get(entry, 0)
        best_next = max(self.q_table.get((new_state, move), 0) for move in range(BOARD_WIDTH))
        target = reward + self.discount_factor * best_next
        self.q_table[entry] = current_value + self.learning_rate * (target - current_value)

    def convert_state_to_string(self, state):
        """Flatten a 2-D board, row by row, into a string usable as a dict key."""
        cells = []
        for row in state:
            cells.extend(str(piece) for piece in row)
        return ''.join(cells)



### Random Agent


In [5]:
class RandomAgent:
    """Heuristic sparring partner for training.

    Despite its name the agent is only random as a tie-breaker. In priority
    order it (1) plays an immediately winning column, (2) plays a column in
    which the opponent would win on its next move, (3) plays a column that
    gives it the longest run of its own pieces, choosing at random among
    equally good columns.
    """

    def choose_move(self, model):
        """Return the column to play for the model's current player.

        The model is probed with ``make_move``/``undo_move`` and is left in its
        original state when this method returns.
        """
        valid_moves = model.get_valid_moves()
        random_agent = model.get_current_player()
        opponent = 3 - random_agent

        # 1. Take an immediate win.
        for move in valid_moves:
            model.make_move(move)
            wins = model.check_winner() == random_agent
            model.undo_move(move)
            if wins:
                return move

        # 2. Block: occupy a column in which the OPPONENT's piece would win.
        for move in valid_moves:
            if self._opponent_wins_at(model, move, random_agent, opponent):
                return move

        # 3. Prefer the move that yields the longest own run; collect ties so
        #    the choice among equally good columns is random rather than
        #    always the leftmost one.
        best_moves = []
        max_line_length = 0
        for move in valid_moves:
            model.make_move(move)
            line_length = self.get_max_line_length(model, random_agent)
            model.undo_move(move)

            if line_length > max_line_length:
                max_line_length = line_length
                best_moves = [move]
            elif line_length == max_line_length:
                best_moves.append(move)

        if best_moves:
            return random.choice(best_moves)

        # Nothing to rank (no valid moves were evaluated): fall back to the
        # original behaviour of choosing among the valid moves.
        return random.choice(valid_moves)

    def _opponent_wins_at(self, model, move, agent_piece, opponent_piece):
        """Return True if ``opponent_piece`` dropped in ``move`` would win.

        The turn is temporarily handed to the opponent so that ``make_move``
        places the opponent's piece; board and turn are restored afterwards.
        """
        model.current_player = opponent_piece
        placed = model.make_move(move)
        opponent_wins = placed and model.check_winner() == opponent_piece
        if placed:
            model.undo_move(move)
        model.current_player = agent_piece
        return bool(opponent_wins)

    def get_max_line_length(self, model, player_piece):
        """Return the longest run of ``player_piece`` over all 4-cell windows."""
        max_line_length = 0
        for line in model.get_lines(player_piece):
            max_line_length = max(max_line_length, self.get_line_length(line, player_piece))
        return max_line_length

    def get_line_length(self, line, player_piece):
        """Return the longest consecutive run of ``player_piece`` in ``line``."""
        longest = 0
        current = 0
        for piece in line:
            # A run may start anywhere in the window, not only at its first cell.
            current = current + 1 if piece == player_piece else 0
            longest = max(longest, current)
        return longest


### Connect4 View


In [6]:

class ConnectFourView:
    """Pygame window that draws the board and reads the human player's clicks."""

    def __init__(self):
        pygame.init()
        self.clock = pygame.time.Clock()
        self.screen = pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT))
        pygame.display.set_caption("Connect Four")
        self.font = pygame.font.SysFont(None, 48)
        # Assigned through set_model(); defined here so the attribute always exists.
        self.model = None

    def set_model(self, model):
        """Attach the model whose board is drawn and whose valid moves are checked."""
        self.model = model

    def draw_board(self):
        """Redraw the whole board and flip the display."""
        self.screen.fill(BACKGROUND_COLOR)

        piece_colors = {PLAYER_PIECE: PLAYER_COLOR, AI_PIECE: AI_COLOR}
        radius = GRID_SIZE // 2 - PADDING

        board = self.model.get_board()
        for row in range(BOARD_HEIGHT):
            for col in range(BOARD_WIDTH):
                color = piece_colors.get(int(board[row][col]), EMPTY_COLOR)
                # Row 0 is drawn lowest on screen, so the y coordinate shrinks
                # as the row index grows.
                center = (col * GRID_SIZE + GRID_SIZE // 2, (BOARD_HEIGHT - row) * GRID_SIZE)
                pygame.draw.circle(self.screen, color, center, radius)

        pygame.display.flip()

    def display_winner(self, winner):
        """Render the result message in the middle of the window.

        Args:
            winner: the winning piece, or None for a tie.
        """
        if winner is None:
            text = self.font.render("It's a tie!", True, (255, 255, 255))
        else:
            text = self.font.render(f"Player {winner} wins!", True, (255, 255, 255))

        text_rect = text.get_rect(center=(WINDOW_WIDTH // 2, WINDOW_HEIGHT // 2))
        self.screen.blit(text, text_rect)
        pygame.display.flip()

    def get_user_move(self):
        """Block until the user clicks a playable column and return its index.

        Raises:
            SystemExit: if the window is closed while waiting (pygame is shut
                down first).
        """
        while True:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    # `quit()` is a helper injected by the `site` module for
                    # interactive shells and is not meant for programs;
                    # raising SystemExit is the portable equivalent.
                    raise SystemExit
                if event.type == pygame.MOUSEBUTTONDOWN:
                    # Use the position recorded in the event itself rather than
                    # wherever the cursor happens to be when the queue is read.
                    column = event.pos[0] // GRID_SIZE
                    if column in self.model.get_valid_moves():
                        return column

            self.clock.tick(60)




In [7]:
def calculate_reward(source_state, destination_state, player_piece, opponent_piece):
    """Score ``destination_state`` from ``player_piece``'s point of view.

    Args:
        source_state: board before the move; only used to detect a move that
            left the board unchanged.
        destination_state: 2-D NumPy board after the move (0 = empty cell).
        player_piece: piece of the player being rewarded.
        opponent_piece: piece of the other player.

    Returns:
        * 100 if ``player_piece`` has four in a row, -100 if the opponent has;
        * 0 if the board is full;
        * -50 if the opponent can complete four in a row with its next move;
        * a random bonus drawn from N(5, 2) if the board did not change;
        * otherwise a shaping value: (own open threes - opponent open threes)
          plus 0.1 * (10 per own horizontal open three, -10 per opponent one).

    An "open three" is a 4-cell window with three pieces of one player and one
    empty cell. The shaping terms used to test for completed fours, which can
    never be present once the win/loss checks above have passed, so they were
    always zero.
    """
    # The windows do not depend on the piece, so build them once.
    lines = get_lines(destination_state, player_piece)

    if any(is_winning_line(line, player_piece) for line in lines):
        # Reward for winning move
        return 100

    if any(is_winning_line(line, opponent_piece) for line in lines):
        # Penalize moves that lead to an opponent's win
        return -100

    valid_moves = get_valid_moves(destination_state)
    if len(valid_moves) == 0:
        # Neutral reward for a draw
        return 0

    def is_open_three(window, piece):
        cells = [int(cell) for cell in window]
        return cells.count(piece) == 3 and cells.count(0) == 1

    # Encourage progress towards a potential win
    player_line_count = sum(is_open_three(line, player_piece) for line in lines)
    opponent_line_count = sum(is_open_three(line, opponent_piece) for line in lines)
    line_reward = player_line_count - opponent_line_count

    # Encourage horizontal connections
    horizontal_reward = 0
    for row in destination_state:
        for col in range(BOARD_WIDTH - 3):
            window = row[col:col + 4]
            if is_open_three(window, player_piece):
                horizontal_reward += 10
            elif is_open_three(window, opponent_piece):
                horizontal_reward -= 10

    # Discourage losing moves: simulate each possible opponent reply on a copy
    # of the board and penalise the position if any reply wins outright.
    for col in valid_moves:
        trial = np.array(destination_state, copy=True)
        for row in range(len(trial)):
            if trial[row, col] == 0:  # lowest empty cell of the column
                trial[row, col] = opponent_piece
                break
        if any(is_winning_line(line, opponent_piece) for line in get_lines(trial, opponent_piece)):
            return -50

    # Exploration bonus to encourage the agent to explore initially
    if np.array_equal(source_state, destination_state):
        return np.random.normal(loc=5, scale=2)

    # Combine rewards with different weights
    return line_reward + 0.1 * horizontal_reward


# Helper functions used within the calculate_reward function
def get_valid_moves(board_state):
    """Return, in ascending order, the columns whose last-filled cell is empty.

    A column is playable while its cell in row ``BOARD_HEIGHT - 1`` is 0.
    """
    last_row = board_state[BOARD_HEIGHT - 1]
    return [col for col in range(BOARD_WIDTH) if last_row[col] == 0]

def get_lines(board_state, player):
    """Collect every 4-cell window of a 2-D NumPy board.

    The result lists all horizontal windows first, then all vertical ones,
    then, for each band of four consecutive rows, the down-right diagonals
    followed by the down-left diagonals.

    ``player`` is not used; the windows are the same for either piece.
    """
    row_starts = range(BOARD_HEIGHT - 3)
    col_starts = range(BOARD_WIDTH - 3)

    # Horizontal windows, row by row.
    windows = [board_state[r, c:c + 4] for r in range(BOARD_HEIGHT) for c in col_starts]

    # Vertical windows, column by column.
    windows += [board_state[r:r + 4, c] for c in range(BOARD_WIDTH) for r in row_starts]

    # Diagonal windows: both directions for each band of four rows.
    for r in row_starts:
        band = board_state[r:r + 4]
        windows.extend(band[:, c:c + 4].diagonal() for c in col_starts)
        windows.extend(np.fliplr(band[:, c - 3:c + 1]).diagonal() for c in range(3, BOARD_WIDTH))

    return windows

def is_winning_line(line, piece):
    """Return True when ``line`` is exactly four cells that all equal ``piece``."""
    four_in_a_row = [piece] * 4
    return np.array_equal(line, four_in_a_row)

def count_winning_lines(board_state, player_piece):
    """Count the 4-cell windows of ``board_state`` fully held by ``player_piece``."""
    flags = []
    for line in get_lines(board_state, player_piece):
        flags.append(is_winning_line(line, player_piece))
    return np.sum(flags)


### Connect4 Game

In [8]:

class ConnectFourGame:
    """Ties the model, the pygame view and the Q-learning agent together.

    The human (or, during training, a ``RandomAgent``) plays ``PLAYER_PIECE``
    and the Q-learning agent plays ``AI_PIECE``.
    """

    def __init__(self):
        self.model = ConnectFourModel()
        self.view = ConnectFourView()
        self.agent = QLearningAgent()
        self.game_over = False

    def train_agent(self, episodes):
        """Train the Q-learning agent for ``episodes`` games against a RandomAgent.

        The model is left reset when training finishes.
        """
        random_agent = RandomAgent()
        to_key = self.agent.convert_state_to_string

        for _ in range(episodes):
            self.model.reset()
            last_agent_step = None  # (board before the agent's last move, move)

            while not self.model.is_terminal():
                if self.model.get_current_player() == PLAYER_PIECE:
                    move = random_agent.choose_move(self.model)
                    self.model.make_move(move)

                    # The reward below is computed right after the agent's own
                    # move, when the opponent cannot have won yet. If this
                    # reply wins the game, charge the loss to the agent's
                    # previous move so that losing is actually learned.
                    if last_agent_step is not None and self.model.check_winner() == PLAYER_PIECE:
                        previous_state, previous_move = last_agent_step
                        lost_state = self.model.get_board().copy()
                        reward = calculate_reward(previous_state, lost_state, AI_PIECE, PLAYER_PIECE)
                        self.agent.update_q_table(to_key(previous_state),
                                                  previous_move,
                                                  to_key(lost_state),
                                                  reward)
                else:
                    # get_board() returns the live array, so snapshot it:
                    # without the copies "before" and "after" are the same
                    # object and always compare equal.
                    source_state = self.model.get_board().copy()
                    move = self.agent.choose_move(self.model)
                    self.model.make_move(move)
                    destination_state = self.model.get_board().copy()
                    reward = calculate_reward(source_state, destination_state, AI_PIECE, PLAYER_PIECE)

                    self.agent.update_q_table(to_key(source_state),
                                              move,
                                              to_key(destination_state),
                                              reward)
                    last_agent_step = (source_state, move)

            self.model.reset()

    def run(self):
        """Play one interactive game; may be called again for a new game."""
        self.model.reset()
        self.view.set_model(self.model)
        # Clear the flag left over from a previous game, otherwise a second
        # call would return immediately.
        self.game_over = False
        # Show the empty board before blocking on the first click.
        self.view.draw_board()

        while not self.game_over:
            if any(event.type == pygame.QUIT for event in pygame.event.get()):
                # Stop right away instead of going on to wait for a move.
                self.game_over = True
                break

            if not self.model.is_terminal():
                if self.model.get_current_player() == PLAYER_PIECE:
                    move = self.view.get_user_move()
                else:
                    move = self.agent.choose_move(self.model)
                self.make_move(move)

            self.view.draw_board()
            # draw_board() repaints the whole window, so the result message is
            # rendered again here after the final redraw.
            self.check_game_over()

    def make_move(self, column):
        """Play ``column`` on the model and, if it was legal, check for the end."""
        if self.model.make_move(column):
            self.check_game_over()

    def check_game_over(self):
        """Set ``game_over`` and show the result once there is a winner or a full board."""
        winner = self.model.check_winner()
        if winner is not None or len(self.model.get_valid_moves()) == 0:
            self.game_over = True
            self.view.display_winner(winner)


### Training the Q-learning agent

In [9]:
# Seed both random sources (`random` drives the agents' move choices, NumPy
# the reward noise) so that Restart & Run All trains the same agent every time.
TRAINING_SEED = 0
TRAINING_EPISODES = 100

random.seed(TRAINING_SEED)
np.random.seed(TRAINING_SEED)

game = ConnectFourGame()
game.train_agent(TRAINING_EPISODES)

### Running a game

In [10]:
# Play one interactive game in the pygame window: the human plays PLAYER_PIECE
# by clicking a column and the trained Q-learning agent answers as AI_PIECE.
game.run()

### Edited ConnectFourGame() class