In [None]:
# Install TensorFlow into the notebook runtime (IPython magic, not plain Python).
# NOTE(review): no cell visible here imports tensorflow - confirm it is still needed.
%pip install tensorflow
# Colab helper used to mount Google Drive inside the runtime.
from google.colab import drive
# Mount Drive at /content/drive; the Q-table path used by later cells lives under this mount point.
drive.mount('/content/drive')

In [None]:
import collections
import random
import numpy as np

# Global game configuration, read by the environment and training cells below.
board_size = 4  # the board is board_size x board_size cells
max_steps = board_size * board_size * 2  # step budget handed to SnakeEnv

class SnakeEnv:
    """Snake on a square board, exposing integer state indices for tabular RL.

    Board cell codes: 0 = empty, 1 = snake, 3 = fruit.
    Actions: 0 = up, 1 = down, 2 = left, 3 = right.

    Attributes:
        board_size: Side length of the square board.
        max_steps: Number of consecutive steps allowed without eating a
            fruit before the episode is ended with a penalty.
        snake: Deque of (row, col) cells, head at index 0.
        fruit_pos: (row, col) of the fruit, or None once the board is full.
        steps: Steps taken since the last fruit was eaten (or since reset).
        board: Integer numpy array mirroring the snake/fruit cells.
        game_over: True once the episode has terminated.
    """

    def __init__(self, board_size, max_steps):
        """Create the environment and immediately reset it to a fresh game."""
        self.board_size = board_size
        self.max_steps = max_steps
        self.snake = collections.deque()
        self.fruit_pos = None
        self.steps = 0
        self.board = np.zeros((board_size, board_size), dtype=int)
        self.game_over = False
        self.reset()

    def reset(self):
        """Start a new episode and return the initial state index.

        The snake is a single segment at (0, 0) and the fruit is placed on a
        random free cell.
        """
        self.board = np.zeros((self.board_size, self.board_size), dtype=int)
        self.snake = collections.deque([(0, 0)])
        self.fruit_pos = self._generate_fruit()
        self.steps = 0
        self.game_over = False
        self.board[self.snake[0]] = 1
        return self._get_state_index()

    def step(self, action):
        """Advance the game by one move.

        Args:
            action: 0 (up), 1 (down), 2 (left) or 3 (right).

        Returns:
            Tuple ``(state_index, reward, done, info)`` where ``info`` is
            always an empty dict. Rewards: -10 for a collision or for
            exhausting ``max_steps``, 100 for filling the board, 10 for
            eating a fruit, otherwise +1 / -1 / 0 for moving closer to /
            away from / level with the fruit (Manhattan distance).

        Raises:
            ValueError: If ``action`` is not one of the four valid actions.
        """
        if self.game_over:
            # Stepping a finished episode is a no-op with zero reward.
            return self._get_state_index(), 0, True, {}

        self.steps += 1

        head_r, head_c = self.snake[0]
        if action == 0:  # Up
            next_head = (head_r - 1, head_c)
        elif action == 1:  # Down
            next_head = (head_r + 1, head_c)
        elif action == 2:  # Left
            next_head = (head_r, head_c - 1)
        elif action == 3:  # Right
            next_head = (head_r, head_c + 1)
        else:
            raise ValueError("Invalid action")

        # A colliding move ends the episode WITHOUT touching the snake or the
        # board, so the terminal state still shows the real snake (and its
        # real length) instead of one with the tail already removed.
        if self._is_collision(next_head):
            self.game_over = True
            return self._get_state_index(), -10, True, {}

        prev_dist = self._fruit_distance((head_r, head_c))
        ate_fruit = next_head == self.fruit_pos

        if ate_fruit:
            self.steps = 0  # Eating refreshes the step budget; the snake grows.
        else:
            # Ordinary move: the tail cell is vacated.
            self.board[self.snake.pop()] = 0

        self.snake.appendleft(next_head)
        self.board[next_head] = 1

        if ate_fruit:
            # Spawn the next fruit only AFTER the new head is part of the
            # snake; otherwise the head's cell counts as free and the fruit
            # could appear underneath it (and a full board would never be
            # detected).
            self.fruit_pos = self._generate_fruit()

        reward = self._calculate_reward(
            prev_dist, self._fruit_distance(next_head), ate_fruit, False
        )

        if self.steps >= self.max_steps:
            self.game_over = True
            reward = -10  # Ran out of steps without eating.
        elif self.fruit_pos is None:
            self.game_over = True
            reward = 100  # No free cell left: the snake fills the board.

        return self._get_state_index(), reward, self.game_over, {}

    def _fruit_distance(self, cell):
        """Return the Manhattan distance from ``cell`` to the fruit (0 if no fruit)."""
        if self.fruit_pos is None:
            return 0
        return abs(cell[0] - self.fruit_pos[0]) + abs(cell[1] - self.fruit_pos[1])

    def _generate_fruit(self):
        """Pick a random cell not occupied by the snake and mark it on the board.

        Returns:
            The chosen (row, col), or None when the snake covers every cell.
        """
        occupied = set(self.snake)
        empty_cells = [
            (r, c)
            for r in range(self.board_size)
            for c in range(self.board_size)
            if (r, c) not in occupied
        ]
        if not empty_cells:
            return None
        pos = random.choice(empty_cells)
        self.board[pos] = 3
        return pos

    def _is_collision(self, head):
        """Return True if ``head`` is off the board or on any current snake cell.

        The check is made against the snake as it is before the move, so the
        current tail cell also counts as occupied.
        """
        r, c = head
        if r < 0 or r >= self.board_size or c < 0 or c >= self.board_size:
            return True
        return head in self.snake

    def _calculate_reward(self, prev_dist, current_dist, ate_fruit, collision):
        """Map the outcome of a move to its shaping reward.

        Returns -10 on collision, 10 on eating a fruit, otherwise +1, -1 or 0
        depending on whether the distance to the fruit shrank, grew or stayed
        the same.
        """
        if collision:
            return -10
        if ate_fruit:
            return 10
        if current_dist < prev_dist:
            return 1
        if current_dist > prev_dist:
            return -1
        return 0

    def _get_state_index(self):
        """Encode the current state as a single integer.

        Layout: ``((fruit_cell * cells) + head_cell) * 256 + obstacle_bits``
        where ``cells = board_size ** 2`` and ``obstacle_bits`` has bit ``i``
        set when neighbour ``i`` of the head (Up, Up-Right, Right, Down-Right,
        Down, Down-Left, Left, Up-Left) is a wall or a non-head snake segment.

        Returns:
            An int in ``[0, cells * cells * 256]``. The top value is reserved
            for the "board full" state (no fruit); 0 is returned as a
            fallback if the snake deque is empty.
        """
        if not self.snake:
            return 0

        cells = self.board_size * self.board_size

        if self.fruit_pos is None:
            # One past the largest index reachable while a fruit exists.
            return cells * cells * (2**8)

        head_r, head_c = self.snake[0]
        fruit_index = self.fruit_pos[0] * self.board_size + self.fruit_pos[1]
        head_index = head_r * self.board_size + head_c
        base_index = fruit_index * cells + head_index

        # Build the body lookup once instead of once per neighbour.
        body = set(list(self.snake)[1:])
        directions = [(-1, 0), (-1, 1), (0, 1), (1, 1), (1, 0), (1, -1), (0, -1), (-1, -1)]
        obstacle_bits = 0
        for i, (dr, dc) in enumerate(directions):
            check_r, check_c = head_r + dr, head_c + dc
            off_board = (
                check_r < 0 or check_r >= self.board_size
                or check_c < 0 or check_c >= self.board_size
            )
            if off_board or (check_r, check_c) in body:
                obstacle_bits |= 1 << i

        return base_index * (2**8) + obstacle_bits

# Size of the tabular state space: one slot for every combination of
# fruit cell, head cell and 8-bit neighbour mask, plus one extra index
# reserved for the "board full" state.
action_space_size = 4  # Up, Down, Left, Right
state_space_size = (board_size ** 2) ** 2 * 2 ** 8 + 1

# Zero-initialised Q-table: one row per state, one column per action
q_table = np.zeros(shape=(state_space_size, action_space_size))

print(f"New state space size: {state_space_size}")
print(f"Q-table shape: {q_table.shape}")

In [50]:
import numpy as np
import os

# Location of the persisted Q-table (.npy). The path is under /content/drive,
# so Google Drive must be mounted before saving or loading.
q_table_file_path = '/content/drive/MyDrive/Colab Notebooks/snake_by_Qlearning1/snake_by_Qlearning_more_state_Qtable.npy'

def save_q_table(q_table, file_path):
    """Save the Q-table to ``file_path`` in numpy's .npy format.

    Missing parent directories are created first. A bare filename (no
    directory component) is written to the current working directory.

    Args:
        q_table: Array to persist.
        file_path: Destination path. Note that ``np.save`` appends ".npy"
            when the name does not already end with it.
    """
    parent_dir = os.path.dirname(file_path)
    # dirname() is "" for a bare filename and os.makedirs("") raises, so only
    # create directories when there is actually a directory part.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    np.save(file_path, q_table)
    print(f"Q-table saved to {file_path}")

def load_q_table(file_path):
    """Read a Q-table stored in .npy format.

    Returns:
        The array loaded from ``file_path``, or None when no file exists
        at that path.
    """
    if not os.path.exists(file_path):
        print(f"No saved Q-table found at {file_path}. Starting with a new Q-table.")
        return None

    print(f"Loading Q-table from {file_path}")
    stored_table = np.load(file_path)
    return stored_table


def epsilon_greedy_policy(q_table, state_index, epsilon, action_space_size):
    """
    Pick an action with the epsilon-greedy rule.

    Args:
        q_table (np.ndarray): Table of Q-values, indexed [state, action].
        state_index (int): Row of the Q-table for the current state.
        epsilon (float): Probability of taking a random action.
        action_space_size (int): Number of available actions.

    Returns:
        int: A uniformly random action with probability ``epsilon``,
        otherwise the index of the largest Q-value in the state's row.
    """
    should_explore = random.uniform(0, 1) < epsilon
    if should_explore:
        return random.randrange(action_space_size)

    # Greedy choice: best known action for this state
    state_q_values = q_table[state_index, :]
    return np.argmax(state_q_values)

In [None]:
# Initial Q-learning training run with decaying exploration.
# Relies on SnakeEnv, board_size, max_steps, q_table, action_space_size,
# epsilon_greedy_policy, save_q_table and q_table_file_path from earlier cells.

# Q-learning parameters
alpha = 0.2  # Learning rate
gamma = 0.6  # Discount factor
epsilon = 1.0  # Current exploration rate
max_epsilon = 1.0  # Exploration probability at start
min_epsilon = 0.05  # Floor for the exploration probability
epsilon_decay_rate = 0.999  # Per-episode multiplicative decay factor

num_episodes = 100000  # Number of training episodes
saving_frequency = 10000  # Save Q-table every 10,000 episodes

# Create an instance of the environment
env = SnakeEnv(board_size, max_steps)

# Training loop
for episode in range(num_episodes):
    state_index = env.reset()
    done = False

    while not done:
        action = epsilon_greedy_policy(q_table, state_index, epsilon, action_space_size)
        next_state_index, reward, done, info = env.step(action)

        # Q(s, a) <- Q(s, a) + alpha * (target - Q(s, a)), where the target
        # has no bootstrap term when the transition ends the episode.
        if done:
            td_target = reward
        else:
            td_target = reward + gamma * np.max(q_table[next_state_index, :])
        current_q = q_table[state_index, action]
        q_table[state_index, action] = current_q + alpha * (td_target - current_q)

        state_index = next_state_index

    # Decay epsilon geometrically towards min_epsilon. epsilon_decay_rate is a
    # factor just below 1, so it is applied as a power of the episode count;
    # using it as the rate in exp(-rate * episode) would drive epsilon to the
    # floor within about ten episodes. With 0.999, epsilon falls to roughly
    # 0.1 after about 3,000 episodes.
    epsilon = min_epsilon + (max_epsilon - min_epsilon) * epsilon_decay_rate ** (episode + 1)

    # Save Q-table periodically
    if (episode + 1) % saving_frequency == 0:
        save_q_table(q_table, q_table_file_path)
        print(f"Saved Q-table at episode {episode + 1}")

    if (episode + 1) % 1000 == 0:  # Progress report every 1,000 episodes
        print(f"Episode {episode + 1}/{num_episodes} completed. Epsilon: {epsilon:.2f}")

print("Training finished.")

In [None]:
# Continuous Training Loop
# Resumes training from the saved Q-table (or starts a fresh one) with a
# fixed, low exploration rate. Relies on SnakeEnv, board_size, max_steps,
# state_space_size, action_space_size, epsilon_greedy_policy, save_q_table,
# load_q_table and q_table_file_path from earlier cells.

# Define Q-learning Parameters
alpha = 0.2  # Learning rate
gamma = 0.6  # Discount factor
# Exploration rate used for the whole run (no decay in continuous mode)
FIXED_MIN_EPSILON = 0.05

# Define Training Run Parameters for this continuous training loop
continuous_run_episodes = 100000  # Train for 100,000 episodes per execution
continuous_saving_frequency = 10000  # Save Q-table every 10,000 episodes

# Epsilon stays at the fixed minimum whether or not a table was loaded
epsilon = FIXED_MIN_EPSILON

# Load the Q-table if it exists
loaded_q_table = load_q_table(q_table_file_path)

if loaded_q_table is not None:
    # A table saved for a different board size or action set would be
    # indexed incorrectly (or raise mid-training), so reject it up front.
    expected_shape = (state_space_size, action_space_size)
    if loaded_q_table.shape != expected_shape:
        raise ValueError(
            f"Loaded Q-table has shape {loaded_q_table.shape}, expected {expected_shape}. "
            f"Remove or replace {q_table_file_path} before continuing."
        )
    q_table = loaded_q_table
    print("Continuing training from loaded Q-table.")
    print(f"Starting continuous training with epsilon: {epsilon:.2f}")
else:
    # First execution: nothing saved yet, start from an all-zero table
    print("No saved Q-table found. Starting new training run with fixed minimum epsilon.")
    q_table = np.zeros((state_space_size, action_space_size))

# Create a new instance of the environment for this training run
env = SnakeEnv(board_size, max_steps)

# Training loop for continuous execution
for episode_in_run in range(continuous_run_episodes):
    state_index = env.reset()
    done = False

    while not done:
        action = epsilon_greedy_policy(q_table, state_index, epsilon, action_space_size)
        next_state_index, reward, done, info = env.step(action)

        if done:
            # Terminal transition: no future value to bootstrap from
            td_target = reward
        else:
            # Guard the lookup of the successor state's Q-values
            if next_state_index < 0 or next_state_index >= state_space_size:
                print(f"Error: Next state index {next_state_index} out of bounds.")
                break  # Abandon this episode without updating

            td_target = reward + gamma * np.max(q_table[next_state_index, :])

        current_q = q_table[state_index, action]
        q_table[state_index, action] = current_q + alpha * (td_target - current_q)

        state_index = next_state_index

    # Save Q-table periodically during this continuous run
    if (episode_in_run + 1) % continuous_saving_frequency == 0:
        save_q_table(q_table, q_table_file_path)
        print(f"Saved Q-table during continuous run at episode {episode_in_run + 1}")

    if (episode_in_run + 1) % 1000 == 0:  # Progress report every 1,000 episodes
        print(f"Continuous Run Episode {episode_in_run + 1}/{continuous_run_episodes} completed.")

# Save Q-table at the end of this continuous run
save_q_table(q_table, q_table_file_path)
print("Continuous training run finished. Q-table saved.")

In [None]:
# Evaluate Trained AI (Greedy Policy)
# Relies on SnakeEnv, load_q_table, q_table_file_path, state_space_size,
# board_size and max_steps from earlier cells.


def print_board(snake, fruit_pos, board_size):
    """Print a tab-separated text view of the board.

    'H' marks the snake head (first element of ``snake``), 'B' the other
    segments and 'F' the fruit; cells outside the board are ignored.
    """
    board_chars = [[' ' for _ in range(board_size)] for _ in range(board_size)]

    # Mark snake
    for i, (r, c) in enumerate(snake):
        if 0 <= r < board_size and 0 <= c < board_size:
            board_chars[r][c] = 'H' if i == 0 else 'B'

    # Mark fruit (drawn last, so it wins if it shares a cell with the snake)
    if fruit_pos and 0 <= fruit_pos[0] < board_size and 0 <= fruit_pos[1] < board_size:
        board_chars[fruit_pos[0]][fruit_pos[1]] = 'F'

    for r in range(board_size):
        print("\t".join(board_chars[r]))
    print("-" * (board_size * 4))  # Separator line


# Define the number of evaluation episodes
num_eval_episodes = 5

# Load the trained Q-table
loaded_q_table = load_q_table(q_table_file_path)

if loaded_q_table is None:
    print("Error: No trained Q-table found. Please train the agent first.")
else:
    q_table = loaded_q_table
    print("Loaded trained Q-table for evaluation.")

    # Create an instance of the environment
    env = SnakeEnv(board_size, max_steps)

    total_rewards = []
    total_steps = []
    fruits_eaten = []

    # Evaluation loop
    for episode in range(num_eval_episodes):
        print(f"\n--- Evaluation Episode {episode + 1}/{num_eval_episodes} ---")
        state_index = env.reset()
        done = False
        episode_reward = 0
        episode_steps = 0
        # Longest snake seen this episode; the snake starts at length 1 and
        # grows by one per fruit.
        peak_length = len(env.snake)

        while not done:
            # Stop the episode if the state cannot be looked up in the table
            if state_index < 0 or state_index >= state_space_size:
                print(f"Error: State index {state_index} out of bounds during evaluation.")
                break

            # Greedy policy (epsilon = 0): best known action for this state
            action = np.argmax(q_table[state_index, :])

            # Visualize current board before the step
            print(f"Step: {episode_steps}, Action: {action}")
            print_board(env.snake, env.fruit_pos, env.board_size)

            next_state_index, reward, done, info = env.step(action)

            episode_reward += reward
            episode_steps += 1
            peak_length = max(peak_length, len(env.snake))

            state_index = next_state_index

        # Game Over for this episode
        total_rewards.append(episode_reward)
        total_steps.append(episode_steps)
        # Count fruits from the peak length rather than the final length, so
        # the figure does not depend on what the environment does to the
        # snake on the terminal step.
        fruits_eaten.append(peak_length - 1)

        print(f"Episode finished. Reward: {episode_reward}, Steps: {episode_steps}, Snake Length: {len(env.snake)}")
        # Visualize the final board state
        print("Final Board State:")
        print_board(env.snake, env.fruit_pos, env.board_size)

    # Print average performance
    avg_reward = np.mean(total_rewards)
    avg_steps = np.mean(total_steps)
    avg_fruits_eaten = np.mean(fruits_eaten)

    print("\n--- Evaluation Results ---")
    print(f"Average Reward over {num_eval_episodes} episodes: {avg_reward:.2f}")
    print(f"Average Steps per episode: {avg_steps:.2f}")
    print(f"Average Fruits eaten per episode: {avg_fruits_eaten:.2f}")