In [None]:
!pip install gym[atari] torch torchvision numpy opencv-python



In [None]:
pip install gym[accept-rom-license]


In [None]:
import gym
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import cv2

# Preprocessing function
def preprocess_frame(frame):
    """Convert a raw frame into an 84x84 array divided by 255.

    A 3-dimensional ``frame`` is treated as RGB and converted to grayscale
    with OpenCV; a frame of any other dimensionality is used unchanged.
    The result is resized to 84x84 with area interpolation and then divided
    by 255.0.

    Args:
        frame: Array-like image exposing ``ndim`` and accepted by OpenCV.

    Returns:
        The resized frame divided by 255.0.
    """
    is_colour = frame.ndim == 3
    # Non-3-D input skips the colour conversion and goes straight to resize.
    mono = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) if is_colour else frame
    shrunk = cv2.resize(mono, (84, 84), interpolation=cv2.INTER_AREA)
    return shrunk / 255.0

class DQN(nn.Module):
    """Fully connected Q-network: input_dim -> 128 -> 128 -> output_dim.

    The two hidden layers are followed by ReLU; the final layer is linear
    and produces one value per output unit.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        hidden_units = 128
        self.fc1 = nn.Linear(input_dim, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, output_dim)

    def forward(self, x):
        """Run ``x`` through fc1 and fc2 (each with ReLU), then fc3."""
        first_hidden = self.fc1(x).relu()
        second_hidden = self.fc2(first_hidden).relu()
        return self.fc3(second_hidden)

class ReplayBuffer:
    """Bounded FIFO store of experiences with random sampling."""

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry once `capacity` is hit.
        self.buffer = deque([], maxlen=capacity)

    def add(self, experience):
        """Append one experience at the newest end of the buffer."""
        self.buffer.append(experience)

    def sample(self, batch_size):
        """Return a list of `batch_size` stored experiences drawn without replacement."""
        drawn = random.sample(self.buffer, batch_size)
        return drawn

def choose_action(state, epsilon, model, num_actions=6):
    """Select an action with an epsilon-greedy policy.

    Args:
        state: Observation accepted by ``torch.FloatTensor``; a leading
            batch axis is added before it is passed to ``model``.
        epsilon: Probability of taking a uniformly random action instead of
            the greedy one.
        model: Callable mapping the batched state tensor to a tensor of
            per-action values.
        num_actions: Number of discrete actions to draw from when exploring.
            Defaults to 6, which matches the 0-5 range this function used
            when the count was a fixed constant; pass the environment's
            action count to use it with other action spaces.

    Returns:
        The chosen action index as a Python int.
    """
    if random.random() < epsilon:
        # Exploration: uniform over [0, num_actions).
        return random.randrange(num_actions)
    with torch.no_grad():
        # Exploitation: no gradients are needed just to read off the argmax.
        batch = torch.FloatTensor(state).unsqueeze(0)
        return model(batch).argmax().item()

def train_model(model, replay_buffer, optimizer, batch_size, gamma):
    """Perform one Q-learning gradient step on a sampled mini-batch.

    Does nothing until the buffer holds at least ``batch_size`` experiences.
    Otherwise it samples a batch, computes one-step targets
    ``reward + (1 - done) * gamma * max_a Q(next_state, a)`` using the same
    ``model`` (there is no separate target network), and minimises the mean
    squared error between those targets and ``Q(state, action)``.

    Args:
        model: Network mapping a batch of states to per-action values.
        replay_buffer: Object exposing a ``buffer`` container (for its
            length) and ``sample(batch_size)`` returning
            ``(state, action, reward, next_state, done)`` tuples.
        optimizer: Optimizer holding ``model``'s parameters.
        batch_size: Number of experiences per update.
        gamma: Discount factor applied to the bootstrapped value.

    Returns:
        None.
    """
    if len(replay_buffer.buffer) < batch_size:
        return
    experiences = replay_buffer.sample(batch_size)
    states, actions, rewards, next_states, dones = zip(*experiences)

    # Stack each column with NumPy first: building a tensor directly from a
    # tuple of ndarrays converts element by element and is very slow.
    states = torch.as_tensor(np.asarray(states), dtype=torch.float32)
    actions = torch.as_tensor(np.asarray(actions), dtype=torch.int64)
    rewards = torch.as_tensor(np.asarray(rewards), dtype=torch.float32)
    next_states = torch.as_tensor(np.asarray(next_states), dtype=torch.float32)
    dones = torch.as_tensor(np.asarray(dones), dtype=torch.float32)

    # Q(s, a) for the action actually taken in each experience.
    q_values = model(states).gather(1, actions.unsqueeze(1)).squeeze(1)
    with torch.no_grad():
        # (1 - dones) zeroes the bootstrap term where done was recorded as 1.
        best_next = model(next_states).max(1)[0]
        target_q_values = rewards + (1 - dones) * gamma * best_next

    # Loss and optimization
    loss = nn.functional.mse_loss(q_values, target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

def main():
    """Train a DQN agent on Pong with epsilon-greedy exploration.

    Creates the environment, a ``DQN`` over flattened 84x84 preprocessed
    frames, an Adam optimizer and a replay buffer, then runs
    ``num_episodes`` episodes. After every environment step the transition
    is stored and ``train_model`` performs one update. Epsilon decays
    geometrically per episode down to ``epsilon_min``. The environment is
    closed even if training is interrupted by an exception.
    """
    env = gym.make("Pong-v0",render_mode='human')
    input_dim = 84 * 84  # 84x84 input size
    output_dim = env.action_space.n
    model = DQN(input_dim, output_dim)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    replay_buffer = ReplayBuffer(10000)
    num_episodes = 1000
    epsilon_start = 1.0
    epsilon_min = 0.1
    epsilon_decay = 0.995
    gamma = 0.99
    batch_size = 32

    try:
        for episode in range(num_episodes):
            state, _ = env.reset()  # reset returns (observation, info)
            state = preprocess_frame(state).flatten()
            total_reward = 0
            done = False
            epsilon = max(epsilon_min, epsilon_start * (epsilon_decay ** episode))

            while not done:
                action = choose_action(state, epsilon, model)
                # The five-value step API returns
                # (obs, reward, terminated, truncated, info).
                next_state, reward, terminated, truncated, _ = env.step(action)
                # Either flag ends the episode; ignoring `truncated` would
                # keep stepping an environment that needs a reset.
                done = terminated or truncated
                next_state = preprocess_frame(next_state).flatten()
                total_reward += reward
                # Only true termination should cut off bootstrapping in the
                # target, so the stored flag is `terminated`, not `done`.
                replay_buffer.add((state, action, reward, next_state, float(terminated)))
                state = next_state

                train_model(model, replay_buffer, optimizer, batch_size, gamma)

            print(f"Episode {episode}, Total Reward: {total_reward}, Epsilon: {epsilon:.3f}")
    finally:
        # Release the environment (and its render window) on any exit path.
        env.close()



# Start training only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()
