In [5]:
# Imports
import torch
import torch.nn as nn 
import numpy as np
import gymnasium as gym
from collections import deque
import pygame
import random

In [6]:
# DQN model which takes in the state as an input and outputs predicted q values for every possible action
class DQN(torch.nn.Module):
    def __init__(self, state_space, action_space, lr = 0.003):
        super().__init__()
        # Add your architecture parameters here
        # You can use nn.Functional
        # Remember that the input is of size batch_size x state_space
        # and the output is of size batch_size x action_space (ulta ho sakta hai dekh lo)
        # TODO: Add code here
        self.hidden_state = 128 
        self.state_space = state_space 
        self.action_space = action_space
        self.fc1 = nn.Linear(state_space,self.hidden_state)
        self.fc2 = nn.Linear(self.hidden_state,self.hidden_state)
        self.fc3 = nn.Linear(self.hidden_state,action_space)
        self.relu = nn.ReLU()
        self.optimizer = torch.optim.Adam(self.parameters(),lr=lr)
        self.loss = nn.MSELoss()

    def forward(self, input):
        # TODO: Complete based on your implementation
        a1 = self.relu(self.fc1(input))
        a2 = self.relu(self.fc2(a1))
        a3 = self.fc3(a2)
        return a3 

In [7]:
# While training neural networks, we split the data into batches.
# To improve the training, we need to remove the "correlation" between game states
# The buffer starts storing states and once it reaches maximum capacity, it replaces
# states at random which reduces the correlation.
class ExperienceBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones
    
    def __len__(self):
        return len(self.buffer)


In [9]:
# TODO: Implement training logic for CartPole environment here
# Remember to use the ExperienceBuffer and a target network
# Details can be found in the book sent in the group
class train_cartpole_model : 
    def __init__ (self, gamma, epsilon, lr, input_dims, batch_size, n_actions, eps_dec = 5e-4, eps_end = 1e-1, capacity = 1000) : 
        self.gamma = gamma 
        self.epsilon = epsilon 
        self.eps_dec = eps_dec 
        self.eps_end = eps_end
        self.lr = lr 
        self.capacity = capacity
        self.action_space = [i for i in range(n_actions)]
        self.batch_size = batch_size 
        self.cnt = 0 

        self.deepQ = DQN(input_dims,n_actions,self.lr)
        self.buffer = ExperienceBuffer(self.capacity) 

    def take_action(self,observation) : 
        # Observation contains the state of a game in a batch for training 
        if np.random.uniform(0,1) < self.epsilon : 
            action = np.random.choice(self.action_space) 
        else : 
            observation = torch.tensor([observation])
            action_space = self.deepQ.forward(observation)
            action_space = action_space.detach().numpy()
            action = np.argmax(action_space)
        self.cnt += 1 
        return action 


    def train_model(self) : 
        if self.cnt < self.capacity : 
            return 
        self.deepQ.optimizer.zero_grad() 
        # Select a batch and train on that 
        states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)
        states      = torch.tensor(states,      dtype=torch.float32)
        next_states = torch.tensor(next_states, dtype=torch.float32)
        actions     = torch.tensor(actions,     dtype=torch.int64)   
        rewards     = torch.tensor(rewards,     dtype=torch.float32)
        dones       = torch.tensor(dones,       dtype=torch.bool)
    
        q_eval_all = self.deepQ(states)   # Give the state-action matrix only for the relevant actions  
        q_eval = q_eval_all.gather(1, actions.unsqueeze(1)).squeeze(1)
        with torch.no_grad():
            q_next_all = self.deepQ(next_states)           # shape: (batch, n_actions)
            q_next_max = q_next_all.max(dim=1).values      # shape: (batch,)
            q_next_max[dones] = 0                          # zero for terminal states

        q_target = rewards + self.gamma * q_next_max
        loss = self.deepQ.loss(q_target,q_eval) 
        loss.backward() 
        self.deepQ.optimizer.step() 
        self.epsilon = (self.epsilon*self.eps_dec if self.epsilon > self.eps_end else self.eps_end)

# Train the model 
def train() : 
    env = gym.make("CartPole-v1", render_mode = "human")
    n_actions = env.action_space.n 
    obs_dim = env.observation_space.shape[0]
    model = train_cartpole_model(0.99,0.05,0.003,obs_dim,128,n_actions) 
    rewards = [] 
    episodes = 130

    for i in range(episodes) : 
        score = 0 
        done = False 
        observation = env.reset()[0]
        while not done : 
            action = model.take_action(observation) 
            next_state, reward, done, _ , _  = env.step(action)
            score += reward 
            model.buffer.push(observation,action,reward,next_state,done) 
            model.train_model() 
            observation = next_state
 
        rewards.append(score)

        print(f" The score after the {i}th episode is {np.mean(rewards[-100:])} ")
    return model.deepQ 

model = train() 

 The score after the 0th episode is 9.0 
 The score after the 1th episode is 9.0 
 The score after the 2th episode is 9.333333333333334 
 The score after the 3th episode is 9.0 
 The score after the 4th episode is 9.2 
 The score after the 5th episode is 9.0 
 The score after the 6th episode is 9.0 
 The score after the 7th episode is 9.125 
 The score after the 8th episode is 9.0 
 The score after the 9th episode is 9.3 
 The score after the 10th episode is 9.363636363636363 
 The score after the 11th episode is 9.25 
 The score after the 12th episode is 9.23076923076923 
 The score after the 13th episode is 9.214285714285714 
 The score after the 14th episode is 9.266666666666667 
 The score after the 15th episode is 9.25 
 The score after the 16th episode is 9.294117647058824 
 The score after the 17th episode is 9.277777777777779 
 The score after the 18th episode is 9.368421052631579 
 The score after the 19th episode is 9.45 
 The score after the 20th episode is 9.428571428571429

In [10]:
def evaluate_cartpole_model(model, episodes=10, render=True):
    env = gym.make("CartPole-v1", render_mode="human" if render else None)
    obs_dim = env.observation_space.shape[0]
    n_actions = env.action_space.n

    model.eval()

    rewards = []

    for episode in range(episodes):
        obs, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            state = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                q_values = model(state)
                action = torch.argmax(q_values, dim=1).item()

            obs, reward, done, _, _ = env.step(action)
            total_reward += reward

            if render:
                env.render()

        rewards.append(total_reward)
        print(f"Episode {episode + 1}: Reward = {total_reward}")

    env.close()
    avg_reward = sum(rewards) / episodes
    print(f"Average reward over {episodes} episodes: {avg_reward}")

In [11]:
# TODO: Run evaluation for cartpole here
evaluate_cartpole_model(model)

Episode 1: Reward = 106.0
Episode 2: Reward = 109.0
Episode 3: Reward = 115.0
Episode 4: Reward = 109.0
Episode 5: Reward = 111.0
Episode 6: Reward = 114.0
Episode 7: Reward = 112.0
Episode 8: Reward = 115.0
Episode 9: Reward = 114.0
Episode 10: Reward = 112.0
Average reward over 10 episodes: 111.7


In [13]:
class SnakeGame(gym.Env):
    metadata = {"render_modes": ["human"], "render_fps": 10}

    def __init__(self, size=10, render_mode=None):
        super().__init__()
        self.size = size
        self.cell_size = 30
        self.screen_size = self.size * self.cell_size
        self.render_mode = render_mode

        self.action_space = gym.spaces.Discrete(4)  # 0: right, 1: up, 2: left, 3: down
        self.observation_space = gym.spaces.Box(0, 2, shape=(self.size, self.size), dtype=np.uint8)

        self.screen = None
        self.clock = None

        self.snake = deque()
        self.food = None
        self.direction = [1, 0]

        if self.render_mode == "human":
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_size, self.screen_size))
            self.clock = pygame.time.Clock()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.snake.clear()
        mid = self.size // 2
        self.snake.appendleft([mid, mid])
        self.direction = [1, 0]
        self._place_food()
        obs = self._get_obs()

        if self.render_mode == "human":
            self._render_init()

        return obs, {}

    def step(self, action):
        # 1) detect invalid 180° reverse attempts
        if (action == 0 and self.direction == [-1, 0]) or \
           (action == 2 and self.direction == [ 1, 0]) or \
           (action == 1 and self.direction == [ 0, 1]) or \
           (action == 3 and self.direction == [ 0,-1]):
            reward = -0.5
        else:
            reward = -0.01
            # apply valid direction change:
            if action == 0: self.direction = [1, 0]
            elif action == 1: self.direction = [0,-1]
            elif action == 2: self.direction = [-1,0]
            elif action == 3: self.direction = [0, 1]
        
        head = self.snake[0]
        new_head = [head[0] + self.direction[0], head[1] + self.direction[1]]
        
        terminated = False
        
        # 2) wall‐collision
        if not (0 <= new_head[0] < self.size and 0 <= new_head[1] < self.size):
            terminated = True
            reward = -1.0
        else:
            # 3) self‐collision
            body = list(self.snake)[:-1] if new_head != self.food else list(self.snake)
            if new_head in body:
                terminated = True
                reward = -1.0
        
        # 4) advance snake (unless dead)
        if not terminated:
            self.snake.appendleft(new_head)
            if new_head == self.food:
                reward = +1.0                # eating food
                self._place_food()
            else:
                self.snake.pop()
        
        obs = self._get_obs()
        # truncated is always False for now
        return obs, reward, terminated, False, {}

    def _get_obs(self):
        board = np.zeros((self.size, self.size), dtype=np.uint8)
        # mark snake
        for x, y in self.snake:
            board[y, x] = 1
        # mark food
        if self.food is not None:
            fx, fy = self.food
            board[fy, fx] = 2
        return board

    def _place_food(self):
        positions = set(tuple(p) for p in self.snake)
        empty = [(x, y) for x in range(self.size) for y in range(self.size) if (x, y) not in positions]
        self.food = list(random.choice(empty)) if empty else None

    def render(self):
        if self.screen is None:
            self._render_init()

        self.screen.fill((0, 0, 0))
        for x, y in self.snake:
            pygame.draw.rect(
                self.screen, (0, 255, 0),
                pygame.Rect(x * self.cell_size, y * self.cell_size, self.cell_size, self.cell_size)
            )
        if self.food:
            fx, fy = self.food
            pygame.draw.rect(
                self.screen, (255, 0, 0),
                pygame.Rect(fx * self.cell_size, fy * self.cell_size, self.cell_size, self.cell_size)
            )

        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])

    def _render_init(self):
        pygame.init()
        self.screen = pygame.display.set_mode((self.size * self.cell_size, self.size * self.cell_size))
        self.clock = pygame.time.Clock()

    def close(self):
        if self.screen:
            pygame.quit()
            self.screen = None

In [15]:
# TODO: Implement training logic for Snake Game here
import torch.nn.functional as F

class DQN(nn.Module):
    def __init__(self, input_dim, n_actions):
        super().__init__()
        self.net = nn.Sequential(
            nn.Flatten(),
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions)
        )
    def forward(self, x):
        return self.net(x)

class ReplayBuffer:
    def __init__(self, capacity):
        self.buf = deque(maxlen=capacity)
    def push(self, s,a,r,s2,done):
        self.buf.append((s,a,r,s2,done))
    def sample(self, bs):
        batch = random.sample(self.buf, bs)
        ss, aa, rr, ss2, dd = zip(*batch)
        return (
            torch.tensor(ss, dtype=torch.float32),
            torch.tensor(aa, dtype=torch.int64),
            torch.tensor(rr, dtype=torch.float32),
            torch.tensor(ss2, dtype=torch.float32),
            torch.tensor(dd, dtype=torch.bool)
        )
    def __len__(self): return len(self.buf)

class SnakeAgent:
    def __init__(self, env, device='cpu'):
        self.env = env
        obs_shape = env.observation_space.shape  # (H,W)
        input_dim = obs_shape[0]*obs_shape[1]
        n_actions = env.action_space.n
        self.online_net = DQN(input_dim, n_actions).to(device)
        self.target_net = DQN(input_dim, n_actions).to(device)
        self.target_net.load_state_dict(self.online_net.state_dict())
        self.optimizer = torch.optim.Adam(self.online_net.parameters(), lr=1e-3)
        self.replay = ReplayBuffer(10000)
        self.batch_size = 64
        self.gamma = 0.99
        self.eps, self.eps_end, self.eps_dec = 1.0, 0.1, 1e-4
        self.step_count = 0
        self.target_update = 1000
        self.device = device

    def select_action(self, state):
        if random.random() < self.eps:
            return self.env.action_space.sample()
        with torch.no_grad():
            st_v = torch.tensor(state[None,:], dtype=torch.float32).to(self.device)
            q_vals = self.online_net(st_v)
            return int(q_vals.argmax(dim=1).item())

    def train_step(self):
        if len(self.replay) < self.batch_size:
            return
        s, a, r, s2, d = self.replay.sample(self.batch_size)
        s  = s.to(self.device)
        s2 = s2.to(self.device)
        a  = a.to(self.device)
        r  = r.to(self.device)
        d  = d.to(self.device)

        q = self.online_net(s)
        q_s_a = q.gather(1, a.unsqueeze(1)).squeeze(1)

        with torch.no_grad():
            q_next = self.target_net(s2)
            q_next_max = q_next.max(dim=1).values
            q_next_max[d] = 0.0
        q_target = r + self.gamma * q_next_max

        loss = F.mse_loss(q_s_a, q_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # eps decay
        self.eps = max(self.eps_end, self.eps - self.eps_dec)
        # target update
        if self.step_count % self.target_update == 0:
            self.target_net.load_state_dict(self.online_net.state_dict())

    def train(self, episodes=500):
        for ep in range(1, episodes+1):
            state, _ = self.env.reset()
            total_r = 0.0
            done = False
            while not done:
                action = self.select_action(state)
                nxt, reward, terminated, truncated, _ = self.env.step(action)
                done = terminated or truncated
                self.replay.push(state, action, reward, nxt, done)
                self.train_step()
                state = nxt
                total_r += reward
                self.step_count += 1
            print(f"Episode {ep}: Reward={total_r:.2f}, eps={self.eps:.3f}")

if __name__ == "__main__":
    env    = SnakeGame(size=10, render_mode=None)
    agent  = SnakeAgent(env, device='cuda' if torch.cuda.is_available() else 'cpu')
    agent.train(episodes=1000)


Episode 1: Reward=-1.18, eps=1.000
Episode 2: Reward=-1.56, eps=1.000
Episode 3: Reward=-2.61, eps=1.000
Episode 4: Reward=-2.09, eps=1.000
Episode 5: Reward=-1.58, eps=1.000
Episode 6: Reward=-2.67, eps=0.998
Episode 7: Reward=-2.09, eps=0.996
Episode 8: Reward=-3.09, eps=0.995
Episode 9: Reward=-3.61, eps=0.993
Episode 10: Reward=-2.61, eps=0.991
Episode 11: Reward=-3.64, eps=0.989
Episode 12: Reward=-2.52, eps=0.989
Episode 13: Reward=-1.58, eps=0.988
Episode 14: Reward=-2.53, eps=0.987
Episode 15: Reward=-2.06, eps=0.986
Episode 16: Reward=-1.05, eps=0.985
Episode 17: Reward=-2.06, eps=0.984
Episode 18: Reward=-4.63, eps=0.982
Episode 19: Reward=-1.14, eps=0.981
Episode 20: Reward=-5.21, eps=0.977
Episode 21: Reward=-1.54, eps=0.977
Episode 22: Reward=-2.55, eps=0.976
Episode 23: Reward=-2.57, eps=0.975
Episode 24: Reward=-3.13, eps=0.973
Episode 25: Reward=-2.64, eps=0.971
Episode 26: Reward=-3.71, eps=0.968
Episode 27: Reward=-3.61, eps=0.966
Episode 28: Reward=-2.05, eps=0.966
E

In [17]:
def evaluate_snake_model(model, size=20, episodes=10, render=True):
    env = SnakeGame(size=size, render_mode="human" if render else None)
    model.eval()

    rewards = []

    for episode in range(episodes):
        obs, _ = env.reset()
        total_reward = 0
        done = False

        while not done:
            state = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                q_values = model(state)
                action = torch.argmax(q_values, dim=1).item()

            obs, reward, done, _, _ = env.step(action)
            total_reward += reward

            if render:
                env.render()
    
        rewards.append(total_reward)
        print(f"Episode {episode + 1}: Reward = {total_reward}")

    env.close()
    avg_reward = sum(rewards) / episodes

    print(f"Average reward over {episodes} episodes: {avg_reward}")

In [18]:
# TODO: Run evaluation for Snake Game here
evaluate_snake_model(agent.online_net, size=20, episodes=10, render=True)

RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x400 and 100x128)

In [None]:
class ChaseEscapeEnv(gym.Env):
    metadata = {"render_modes": ["human"], "render_fps": 30}

    def __init__(self, render_mode=None):
        super().__init__()

        self.dt = 0.1
        self.max_speed = 0.4
        self.agent_radius = 0.05
        self.target_radius = 0.05
        self.chaser_radius = 0.07
        self.chaser_speed = 0.03

        self.action_space = gym.spaces.MultiDiscrete([3, 3])  # actions in {0,1,2} map to [-1,0,1]
        self.observation_space = gym.spaces.Box(
            low=-1,
            high=1,
            shape=(8,),
            dtype=np.float32,
        )

        self.render_mode = render_mode
        self.screen_size = 500
        self.np_random = None

        if render_mode == "human":
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_size, self.screen_size))
            self.clock = pygame.time.Clock()

    def sample_pos(self, far_from=None, min_dist=0.5):
        while True:
            pos = self.np_random.uniform(low=-0.8, high=0.8, size=(2,))
            if far_from is None or np.linalg.norm(pos - far_from) >= min_dist:
                return pos

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        self.agent_pos = self.sample_pos()
        self.agent_vel = np.zeros(2, dtype=np.float32)
        self.target_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.5)
        self.chaser_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.7)

        return self._get_obs(), {}

    def _get_obs(self):
        # TODO: Decide how to pass the state (don't use pixel values)
        pass

    def _get_info(self):
        return {}

    def step(self, action):
        # TODO: Add reward scheme
        # 1) Try to make the agent stay within bounds
        # 2) The agent shouldn't idle around
        # 3) The agent should go for the reward
        # 4) The agent should avoid the chaser

        accel = (np.array(action) - 1) * 0.1
        self.agent_vel += accel
        self.agent_vel = np.clip(self.agent_vel, -self.max_speed, self.max_speed)
        self.agent_pos += self.agent_vel * self.dt
        self.agent_pos = np.clip(self.agent_pos, -1, 1)

        direction = self.agent_pos - self.chaser_pos
        norm = np.linalg.norm(direction)
        if norm > 1e-5:
            self.chaser_pos += self.chaser_speed * direction / norm

        dist_to_target = np.linalg.norm(self.agent_pos - self.target_pos)
        dist_to_chaser = np.linalg.norm(self.agent_pos - self.chaser_pos)

        reward = 0.0
        terminated = False

        if dist_to_target < self.agent_radius + self.target_radius:
            self.target_pos = self.sample_pos(far_from=self.agent_pos, min_dist=0.5)

        if dist_to_chaser < self.agent_radius + self.chaser_radius:
            terminated = True

        return self._get_obs(), reward, terminated, False, self._get_info()

    def render(self):
        if self.render_mode != "human":
            return

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.close()

        self.screen.fill((255, 255, 255))

        def to_screen(p):
            x = int((p[0] + 1) / 2 * self.screen_size)
            y = int((1 - (p[1] + 1) / 2) * self.screen_size)
            return x, y

        pygame.draw.circle(self.screen, (0, 255, 0), to_screen(self.target_pos), int(self.target_radius * self.screen_size))
        pygame.draw.circle(self.screen, (0, 0, 255), to_screen(self.agent_pos), int(self.agent_radius * self.screen_size))
        pygame.draw.circle(self.screen, (255, 0, 0), to_screen(self.chaser_pos), int(self.chaser_radius * self.screen_size))

        pygame.display.flip()
        self.clock.tick(self.metadata["render_fps"])

    def close(self):
        if self.render_mode == "human":
            pygame.quit()


In [None]:
# TODO: Train and evaluate CatMouseEnv