<a href="https://colab.research.google.com/github/alerotta/DRL/blob/main/03%20-%20Deep%20Q%20Learing/Deep_Q_Learning_Space_Invaders.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install gymnasium[atari,accept-rom-license]
!pip install torch torchvision
!pip install opencv-python


In [8]:
import random
from collections import deque , namedtuple
import numpy as np
import gymnasium as gym
import cv2


import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [16]:
Experience = namedtuple(
    'Experience',
    ('state', 'action', 'reward', 'next_state', 'done'),
)


class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform random batch sampling.

    Once ``capacity`` transitions are held, appending a new one drops the
    oldest. Sampled batches are returned as tensors placed on ``device``.
    """

    def __init__(self, capacity, device):
        self.buffer = deque(maxlen=capacity)
        self.device = device

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)

    def append(self, state, action, reward, next_state, done):
        """Store one transition as an ``Experience`` record."""
        transition = Experience(state, action, reward, next_state, done)
        self.buffer.append(transition)

    def _frames(self, arrays):
        """Stack per-transition arrays along a new leading axis as float32."""
        return torch.tensor(np.stack(arrays), dtype=torch.float32, device=self.device)

    def _column(self, values, dtype):
        """Turn per-transition scalars into a ``(batch, 1)`` tensor of ``dtype``."""
        flat = torch.tensor(values, dtype=dtype, device=self.device)
        return flat.unsqueeze(1)

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A tuple ``(states, actions, rewards, next_states, dones)``.
            ``states`` and ``next_states`` are float32 tensors with a leading
            batch axis; ``actions`` is a long tensor and ``rewards`` / ``dones``
            are float32 tensors, each of shape ``(batch_size, 1)``.

        Raises:
            ValueError: Propagated from ``random.sample`` when ``batch_size``
                exceeds the number of stored transitions.
        """
        picked = random.sample(self.buffer, batch_size)

        states = self._frames([t.state for t in picked])
        next_states = self._frames([t.next_state for t in picked])
        actions = self._column([t.action for t in picked], torch.long)
        rewards = self._column([t.reward for t in picked], torch.float32)
        dones = self._column([t.done for t in picked], torch.float32)

        return states, actions, rewards, next_states, dones

In [17]:
class MyDQN(nn.Module):
    """Convolutional Q-network: three conv layers followed by two linear layers.

    Args:
        input_size: Number of channels of the input fed to the first convolution.
        n_actions: Number of outputs of the final linear layer.

    The first linear layer expects ``64 * 7 * 7`` flattened features, which is
    what this convolution stack produces for 84x84 inputs.
    """

    def __init__(self, input_size, n_actions):
        super().__init__()

        # Feature extractor.
        self.conv1 = nn.Conv2d(in_channels=input_size, out_channels=32, kernel_size=8, stride=4)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=2)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)

        # Fully connected head mapping flattened features to the outputs.
        self.fc1 = nn.Linear(in_features=64 * 7 * 7, out_features=512)
        self.fc2 = nn.Linear(in_features=512, out_features=n_actions)

    def forward(self, x):
        """Map a batch of inputs to a ``(batch, n_actions)`` output tensor."""
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu(conv(x))

        # Collapse everything except the batch axis so the result can enter
        # the linear layers; -1 lets the flattened size be inferred.
        batch = x.size(0)
        flat = x.view(batch, -1)

        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

In [18]:

class AtariPreprocessing(gym.Wrapper):
    """Environment wrapper adding action repeat, grayscale resize and frame stacking.

    Args:
        env: Environment to wrap; its observations are passed to
            ``cv2.cvtColor`` with ``COLOR_RGB2GRAY``.
        frame_skip: Maximum number of times each action is repeated per ``step``.
        frame_size: Side length of the square frame produced by ``preprocess``.
        frame_stack: Number of most recent processed frames in each observation.
    """

    def __init__(self, env, frame_skip=4, frame_size=84, frame_stack=4):
        super().__init__(env)
        self.frame_skip = frame_skip
        self.frame_size = frame_size
        self.frame_stack = frame_stack

        # Rolling window holding the most recent processed frames.
        self.frames = deque(maxlen=frame_stack)
        stacked_shape = (frame_stack, frame_size, frame_size)
        self.observation_space = gym.spaces.Box(
            low=0,
            high=255,
            shape=stacked_shape,
            dtype=np.uint8,
        )

    def preprocess(self, obs):
        """Convert a frame to grayscale and resize it to ``frame_size`` squared."""
        gray = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        target = (self.frame_size, self.frame_size)
        return cv2.resize(gray, target, interpolation=cv2.INTER_AREA)

    def reset(self, **kwargs):
        """Reset the wrapped env and fill the whole stack with the first frame."""
        raw_frame, info = self.env.reset(**kwargs)
        first_frame = self.preprocess(raw_frame)
        # Pushing the same frame `frame_stack` times replaces every stale entry.
        self.frames.extend([first_frame] * self.frame_stack)
        return np.stack(self.frames, axis=0), info

    def step(self, action):
        """Repeat ``action`` up to ``frame_skip`` times and return the stacked result.

        Rewards are summed over the repeated steps. The repetition stops early
        as soon as the wrapped env reports termination or truncation. Only the
        last raw frame is processed and pushed onto the stack.
        """
        reward_sum = 0.0
        terminated = truncated = False

        for _ in range(self.frame_skip):
            raw_frame, reward, term, trunc, info = self.env.step(action)
            reward_sum += reward
            terminated = terminated or term
            truncated = truncated or trunc
            if terminated or truncated:
                break

        self.frames.append(self.preprocess(raw_frame))
        return np.stack(self.frames, axis=0), reward_sum, terminated, truncated, info



In [19]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import random
import numpy as np


def select_action(state, policy_net, epsilon, action_space, device):
    """Choose an action with an epsilon-greedy policy.

    Args:
        state: Single observation (array-like) with pixel values in 0..255;
            it is scaled by 1/255 before being passed to ``policy_net``.
        policy_net: Callable mapping a ``(1, ...)`` float tensor to a
            ``(1, n_actions)`` tensor of Q-values.
        epsilon: Probability of taking a random action instead of the greedy one.
        action_space: Object whose ``sample()`` returns a random action.
        device: Torch device on which the forward pass is run.

    Returns:
        ``action_space.sample()`` with probability ``epsilon``; otherwise the
        index (a Python ``int``) of the largest Q-value.
    """
    if random.random() < epsilon:
        return action_space.sample()

    # np.asarray converts without copying when it can and copies when it
    # must; np.array(..., copy=False) raises ValueError under NumPy >= 2.0
    # whenever a copy is required (e.g. for list-like observations).
    frames = np.asarray(state)
    batch = torch.tensor(frames, dtype=torch.float32, device=device).unsqueeze(0)
    batch = batch / 255.0

    # Action selection is inference only, so no autograd graph is needed.
    with torch.no_grad():
        q_values = policy_net(batch)
    return torch.argmax(q_values, dim=1).item()

In [None]:

# AtariPreprocessing already repeats every action `frame_skip` times, so the
# emulator's own frame skipping (4 by default for the ALE v5 environments) is
# switched off here. Leaving both on would make one agent step span 16 frames.
env = gym.make("ALE/SpaceInvaders-v5", render_mode="rgb_array", frameskip=1)
env = AtariPreprocessing(env)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

n_actions = env.action_space.n

# Online network (trained) and target network (periodically synced copy).
policy_net = MyDQN(input_size=4, n_actions=n_actions).to(device)
target_net = MyDQN(input_size=4, n_actions=n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.Adam(policy_net.parameters(), lr=1e-4)
replay_buffer = ReplayBuffer(capacity=100_000, device=device)
# Built once instead of on every optimisation step.
loss_fn = nn.MSELoss()

batch_size = 32
gamma = 0.99
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.995
target_update_freq = 10  # in episodes


num_episodes = 1000

for episode in range(num_episodes):
    state, _ = env.reset()
    total_reward = 0

    for _ in range(10_000):  # max steps per episode
        action = select_action(state, policy_net, epsilon, env.action_space, device)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Only a true terminal state should cut off bootstrapping. On a
        # time-limit truncation the next state still has value, so the stored
        # flag is `terminated`, not `done`.
        replay_buffer.append(state, action, reward, next_state, terminated)
        state = next_state
        total_reward += reward

        if len(replay_buffer) >= batch_size:
            # Train
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

            # Normalize pixel values (already on correct device)
            states = states / 255.0
            next_states = next_states / 255.0

            # Q(s, a) for the actions that were actually taken.
            q_values = policy_net(states).gather(1, actions)

            # TD target: r + gamma * max_a' Q_target(s', a'), with the
            # bootstrap term zeroed for terminal transitions.
            with torch.no_grad():
                max_next_q = target_net(next_states).max(1, keepdim=True)[0]
                target_q = rewards + gamma * max_next_q * (1 - dones)

            loss = loss_fn(q_values, target_q)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        if done:
            break

    # Exploration rate decays once per episode, down to epsilon_min.
    epsilon = max(epsilon_min, epsilon * epsilon_decay)

    if episode % target_update_freq == 0:
        target_net.load_state_dict(policy_net.state_dict())

    print(f"Episode {episode}, Reward: {total_reward}, Epsilon: {epsilon:.3f}")

# Release the emulator once training is finished.
env.close()
