In [None]:
# Imports
import torch
import numpy as np
import gymnasium as gym
from collections import deque
import pygame
import random
import torch.nn as nn
import torch.nn.functional as F

In [None]:
# DQN model which takes in the state as an input and outputs predicted q values for every possible action
class DQN(torch.nn.Module):
    """Fully connected Q-network.

    Maps a batch of flat state vectors of shape ``(batch_size, state_space)``
    to predicted Q-values of shape ``(batch_size, action_space)``, one value
    per discrete action.
    """

    def __init__(self, state_space, action_space):
        super().__init__()
        hidden_units = 128
        # Two hidden layers of equal width followed by a linear Q-value head.
        self.fc1 = nn.Linear(state_space, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.out = nn.Linear(hidden_units, action_space)

    def forward(self, input):
        """Return the raw (unnormalised) Q-values for every action."""
        hidden = F.relu(self.fc1(input))
        hidden = F.relu(self.fc2(hidden))
        return self.out(hidden)

In [None]:
# While training neural networks, we split the data into batches.
# To improve the training, we need to remove the "correlation" between game states.
# The buffer stores transitions and, once it reaches maximum capacity, discards the
# oldest ones first; drawing training batches at random is what reduces the correlation.
class ExperienceBuffer:
    """Bounded replay memory of (state, action, reward, next_state, done) tuples."""

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry when a new one is appended
        # to a full buffer.
        self.buffer = deque(maxlen=capacity)

    def __len__(self):
        return len(self.buffer)

    def push(self, state, action, reward, next_state, done):
        """Store one transition at the newest end of the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them column-wise.

        Returns a 5-tuple ``(states, actions, rewards, next_states, dones)``
        where each element is a tuple of length ``batch_size``.
        """
        picked = random.sample(self.buffer, batch_size)
        columns = tuple(zip(*picked))
        states, actions, rewards, next_states, dones = columns
        return (states, actions, rewards, next_states, dones)


In [None]:
# Train a DQN agent on CartPole-v1 using the ExperienceBuffer and a target
# network that is synchronised with the policy network every few episodes.
env = gym.make("CartPole-v1")
state_space = env.observation_space.shape[0]
action_space = env.action_space.n
policy_net = DQN(state_space, action_space)
target_net = DQN(state_space, action_space)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()  # the target network is only ever used for inference
optimizer = torch.optim.Adam(policy_net.parameters(), lr=1e-3)
criterion = torch.nn.MSELoss()
buffer = ExperienceBuffer(capacity=10000)
batch_size = 64
gamma = 0.99
target_update_freq = 10  # episodes between target-network syncs
epsilon_start = 1.0
epsilon_end = 0.05
epsilon_decay = 300  # episode scale of the exponential epsilon decay
num_episodes = 500


def get_epsilon(episode):
    """Exploration rate for ``episode``: exponential decay from start to end."""
    return epsilon_end + (epsilon_start - epsilon_end) * np.exp(-1. * episode / epsilon_decay)


for ep in range(num_episodes):
    state, _ = env.reset()
    total_reward = 0
    done = False
    epsilon = get_epsilon(ep)
    while not done:
        # Epsilon-greedy action selection.
        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
                q_values = policy_net(state_tensor)
                action = torch.argmax(q_values, dim=1).item()

        next_state, reward, terminated, truncated, _ = env.step(action)
        # The episode is over on failure (terminated) OR when the time limit
        # is hit (truncated). Ignoring `truncated` lets a good policy step the
        # environment forever.
        done = terminated or truncated
        # Only a true terminal state stops bootstrapping; a time-limit cut-off
        # still has a meaningful next-state value.
        buffer.push(state, action, reward, next_state, terminated)
        state = next_state
        total_reward += reward

        if len(buffer) >= batch_size:
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)
            states = torch.tensor(np.array(states), dtype=torch.float32)
            actions = torch.tensor(actions, dtype=torch.int64).unsqueeze(1)
            rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1)
            next_states = torch.tensor(np.array(next_states), dtype=torch.float32)
            dones = torch.tensor(dones, dtype=torch.float32).unsqueeze(1)

            # Q(s, a) of the actions that were actually taken.
            q_values = policy_net(states).gather(1, actions)
            with torch.no_grad():
                next_q_values = target_net(next_states).max(1, keepdim=True)[0]
                target_q_values = rewards + gamma * next_q_values * (1 - dones)

            loss = criterion(q_values, target_q_values)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(policy_net.parameters(), max_norm=1.0)
            optimizer.step()

    if ep % target_update_freq == 0:
        target_net.load_state_dict(policy_net.state_dict())
    print(f"Episode {ep}, Total Reward: {total_reward}")

env.close()


In [None]:
def evaluate_cartpole_model(model, episodes=10, render=True):
    """Run ``model`` greedily on CartPole-v1 and print per-episode and mean reward.

    Args:
        model: ``torch.nn.Module`` mapping a ``(1, obs_dim)`` float tensor to
            per-action Q-values; it is switched to eval mode.
        episodes: number of evaluation episodes to run.
        render: when True the environment is created with
            ``render_mode="human"``, which draws every frame on its own.

    Returns:
        None. Results are printed.
    """
    env = gym.make("CartPole-v1", render_mode="human" if render else None)
    model.eval()

    # Build inputs on the same device as the model's weights.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    rewards = []
    try:
        for episode in range(episodes):
            obs, _ = env.reset()
            total_reward = 0
            done = False

            while not done:
                state = torch.tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)
                with torch.no_grad():
                    q_values = model(state)
                    action = torch.argmax(q_values, dim=1).item()

                obs, reward, terminated, truncated, _ = env.step(action)
                # Stop on failure or on the time limit; without `truncated`
                # a policy that never drops the pole would never finish.
                done = terminated or truncated
                total_reward += reward
                # No explicit env.render(): in "human" mode the environment
                # already renders inside step().

            rewards.append(total_reward)
            print(f"Episode {episode + 1}: Reward = {total_reward}")
    finally:
        # Release the window even if the model or environment raises.
        env.close()

    if not rewards:
        print("No episodes were evaluated.")
        return

    avg_reward = sum(rewards) / len(rewards)
    print(f"Average reward over {episodes} episodes: {avg_reward}")

In [None]:
# Evaluate the CartPole policy for 10 rendered episodes.
# NOTE: run this before the Snake training cell, which rebinds `policy_net`
# to a network with a different input size.
evaluate_cartpole_model(policy_net, episodes=10, render=True)

In [None]:
class SnakeGame(gym.Env):
    """Snake on a ``size`` x ``size`` grid, exposed through the Gymnasium API.

    Observation: float32 array of shape ``(3, size, size)`` holding three
    one-hot planes (head, body, food), indexed ``[plane, x, y]``.
    Actions: 0 = right, 1 = up, 2 = left, 3 = down. A request to reverse
    direction is ignored (the snake keeps going) and costs 0.5 reward.

    Rewards per step: -10 for hitting a wall or the body (episode ends),
    +10 plus a time bonus for eating food, and +/-0.3 for moving closer to /
    farther from the food.
    """

    metadata = {"render_modes": ["human"], "render_fps": 10}

    def __init__(self, size=10, render_mode=None):
        super().__init__()
        self.size = size
        self.cell_size = 30
        self.screen_size = self.size * self.cell_size
        self.render_mode = render_mode

        self.action_space = gym.spaces.Discrete(4)  # 0: right, 1: up, 2: left, 3: down
        self.observation_space = gym.spaces.Box(0, 1, shape=(3, self.size, self.size), dtype=np.float32)

        self.screen = None
        self.clock = None

        self.snake = deque()  # segments as [x, y]; index 0 is the head
        self.food = None
        self.direction = [1, 0]
        # Defined here so the attribute exists even before the first reset().
        self.steps_since_last_food = 0

        if self.render_mode == "human":
            self._render_init()

    def reset(self, seed=None, options=None):
        """Start a new game with a length-1 snake in the middle, heading right."""
        super().reset(seed=seed)
        self.snake.clear()
        mid = self.size // 2
        self.snake.appendleft([mid, mid])
        self.direction = [1, 0]
        self._place_food()
        self.steps_since_last_food = 0

        # Only (re)create the window when there is none, e.g. after close().
        if self.render_mode == "human" and self.screen is None:
            self._render_init()

        return self._get_obs(), {}

    def step(self, action):
        """Advance one move.

        Returns ``(obs, reward, terminated, truncated, info)``; ``truncated``
        is always False and ``info["food_eaten"]`` says whether food was eaten.
        """
        reward = 0
        info = {}
        self.steps_since_last_food += 1

        # --- Direction update ---
        intended = {
            0: [1, 0],
            1: [0, -1],
            2: [-1, 0],
            3: [0, 1],
        }[action]

        if intended == [-d for d in self.direction]:
            reward -= 0.5  # 180-degree turns are rejected and penalised
        else:
            self.direction = intended

        head = self.snake[0]
        new_head = [head[0] + self.direction[0], head[1] + self.direction[1]]

        in_bounds = 0 <= new_head[0] < self.size and 0 <= new_head[1] < self.size
        # Stepping onto the current tail cell is legal: the tail moves away in
        # the same step (food never sits on the snake, so it cannot grow there).
        hits_body = in_bounds and new_head in self.snake and new_head != self.snake[-1]
        done = (not in_bounds) or hits_body

        if done:
            reward -= 10
            info["food_eaten"] = False
            self.steps_since_last_food = 0  # reset counter
        else:
            self.snake.appendleft(new_head)
            ate_food = new_head == self.food

            # --- Moving closer or farther ---
            # Measured against the food that was on the board during this
            # move, i.e. before a replacement is spawned below.
            dist_prev = abs(head[0] - self.food[0]) + abs(head[1] - self.food[1])
            dist_new = abs(new_head[0] - self.food[0]) + abs(new_head[1] - self.food[1])
            if dist_new < dist_prev:
                reward += 0.3
            elif dist_new > dist_prev:
                reward -= 0.3

            # --- Ate food ---
            if ate_food:
                base = 10
                time_bonus = max(15, 25 - 0.1 * self.steps_since_last_food)
                reward += base + time_bonus
                self.steps_since_last_food = 0
                self._place_food()
                if self.food is None:
                    # The snake fills the whole board: nothing left to play for.
                    done = True
            else:
                self.snake.pop()
            info["food_eaten"] = ate_food

        obs = self._get_obs()
        if self.render_mode == "human":
            self.render()

        return obs, reward, done, False, info

    def _get_obs(self):
        """Build the (head, body, food) one-hot observation planes."""
        head_grid = np.zeros((self.size, self.size), dtype=np.float32)
        body_grid = np.zeros((self.size, self.size), dtype=np.float32)
        food_grid = np.zeros((self.size, self.size), dtype=np.float32)

        for part in list(self.snake)[1:]:
            body_grid[part[0], part[1]] = 1.0
        head_x, head_y = self.snake[0]
        head_grid[head_x, head_y] = 1.0
        if self.food:
            food_grid[self.food[0], self.food[1]] = 1.0

        stacked = np.stack([head_grid, body_grid, food_grid], axis=0)
        return stacked.astype(np.float32)

    def _place_food(self):
        """Put food on a uniformly random free cell, or set it to None if none is free."""
        occupied = {tuple(p) for p in self.snake}
        empty = [(x, y) for x in range(self.size) for y in range(self.size) if (x, y) not in occupied]
        if empty:
            # Use the environment's own generator so reset(seed=...) makes
            # food placement reproducible.
            pick = int(self.np_random.integers(len(empty)))
            self.food = list(empty[pick])
        else:
            self.food = None  # grid full

    def render(self):
        """Draw the current board; creates the pygame window on first use."""
        if self.screen is None:
            self._render_init()

        # Service the OS event queue so the window does not become unresponsive.
        pygame.event.pump()

        self.screen.fill((0, 0, 0))
        for x, y in self.snake:
            pygame.draw.rect(
                self.screen, (0, 255, 0),
                pygame.Rect(x * self.cell_size, y * self.cell_size, self.cell_size, self.cell_size)
            )
        if self.food:
            fx, fy = self.food
            pygame.draw.rect(
                self.screen, (255, 0, 0),
                pygame.Rect(fx * self.cell_size, fy * self.cell_size, self.cell_size, self.cell_size)
            )

        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])

    def _render_init(self):
        """Initialise pygame and create the display surface and frame clock."""
        pygame.init()
        self.screen = pygame.display.set_mode((self.screen_size, self.screen_size))
        self.clock = pygame.time.Clock()

    def close(self):
        """Shut pygame down if a window was opened."""
        if self.screen is not None:
            pygame.quit()
            self.screen = None
            self.clock = None


In [None]:
# Training loop for Snake Game: DQN on the flattened (3, size, size) observation.
env = SnakeGame(render_mode=None)
batch_size = 64
gamma = 0.99
learning_rate = 1e-3
target_update_freq = 10  # episodes between target-network syncs
num_episodes = 500
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# np.prod returns a NumPy scalar; cast so the layer size is a plain int.
state_space = int(np.prod(env.observation_space.shape))
n_actions = int(env.action_space.n)
policy_net = DQN(state_space, n_actions).to(device)
target_net = DQN(state_space, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = torch.optim.Adam(policy_net.parameters(), lr=learning_rate)
replay_buffer = ExperienceBuffer(capacity=10000)
epsilon = 1.0
epsilon_decay = 0.995  # multiplicative decay applied once per episode
epsilon_min = 0.05
max_steps_per_episode = 500  # hard cap so an aimless snake cannot run forever

for episode in range(num_episodes):
    state, _ = env.reset()
    # Flatten the observation to a (1, state_space) row for the MLP.
    state = torch.tensor(state, dtype=torch.float32, device=device).reshape(1, -1)
    total_reward = 0
    steps = 0
    food_collected = 0
    done = False

    while not done and steps < max_steps_per_episode:
        # Epsilon-greedy action selection.
        if random.random() < epsilon:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                q_values = policy_net(state)
                action = q_values.argmax().item()

        next_state, reward, done, _, info = env.step(action)

        # Small bonus for every step survived. The environment already
        # applies the death penalty, so the fatal step gets no bonus.
        if not done:
            reward += 0.01

        # Count food collected via the 'food_eaten' flag set by the env.
        if info.get("food_eaten", False):
            food_collected += 1
            reward += 1.0  # ensure food reward is significant

        next_state_tensor = torch.tensor(next_state, dtype=torch.float32, device=device).reshape(1, -1)
        replay_buffer.push(state, action, reward, next_state_tensor, done)
        state = next_state_tensor
        total_reward += reward
        steps += 1

        if len(replay_buffer) >= batch_size:
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
            states = torch.cat(states)            # (batch_size, state_space)
            next_states = torch.cat(next_states)  # (batch_size, state_space)
            actions = torch.tensor(actions, dtype=torch.int64, device=device).unsqueeze(1)
            rewards = torch.tensor(rewards, dtype=torch.float32, device=device).unsqueeze(1)
            dones = torch.tensor(dones, dtype=torch.float32, device=device).unsqueeze(1)

            curr_q_values = policy_net(states).gather(1, actions)
            # The TD target is a constant w.r.t. the policy parameters.
            with torch.no_grad():
                next_q_values = target_net(next_states).max(1, keepdim=True)[0]
                expected_q_values = rewards + gamma * next_q_values * (1 - dones)

            loss = F.mse_loss(curr_q_values, expected_q_values)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(policy_net.parameters(), max_norm=1.0)
            optimizer.step()

    # Decay epsilon once per episode
    epsilon = max(epsilon_min, epsilon * epsilon_decay)

    # Update target network
    if episode % target_update_freq == 0:
        target_net.load_state_dict(policy_net.state_dict())

    print(f"Episode {episode + 1}, Total Reward: {total_reward:.2f}, Epsilon: {epsilon:.3f}, Steps: {steps}, Food: {food_collected}")


In [None]:
def evaluate_snake_model(model, size=20, episodes=10, render=True):
    """Run ``model`` greedily on ``SnakeGame`` and print reward / food statistics.

    The network consumes a flattened 3 x 10 x 10 observation. When ``size`` is
    not 10 the observation planes are bilinearly resized to 10 x 10 first,
    which blurs the one-hot cells, so results on other board sizes are only
    an approximation of what the network was trained on.

    Args:
        model: ``torch.nn.Module`` taking a ``(1, 300)`` float tensor and
            returning Q-values for the four actions.
        size: side length of the evaluation board.
        episodes: number of evaluation episodes.
        render: when True the environment is created in ``"human"`` mode and
            draws itself on every step.

    Returns:
        None. Results are printed.
    """
    model_grid = 10  # spatial size of the observation the network expects
    max_steps = 500  # per-episode cap; a greedy policy may circle forever

    env = SnakeGame(size=size, render_mode="human" if render else None)
    model.eval()

    # Inputs must live on the same device as the model (it may be on CUDA).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    rewards = []
    foods = []
    try:
        for episode in range(episodes):
            obs, _ = env.reset()
            total_reward = 0
            food_collected = 0
            done = False
            steps = 0
            while not done and steps < max_steps:
                steps += 1
                grid = torch.tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)
                if tuple(grid.shape[-2:]) != (model_grid, model_grid):
                    grid = F.interpolate(
                        grid, size=(model_grid, model_grid), mode="bilinear", align_corners=False
                    )
                state = grid.reshape(1, -1)
                with torch.no_grad():
                    q_values = model(state)
                    action = torch.argmax(q_values, dim=1).item()

                obs, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated
                total_reward += reward

                # Count food collected if the info dict carries that flag.
                if isinstance(info, dict) and info.get("food_eaten", False):
                    food_collected += 1
                # No explicit env.render(): in "human" mode SnakeGame.step()
                # already renders, and a second call would halve the frame rate.

            rewards.append(total_reward)
            foods.append(food_collected)
            print(f"Episode {episode + 1}: Reward = {total_reward}, Food Collected = {food_collected}")
    finally:
        # Release the window even if the model or environment raises.
        env.close()

    if not rewards:
        print("No episodes were evaluated.")
        return

    avg_reward = sum(rewards) / len(rewards)
    avg_food = sum(foods) / len(foods)

    print(f"\nAverage reward over {episodes} episodes: {avg_reward:.2f}")
    print(f"Average food collected: {avg_food:.2f}")


In [None]:
# Evaluate the Snake policy on a 20x20 board for 10 episodes without rendering.
# NOTE: `policy_net` must be the network bound by the Snake training cell.
evaluate_snake_model(policy_net, size=20, episodes=10, render=False)

In [None]:
class ChaseEscapeEnv(gym.Env):
    """2-D chase game: reach the target while a chaser homes in on the agent.

    Observation (float32, shape ``(8,)``):
    ``[agent_x, agent_y, agent_vx, agent_vy, target_x, target_y, chaser_x, chaser_y]``.
    Action: ``MultiDiscrete([3, 3])``; each component in {0, 1, 2} maps to an
    acceleration of -0.1, 0 or +0.1 along x and y respectively.
    The episode terminates when the chaser touches the agent. Capturing the
    target gives +10 and respawns the target; there is no built-in time limit.
    """

    metadata = {"render_modes": ["human"], "render_fps": 30}

    def __init__(self, render_mode=None):
        super().__init__()

        self.dt = 0.1
        self.max_speed = 0.4
        self.agent_radius = 0.05
        self.target_radius = 0.05
        self.chaser_radius = 0.05
        self.chaser_speed = 0.03  # distance the chaser covers per step

        self.action_space = gym.spaces.MultiDiscrete([3, 3])  # actions in {0,1,2} map to [-1,0,1]
        self.observation_space = gym.spaces.Box(
            low=-1,
            high=1,
            shape=(8,),
            dtype=np.float32,
        )

        self.render_mode = render_mode
        self.screen_size = 500
        # Always defined so render()/close() can test them in any mode.
        self.screen = None
        self.clock = None

        if render_mode == "human":
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_size, self.screen_size))
            self.clock = pygame.time.Clock()

    def sample_pos(self, far_from=None, min_dist=0.5):
        """Sample a point in [-0.8, 0.8]^2, at least ``min_dist`` from ``far_from`` if given."""
        while True:
            pos = self.np_random.uniform(low=-0.8, high=0.8, size=(2,))
            if far_from is None or np.linalg.norm(pos - far_from) >= min_dist:
                return pos

    def reset(self, seed=None, options=None):
        """Place agent, target and chaser at random positions; the agent starts at rest."""
        super().reset(seed=seed)

        self.agent_pos = self.sample_pos()
        self.agent_vel = np.zeros(2, dtype=np.float32)
        self.target_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.5)
        self.chaser_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.7)

        return self._get_obs(), {}

    def _get_obs(self):
        """Pack positions and agent velocity into the 8-element observation vector."""
        agent_x, agent_y = self.agent_pos
        agent_vx, agent_vy = self.agent_vel
        target_x, target_y = self.target_pos
        chaser_x, chaser_y = self.chaser_pos
        return np.array([agent_x, agent_y, agent_vx, agent_vy, target_x, target_y, chaser_x, chaser_y], dtype=np.float32)

    def _get_info(self):
        return {}

    def step(self, action):
        """Apply ``action``, move the chaser and compute the shaped reward.

        Reward terms:
          * -0.1 when the unclipped move would leave the [-1, 1]^2 arena;
          * -0.05 when the agent is (almost) standing still;
          * +0.5 x decrease in distance to the target;
          * +0.3 x increase in distance to the chaser, only while the chaser
            is closer than 0.5 (so closing in on it is penalised);
          * +0.1 while within 0.3 of the target, +10 on capture;
          * -3 and termination when caught by the chaser.

        Returns ``(obs, reward, terminated, truncated, info)``; ``truncated``
        is always False. ``info`` may contain ``"target_captured"`` and
        ``"caught_by_chaser"``.
        """
        reward = 0.0
        # Distances before anything moves, used for the shaping terms below.
        prev_dist_to_target = np.linalg.norm(self.agent_pos - self.target_pos)
        prev_dist_to_chaser = np.linalg.norm(self.agent_pos - self.chaser_pos)

        accel = (np.array(action) - 1) * 0.1
        self.agent_vel += accel
        self.agent_vel = np.clip(self.agent_vel, -self.max_speed, self.max_speed)
        # Intended new position before clipping to the arena.
        new_pos = self.agent_pos.copy()
        new_pos += self.agent_vel * self.dt

        # Penalize if new position goes out of bounds
        if np.any(new_pos < -1) or np.any(new_pos > 1):
            reward -= 0.1

        # Apply clipping to stay within bounds
        self.agent_pos = np.clip(new_pos, -1, 1)

        # Discourage idling.
        if np.linalg.norm(self.agent_vel) < 0.01:
            reward -= 0.05

        # The chaser heads straight for the agent's new position.
        direction = self.agent_pos - self.chaser_pos
        norm = np.linalg.norm(direction)
        if norm > 1e-5:
            self.chaser_pos += self.chaser_speed * direction / norm

        dist_to_target = np.linalg.norm(self.agent_pos - self.target_pos)
        dist_to_chaser = np.linalg.norm(self.agent_pos - self.chaser_pos)

        if dist_to_chaser < 0.5:
            # Change in the agent-chaser gap over the whole step: negative
            # when the gap shrinks, positive when the agent pulls away.
            reward += (dist_to_chaser - prev_dist_to_chaser) * 0.3

        delta = prev_dist_to_target - dist_to_target
        reward += delta * 0.5  # scale it to keep values small

        terminated = False

        info = {}
        if dist_to_target < self.agent_radius + self.target_radius:
            reward += 10.0
            self.target_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.5)
            info["target_captured"] = True

        # Proximity bonus; uses the distance to the target as it was during
        # this step (before any respawn above).
        if dist_to_target < 0.3:
            reward += 0.1

        if dist_to_chaser < self.agent_radius + self.chaser_radius:
            reward -= 3.0
            terminated = True
            info["caught_by_chaser"] = True

        return self._get_obs(), reward, terminated, False, info

    def render(self):
        """Draw target (green), agent (blue) and chaser (red); no-op unless in human mode."""
        if self.render_mode != "human" or self.screen is None:
            return

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                # The display is gone after close(); drawing on it would raise.
                self.close()
                return

        self.screen.fill((255, 255, 255))

        def to_screen(p):
            # World [-1, 1] -> pixel coordinates, with y pointing up.
            x = int((p[0] + 1) / 2 * self.screen_size)
            y = int((1 - (p[1] + 1) / 2) * self.screen_size)
            return x, y

        pygame.draw.circle(self.screen, (0, 255, 0), to_screen(self.target_pos), int(self.target_radius * self.screen_size))
        pygame.draw.circle(self.screen, (0, 0, 255), to_screen(self.agent_pos), int(self.agent_radius * self.screen_size))
        pygame.draw.circle(self.screen, (255, 0, 0), to_screen(self.chaser_pos), int(self.chaser_radius * self.screen_size))

        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])

    def close(self):
        """Shut pygame down if this environment opened a window."""
        if self.screen is not None:
            pygame.quit()
            self.screen = None
            self.clock = None


In [None]:
# Train and evaluate a DQN agent on ChaseEscapeEnv
def select_action(state, model, epsilon, action_space):
    """Epsilon-greedy choice of a two-component action, each component in {0, 1, 2}.

    With probability ``epsilon`` both components are drawn uniformly at
    random; otherwise the flat index (0..8) with the highest Q-value is
    decoded as ``[index // 3, index % 3]``. ``action_space`` is accepted but
    not used.
    """
    explore = random.random() < epsilon
    if explore:
        first = random.randint(0, 2)
        second = random.randint(0, 2)
        return [first, second]

    with torch.no_grad():
        batch = torch.FloatTensor(state).unsqueeze(0)
        best_index = model(batch).argmax(dim=1).item()
    # Decode the flat index into its (x, y) components.
    return list(divmod(best_index, 3))

# --- Setup: headless training environment, policy/target networks, replay buffer ---
env = ChaseEscapeEnv()
dqn = DQN(state_space=8, action_space=9)  # 3x3 = 9 actions
target_dqn = DQN(8, 9)
target_dqn.load_state_dict(dqn.state_dict())
target_dqn.eval()  # the target network is only ever used for inference
optimizer = torch.optim.Adam(dqn.parameters(), lr=1e-3)
buffer = ExperienceBuffer(capacity=10000)

gamma = 0.99
batch_size = 64
epsilon = 1.0
min_epsilon = 0.05
epsilon_decay = 0.995
max_steps = 500  # per-episode cap; the environment has no time limit of its own

for episode in range(500):
    state, _ = env.reset()
    total_reward = 0
    steps = 0
    done = False
    target_captures = 0
    caught_by_chaser = False

    while not done and steps < max_steps:
        steps += 1
        action = select_action(state, dqn, epsilon, env.action_space)
        # The network has one output per (x, y) pair; store the flat index.
        flat_action = action[0] * 3 + action[1]
        next_state, reward, terminated, truncated, info = env.step(action)
        # Hitting max_steps is not a terminal state, so only `terminated`
        # is stored as the done flag used for bootstrapping.
        buffer.push(state, flat_action, reward, next_state, terminated)
        state = next_state
        total_reward += reward
        done = terminated

        # Track extra info
        if info.get("target_captured"):
            target_captures += 1
        if info.get("caught_by_chaser"):
            caught_by_chaser = True

        # Train if enough samples
        if len(buffer) >= batch_size:
            states, actions, rewards, next_states, dones = buffer.sample(batch_size)
            # Stack into one ndarray first: building a tensor directly from a
            # tuple of arrays is very slow.
            states = torch.tensor(np.array(states), dtype=torch.float32)
            actions = torch.tensor(actions, dtype=torch.int64)
            rewards = torch.tensor(rewards, dtype=torch.float32)
            next_states = torch.tensor(np.array(next_states), dtype=torch.float32)
            dones = torch.tensor(dones, dtype=torch.float32)

            q_expected = dqn(states).gather(1, actions.unsqueeze(1)).squeeze(1)
            # The TD target is a constant: no graph and no gradients for the
            # target network.
            with torch.no_grad():
                next_q_values = target_dqn(next_states)
                q_target = rewards + gamma * next_q_values.max(1)[0] * (1 - dones)

            loss = F.mse_loss(q_expected, q_target)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Decay exploration once per episode without dropping below the floor.
    epsilon = max(min_epsilon, epsilon * epsilon_decay)

    if episode % 10 == 0:
        target_dqn.load_state_dict(dqn.state_dict())

    # Compact log per episode
    print(
        f"Ep {episode:03d} | "
        f"Reward: {total_reward:.2f} | "
        f"Steps: {steps} | "
        f"Captures: {target_captures} | "
        f"Caught: {caught_by_chaser}"
    )

env.close()

# Evaluation phase: one greedy episode in a rendering environment. The
# training env was created without a render mode, so its render() is a no-op.
eval_env = ChaseEscapeEnv(render_mode="human")
try:
    state, _ = eval_env.reset()
    done = False
    total_reward = 0
    eval_steps = 0
    # Cap the episode: it only terminates when the chaser catches the agent.
    while not done and eval_steps < max_steps:
        eval_steps += 1
        eval_env.render()  # paced by the environment's render_fps
        action = select_action(state, dqn, epsilon=0.0, action_space=eval_env.action_space)
        state, reward, terminated, _, _ = eval_env.step(action)
        total_reward += reward
        done = terminated
finally:
    eval_env.close()

print(f"Total reward in evaluation: {total_reward:.2f}")
