In [1]:
# Imports
import torch
import numpy as np
import gymnasium as gym
from collections import deque
import pygame
import random
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [3]:
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.l1 = nn.Linear(input_dim, 128)
        self.l2 = nn.Linear(128, 128)
        self.out = nn.Linear(128, output_dim)

    def forward(self, x):
        x = F.leaky_relu(self.l1(x), negative_slope=0.01)
        x = F.leaky_relu(self.l2(x), negative_slope=0.01)
        return self.out(x)

In [4]:
# While training neural networks, we split the data into batches.
# To improve the training, we need to remove the "correlation" between game states
# The buffer starts storing states and once it reaches maximum capacity, it replaces
# states at random which reduces the correlation.
class ExperienceBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)


In [5]:
def train_dqn_cartpole():
    env = gym.make("CartPole-v1")
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    policy_net = DQN(state_dim, action_dim)
    target_net = DQN(state_dim, action_dim)
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()

    optimizer = optim.Adam(policy_net.parameters(), lr=1e-3)
    buffer = ExperienceBuffer(10000)

    batch_size = 64
    gamma = 0.99
    episodes = 500
    target_update = 10
    epsilon = 1.0
    decay = 0.995
    epsilon_min = 0.01

    for episode in range(episodes):
        obs, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            state = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)

            if random.random() < epsilon:
                action = env.action_space.sample()
            else:
                with torch.no_grad():
                    q_values = policy_net(state)
                action = torch.argmax(q_values, dim=1).item()

            next_obs, reward, done, _, _ = env.step(action)
            buffer.push(obs, reward, action, next_obs, done)

            obs = next_obs
            total_reward += reward

            if len(buffer) > batch_size:
                states, rewards, actions, next_states, dones = buffer.sample(batch_size)
                states = torch.tensor(states, dtype=torch.float32)  # Convert to tensor
                actions = torch.tensor(actions, dtype=torch.long)
                rewards = torch.tensor(rewards, dtype=torch.float32)
                next_states = torch.tensor(next_states, dtype=torch.float32)
                dones = torch.tensor(dones, dtype=torch.float32)

                q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze()

                with torch.no_grad():
                    q_next = target_net(next_states).max(1)[0]
                    target = rewards + gamma * q_next * (1 - dones)

                loss = F.mse_loss(q_values, target)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        if episode % target_update == 0:
            target_net.load_state_dict(policy_net.state_dict())

        epsilon = max(epsilon_min, epsilon * decay)

        if (episode + 1) % 10 == 0:
            print(f"Episode {episode + 1}: Total Reward = {total_reward}, Epsilon = {epsilon:.3f}")

    env.close()
    return policy_net

In [6]:
def evaluate_cartpole_model(model, episodes=100, render=True):
    env = gym.make("CartPole-v1", render_mode="human" if render else None)
    obs_dim = env.observation_space.shape[0]
    n_actions = env.action_space.n

    model.eval()

    rewards = []

    for episode in range(episodes):
        obs, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            state = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                q_values = model(state)
                action = torch.argmax(q_values, dim=1).item()

            obs, reward, done, _, _ = env.step(action)
            total_reward += reward

            if render:
                env.render()

        rewards.append(total_reward)
        print(f"Episode {episode + 1}: Reward = {total_reward}")

    env.close()
    avg_reward = sum(rewards) / episodes
    print(f"Average reward over {episodes} episodes: {avg_reward}")

In [7]:

trained_model = train_dqn_cartpole()
evaluate_cartpole_model(trained_model, episodes=10, render=False)

  states = torch.tensor(states, dtype=torch.float32)  # Convert to tensor


Episode 10: Total Reward = 25.0, Epsilon = 0.951
Episode 20: Total Reward = 28.0, Epsilon = 0.905
Episode 30: Total Reward = 11.0, Epsilon = 0.860
Episode 40: Total Reward = 47.0, Epsilon = 0.818
Episode 50: Total Reward = 29.0, Epsilon = 0.778
Episode 60: Total Reward = 20.0, Epsilon = 0.740
Episode 70: Total Reward = 51.0, Epsilon = 0.704
Episode 80: Total Reward = 17.0, Epsilon = 0.670
Episode 90: Total Reward = 17.0, Epsilon = 0.637
Episode 100: Total Reward = 119.0, Epsilon = 0.606
Episode 110: Total Reward = 19.0, Epsilon = 0.576
Episode 120: Total Reward = 14.0, Epsilon = 0.548
Episode 130: Total Reward = 16.0, Epsilon = 0.521
Episode 140: Total Reward = 33.0, Epsilon = 0.496
Episode 150: Total Reward = 114.0, Epsilon = 0.471
Episode 160: Total Reward = 118.0, Epsilon = 0.448
Episode 170: Total Reward = 119.0, Epsilon = 0.427
Episode 180: Total Reward = 112.0, Epsilon = 0.406
Episode 190: Total Reward = 45.0, Epsilon = 0.386
Episode 200: Total Reward = 49.0, Epsilon = 0.367
Epis

In [24]:
class SnakeGame(gym.Env):
    metadata = {"render_modes": ["human"], "render_fps": 10}

    def __init__(self, size=10, render_mode=None):
        super().__init__()
        self.size = size
        self.cell_size = 30
        self.screen_size = self.size * self.cell_size
        self.render_mode = render_mode

        self.action_space = gym.spaces.Discrete(4)  # 0: right, 1: up, 2: left, 3: down
        self.observation_space = gym.spaces.Box(0, 2, shape=(self.size, self.size), dtype=np.uint8)

        self.screen = None
        self.clock = None

        self.snake = deque()
        self.food = None
        self.direction = [1, 0]

        if self.render_mode == "human":
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_size, self.screen_size))
            self.clock = pygame.time.Clock()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.snake.clear()
        mid = self.size // 2
        self.snake.appendleft([mid, mid])
        self.direction = [1, 0]
        self._place_food()
        obs = self._get_obs()

        if self.render_mode == "human":
            self._render_init()

        return obs, {}

    def step(self, action):
        # TODO: Change reward schema to avoid the following
        # 1) 180 degree turns
        # 2) Wall collisions
        # 3) Being slow at collecting food

        if action == 0 and self.direction != [-1, 0]: self.direction = [1, 0]
        elif action == 1 and self.direction != [0, 1]: self.direction = [0, -1]
        elif action == 2 and self.direction != [1, 0]: self.direction = [-1, 0]
        elif action == 3 and self.direction != [0, -1]: self.direction = [0, 1]

        head = self.snake[0]
        new_head = [head[0] + self.direction[0], head[1] + self.direction[1]]

        done = False
        reward = 0

        if not (0 <= new_head[0] < self.size and 0 <= new_head[1] < self.size):
            done = True
            reward-=1
        else:
            body_to_check = list(self.snake)[:-1] if new_head != self.food else list(self.snake)
            if new_head in body_to_check:
                done=True
                reward -=1

        if not done:
            self.snake.appendleft(new_head)
            if new_head == self.food:
                self._place_food()
                reward+=10
            else:
                self.snake.pop()
                reward-=0.1

        obs = self._get_obs()

        if self.render_mode == "human":
            self.render()

        return obs, reward, done, False, {}


    def _get_obs(self):
      grid = np.zeros((self.size, self.size), dtype=np.uint8)

      for segment in self.snake:
          x, y = segment
          if 0 <= x < self.size and 0 <= y < self.size:
              grid[y, x] = 1  # snake

      if self.food:
          fx, fy = self.food
          if 0 <= fx < self.size and 0 <= fy < self.size:
              grid[fy, fx] = 2  # food

      return grid

    def _place_food(self):
        positions = set(tuple(p) for p in self.snake)
        empty = [(x, y) for x in range(self.size) for y in range(self.size) if (x, y) not in positions]
        self.food = list(random.choice(empty)) if empty else None

    def render(self):
        if self.screen is None:
            self._render_init()

        self.screen.fill((0, 0, 0))
        for x, y in self.snake:
            pygame.draw.rect(
                self.screen, (0, 255, 0),
                pygame.Rect(x * self.cell_size, y * self.cell_size, self.cell_size, self.cell_size)
            )
        if self.food:
            fx, fy = self.food
            pygame.draw.rect(
                self.screen, (255, 0, 0),
                pygame.Rect(fx * self.cell_size, fy * self.cell_size, self.cell_size, self.cell_size)
            )

        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])

    def _render_init(self):
        pygame.init()
        self.screen = pygame.display.set_mode((self.size * self.cell_size, self.size * self.cell_size))
        self.clock = pygame.time.Clock()

    def close(self):
        if self.screen:
            pygame.quit()
            self.screen = None

In [25]:
# TODO: Implement training logic for Snake Game here
def train_dqn_snake(env,episodes=500,gamma=0.99,epsilon_start=1,epsilon_min=0.1,decay=0.995,learning_rate=0.001,batch_size=64,buffer_size=10000,freq=10):
    obs_dim = env.size*env.size
    action_dim=env.action_space.n

    policy_net=DQN(obs_dim,action_dim)
    target_net=DQN(obs_dim,action_dim)

    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()

    optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
    buffer = ExperienceBuffer(buffer_size)
    target_update = freq
    epsilon=epsilon_start

    #moving through all the episodes
    for episode in range(episodes):
      obs,_=env.reset()
      total_reward=0
      done = False

      #while the game is running
      while done==False:
        state = torch.tensor(obs.flatten(), dtype=torch.float32).unsqueeze(0)
        #choose an action via e-greedy algo
        if random.random() <epsilon:
            action=env.action_space.sample()
        else:
          with torch.no_grad():
              q_values = policy_net(state)
          action = torch.argmax(q_values, dim=1).item()



        #getting new observation after taking that action
        next_obs,reward,done,_,_=env.step(action)
        #pushing it into the action replay buffer
        buffer.push(obs.flatten(), reward, action, next_obs.flatten(), done)

        obs=next_obs
        total_reward+=reward

        if len(buffer) > batch_size:
                states, rewards,actions, next_states, dones = buffer.sample(batch_size)
                states = torch.tensor(states, dtype=torch.float32)  # Convert to tensor
                actions = torch.tensor(actions, dtype=torch.long)
                rewards = torch.tensor(rewards, dtype=torch.float32)
                next_states = torch.tensor(next_states, dtype=torch.float32)
                dones = torch.tensor(dones, dtype=torch.float32)

                q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze()

                with torch.no_grad():
                    q_next = target_net(next_states).max(1)[0]
                    target = rewards + gamma * q_next * (1 - dones)

                loss = F.mse_loss(q_values, target)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

      if episode % target_update == 0:
          target_net.load_state_dict(policy_net.state_dict())

      epsilon = max(epsilon_min, epsilon * decay)

      if (episode + 1) % 10 == 0:
          print(f"Episode {episode + 1}: Total Reward = {total_reward:.2f}, Epsilon = {epsilon:.3f}")

    return policy_net


In [26]:
def evaluate_snake_model(model, size=20, episodes=10, render=True):
    env = SnakeGame(size=size, render_mode="human" if render else None)
    model.eval()

    rewards = []

    for episode in range(episodes):
        obs, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            # Flatten the observation before passing to the model
            state = torch.tensor(obs.flatten(), dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                q_values = model(state)
                action = torch.argmax(q_values, dim=1).item()

            obs, reward, done, _, _ = env.step(action)
            total_reward += reward

            if render:
                env.render()

        rewards.append(total_reward)
        print(f"Episode {episode + 1}: Reward = {round(total_reward, 2)}")

    env.close()
    avg_reward = sum(rewards) / episodes
    print(f"Average reward over {episodes} episodes: {round(avg_reward, 2)}")


In [None]:

env = SnakeGame(size=20, render_mode=None)  # Or adjust the size as needed
trained_model = train_dqn_snake(env)
evaluate_snake_model(trained_model, size=20, episodes=10, render=True)


Episode 10: Total Reward = -7.80, Epsilon = 0.951
Episode 20: Total Reward = -9.70, Epsilon = 0.905
Episode 30: Total Reward = -4.00, Epsilon = 0.860
Episode 40: Total Reward = -2.50, Epsilon = 0.818
Episode 50: Total Reward = -2.30, Epsilon = 0.778
Episode 60: Total Reward = -8.80, Epsilon = 0.740
Episode 70: Total Reward = 1.10, Epsilon = 0.704
Episode 80: Total Reward = -7.10, Epsilon = 0.670
Episode 90: Total Reward = -3.30, Epsilon = 0.637
Episode 100: Total Reward = -3.40, Epsilon = 0.606
Episode 110: Total Reward = -4.20, Epsilon = 0.576
Episode 120: Total Reward = -7.50, Epsilon = 0.548
Episode 130: Total Reward = -4.90, Epsilon = 0.521
Episode 140: Total Reward = -4.90, Epsilon = 0.496
Episode 150: Total Reward = -3.40, Epsilon = 0.471
Episode 160: Total Reward = -6.30, Epsilon = 0.448
Episode 170: Total Reward = -3.50, Epsilon = 0.427
Episode 180: Total Reward = -4.40, Epsilon = 0.406
Episode 190: Total Reward = -4.60, Epsilon = 0.386
Episode 200: Total Reward = 6.80, Epsilon

In [None]:
class ChaseEscapeEnv(gym.Env):
    metadata = {"render_modes": ["human"], "render_fps": 30}

    def __init__(self, render_mode=None):
        super().__init__()

        self.dt = 0.1
        self.max_speed = 0.4
        self.agent_radius = 0.05
        self.target_radius = 0.05
        self.chaser_radius = 0.07
        self.chaser_speed = 0.03

        self.action_space = gym.spaces.MultiDiscrete([3, 3])  # actions in {0,1,2} map to [-1,0,1]
        self.observation_space = gym.spaces.Box(
            low=-1,
            high=1,
            shape=(8,),
            dtype=np.float32,
        )

        self.render_mode = render_mode
        self.screen_size = 500
        self.np_random = None

        if render_mode == "human":
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_size, self.screen_size))
            self.clock = pygame.time.Clock()

    def sample_pos(self, far_from=None, min_dist=0.5):
        while True:
            pos = self.np_random.uniform(low=-0.8, high=0.8, size=(2,))
            if far_from is None or np.linalg.norm(pos - far_from) >= min_dist:
                return pos

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        self.agent_pos = self.sample_pos()
        self.agent_vel = np.zeros(2, dtype=np.float32)
        self.target_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.5)
        self.chaser_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.7)

        return self._get_obs(), {}

    def _get_obs(self):
        # TODO: Decide how to pass the state (don't use pixel values)
        pass

    def _get_info(self):
        return {}

    def step(self, action):
        # TODO: Add reward scheme
        # 1) Try to make the agent stay within bounds
        # 2) The agent shouldn't idle around
        # 3) The agent should go for the reward
        # 4) The agent should avoid the chaser

        accel = (np.array(action) - 1) * 0.1
        self.agent_vel += accel
        self.agent_vel = np.clip(self.agent_vel, -self.max_speed, self.max_speed)
        self.agent_pos += self.agent_vel * self.dt
        self.agent_pos = np.clip(self.agent_pos, -1, 1)

        direction = self.agent_pos - self.chaser_pos
        norm = np.linalg.norm(direction)
        if norm > 1e-5:
            self.chaser_pos += self.chaser_speed * direction / norm

        dist_to_target = np.linalg.norm(self.agent_pos - self.target_pos)
        dist_to_chaser = np.linalg.norm(self.agent_pos - self.chaser_pos)

        reward = 0.0
        terminated = False

        if dist_to_target < self.agent_radius + self.target_radius:
            self.target_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.5)

        if dist_to_chaser < self.agent_radius + self.chaser_radius:
            terminated = True

        return self._get_obs(), reward, terminated, False, self._get_info()

    def render(self):
        if self.render_mode != "human":
            return

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.close()

        self.screen.fill((255, 255, 255))

        def to_screen(p):
            x = int((p[0] + 1) / 2 * self.screen_size)
            y = int((1 - (p[1] + 1) / 2) * self.screen_size)
            return x, y

        pygame.draw.circle(self.screen, (0, 255, 0), to_screen(self.target_pos), int(self.target_radius * self.screen_size))
        pygame.draw.circle(self.screen, (0, 0, 255), to_screen(self.agent_pos), int(self.agent_radius * self.screen_size))
        pygame.draw.circle(self.screen, (255, 0, 0), to_screen(self.chaser_pos), int(self.chaser_radius * self.screen_size))

        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])

    def close(self):
        if self.render_mode == "human":
            pygame.quit()


In [None]:
# TODO: Train and evaluate CatMouseEnv