<a href="https://colab.research.google.com/github/Ad-Chekk/ML_models/blob/main/Obstacle_game_RL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the notebook's runtime dependencies: pygame (game), torch/torchvision and numpy (agent).
!pip install pygame torch torchvision numpy



In [None]:
def preprocess(frame):
    """Turn an RGB frame into an inverted, normalised 84x84 grayscale image.

    Args:
        frame: Image array accepted by ``cv2.cvtColor`` with the
            ``COLOR_RGB2GRAY`` conversion code.

    Returns:
        The 84x84 grayscale image scaled by 1/255 and then inverted
        (``1.0 - value``), giving a "negative film" look.
    """
    # NOTE(review): cv2 is not imported in any cell visible here - confirm it is
    # imported before this function is called.
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)

    # Shrink to the network input size and map pixel values to [0, 1].
    scaled = cv2.resize(gray, (84, 84)) / 255.0

    # Invert the intensities so bright pixels become dark and vice versa.
    return 1.0 - scaled


In [None]:
!pip install gymnasium[atari] ale-py AutoROM opencv-python torch torchvision
!AutoROM --accept-license --install-dir /root/.roms
import os
os.environ["ALE_ROM_PATH"] = "/root/.roms"


Usage: AutoROM [OPTIONS]
Try 'AutoROM --help' for help.

Error: Invalid value for '-d' / '--install-dir': Path '/root/.roms' does not exist.


In [None]:
import pygame
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque

# Game constants (all sizes are in pixels).
SCREEN_WIDTH = 400    # playfield width
SCREEN_HEIGHT = 600   # playfield height
PLAYER_SIZE = 40      # side length of the square player
BLOCK_SIZE = 40       # side length of each falling block
BLOCK_SPEED = 5       # distance a block falls per step
FPS = 30              # frame-rate cap used when rendering

# Initialize Pygame and create the module-level drawing objects.
# `screen` and `clock` are globals: DodgeGame.render() draws on `screen` and
# throttles with `clock`, and the playback helpers read pixels from `screen`.
pygame.init()
screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))  # window / drawing surface
clock = pygame.time.Clock()  # used to cap rendering at FPS

class DodgeGame:
    """Falling-block dodging game with a minimal reset/step/render interface.

    The player is a square near the bottom of the screen that can move left or
    right; square blocks spawn at the top and fall at a constant speed. An
    episode ends when a block overlaps the player.

    Attributes set by ``reset``:
        player_x, player_y: Top-left corner of the player square, in pixels.
        blocks: List of ``[x, y]`` top-left corners of the falling blocks.
        score: Number of steps survived in the current episode.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Start a new episode and return the initial state tensor."""
        # Player starts horizontally centred, 80 px above the bottom edge.
        self.player_x = SCREEN_WIDTH // 2 - PLAYER_SIZE // 2
        self.player_y = SCREEN_HEIGHT - 80
        self.blocks = []
        self.score = 0
        return self.get_state()

    def step(self, action):
        """Advance the game by one tick.

        Args:
            action: 0 moves the player 20 px left, 1 moves it 20 px right;
                any other value leaves the player where it is.

        Returns:
            ``(state, reward, done)``: the new state tensor, ``-10`` with
            ``done=True`` on a collision, otherwise ``1`` with ``done=False``.
        """
        # Move Left or Right
        if action == 0:
            self.player_x -= 20
        elif action == 1:
            self.player_x += 20

        # Keep player within screen
        self.player_x = max(0, min(self.player_x, SCREEN_WIDTH - PLAYER_SIZE))

        # Move blocks down
        for block in self.blocks:
            block[1] += BLOCK_SPEED

        # Remove blocks that leave screen
        self.blocks = [b for b in self.blocks if b[1] < SCREEN_HEIGHT]

        # Spawn new blocks randomly (5% chance per step, at the top edge)
        if random.random() < 0.05:
            self.blocks.append([random.randint(0, SCREEN_WIDTH - BLOCK_SIZE), 0])

        # Check for collision
        if any(self._hits_player(bx, by) for bx, by in self.blocks):
            return self.get_state(), -10, True  # Game Over

        self.score += 1
        return self.get_state(), 1, False  # Reward for surviving

    def _hits_player(self, bx, by):
        """Return True when the block at (bx, by) overlaps the player square.

        Standard axis-aligned rectangle overlap. Unlike a corner-in-range test
        it also catches a block exactly aligned with the player
        (``bx == player_x``), and it ignores blocks that have already fallen
        past the bottom edge of the player.
        """
        return (
            bx < self.player_x + PLAYER_SIZE
            and self.player_x < bx + BLOCK_SIZE
            and by < self.player_y + PLAYER_SIZE
            and self.player_y < by + BLOCK_SIZE
        )

    def get_state(self):
        """Return the board as a flat float32 tensor of SCREEN_WIDTH * SCREEN_HEIGHT values.

        Cells covered by the player are 1, cells covered by a block are -1 and
        everything else is 0. The grid is indexed ``[x, y]`` before flattening.
        """
        state = np.zeros((SCREEN_WIDTH, SCREEN_HEIGHT))
        state[self.player_x:self.player_x + PLAYER_SIZE, self.player_y:self.player_y + PLAYER_SIZE] = 1
        for bx, by in self.blocks:
            # Slices are clipped by numpy, so blocks partly off-screen are fine.
            state[bx:bx + BLOCK_SIZE, by:by + BLOCK_SIZE] = -1
        return torch.tensor(state, dtype=torch.float32).flatten()

    def render(self):
        """Draw the current frame on the global ``screen`` and cap the frame rate."""
        # Let pygame process its internal events; without this the window can
        # be reported as unresponsive, because nothing else reads the queue.
        pygame.event.pump()

        screen.fill((30, 30, 30))  # Background
        pygame.draw.rect(screen, (0, 255, 0), (self.player_x, self.player_y, PLAYER_SIZE, PLAYER_SIZE))  # Player
        for bx, by in self.blocks:
            pygame.draw.rect(screen, (255, 0, 0), (bx, by, BLOCK_SIZE, BLOCK_SIZE))  # Falling blocks
        pygame.display.flip()
        clock.tick(FPS)


In [None]:
class DQN(nn.Module):
    """Fully connected Q-network: input_dim -> 128 -> 64 -> output_dim.

    Each hidden layer is followed by a ReLU; the output layer is linear and
    yields one value per action.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()

        # Build the stack layer by layer; the attribute name `fc` and the
        # layer order are kept so state_dict keys stay `fc.0`, `fc.2`, `fc.4`.
        layers = []
        width = input_dim
        for hidden in (128, 64):
            layers.append(nn.Linear(width, hidden))
            layers.append(nn.ReLU())
            width = hidden
        layers.append(nn.Linear(width, output_dim))

        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for input tensor ``x``."""
        return self.fc(x)

# Training hyperparameters
gamma = 0.99                # discount factor for future rewards
batch_size = 32             # transitions sampled per gradient step
update_target_freq = 10     # episodes between target-network syncs
epsilon, epsilon_min, epsilon_decay = 1.0, 0.05, 0.995  # exploration schedule

# Game and problem dimensions
env = DodgeGame()
state_dim = SCREEN_WIDTH * SCREEN_HEIGHT  # length of the flattened state tensor
action_dim = 2  # Left or Right

# Online network plus a frozen copy used to compute bootstrap targets
policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

# Optimisation objects and replay buffer
optimizer = optim.Adam(policy_net.parameters(), lr=0.001)
loss_fn = nn.MSELoss()
memory = deque(maxlen=5000)  # oldest transitions are dropped once full

def select_action(state):
    """Pick an action epsilon-greedily.

    With probability ``epsilon`` (module-level global) a random action from
    {0, 1} is returned; otherwise the index of the largest ``policy_net``
    output for ``state`` is returned.
    """
    explore = random.random() < epsilon
    if explore:
        return random.choice([0, 1])  # Random action

    # Greedy action; no gradients are needed for action selection.
    with torch.no_grad():
        q_values = policy_net(state)
    return q_values.argmax().item()

def optimize_model():
    """Run one DQN gradient step on a random minibatch from replay memory.

    Uses the module-level ``memory``, ``policy_net``, ``target_net``,
    ``optimizer``, ``loss_fn``, ``gamma`` and ``batch_size``.

    Returns:
        The scalar loss as a float, or ``None`` when the replay memory does
        not yet hold ``batch_size`` transitions.
    """
    if len(memory) < batch_size:
        return None

    minibatch = random.sample(memory, batch_size)
    states, actions, rewards, next_states, dones = zip(*minibatch)

    states = torch.stack(states)
    actions = torch.tensor(actions, dtype=torch.int64)
    rewards = torch.tensor(rewards, dtype=torch.float32)
    next_states = torch.stack(next_states)
    dones = torch.tensor(dones, dtype=torch.float32)

    # Q(s, a) for the actions that were actually taken. squeeze(1) drops only
    # the action axis, so the batch axis survives for any batch size.
    q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    # Bootstrap target from the frozen network; terminal transitions
    # (done == 1) contribute only their immediate reward.
    with torch.no_grad():
        next_q_values = target_net(next_states).max(1)[0]
    expected_q_values = rewards + gamma * next_q_values * (1 - dones)

    loss = loss_fn(q_values, expected_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()


# Training loop
num_episodes = 500
for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0
    done = False

    while not done:
        action = select_action(state)
        next_state, reward, done = env.step(action)

        memory.append((state, action, reward, next_state, done))
        state = next_state
        total_reward += reward

        # Learn after every environment step. Updating only once per episode
        # gives num_episodes gradient steps in total, far too few to train.
        # This costs more compute per episode than a single update.
        optimize_model()

    # Periodically sync the target network with the online network.
    if episode % update_target_freq == 0:
        target_net.load_state_dict(policy_net.state_dict())

    # Decay exploration once per episode, never below epsilon_min.
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    print(f"Episode {episode + 1}/{num_episodes}, Score: {total_reward}, Epsilon: {epsilon:.2f}")

print("Training Complete!")


Episode 1/500, Score: 93, Epsilon: 0.99
Episode 2/500, Score: 90, Epsilon: 0.99
Episode 3/500, Score: 131, Epsilon: 0.99
Episode 4/500, Score: 104, Epsilon: 0.98
Episode 5/500, Score: 93, Epsilon: 0.98
Episode 6/500, Score: 138, Epsilon: 0.97
Episode 7/500, Score: 155, Epsilon: 0.97
Episode 8/500, Score: 104, Epsilon: 0.96
Episode 9/500, Score: 132, Epsilon: 0.96
Episode 10/500, Score: 92, Epsilon: 0.95
Episode 11/500, Score: 140, Epsilon: 0.95
Episode 12/500, Score: 139, Epsilon: 0.94
Episode 13/500, Score: 103, Epsilon: 0.94
Episode 14/500, Score: 143, Epsilon: 0.93
Episode 15/500, Score: 106, Epsilon: 0.93
Episode 16/500, Score: 114, Epsilon: 0.92
Episode 17/500, Score: 88, Epsilon: 0.92
Episode 18/500, Score: 107, Epsilon: 0.91
Episode 19/500, Score: 114, Epsilon: 0.91
Episode 20/500, Score: 110, Epsilon: 0.90
Episode 21/500, Score: 92, Epsilon: 0.90
Episode 22/500, Score: 102, Epsilon: 0.90
Episode 23/500, Score: 128, Epsilon: 0.89
Episode 24/500, Score: 126, Epsilon: 0.89
Episode

In [None]:
# Watch a single greedy episode in the pygame window.
env = DodgeGame()
state, done = env.reset(), False

while not done:
    env.render()  # Show the game

    # Always take the action with the highest predicted value (no exploration).
    with torch.no_grad():
        scores = policy_net(state)
    action = scores.argmax().item()

    state, _, done = env.step(action)


In [None]:
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import time

def play_trained_agent(policy_net, env, num_episodes=3, max_steps=None):
    """Play greedy episodes with a trained network and show every frame inline.

    Args:
        policy_net: Callable mapping a state tensor to per-action values; the
            highest-valued action is taken at each step.
        env: Environment providing ``reset()``, ``step(action)`` returning
            ``(state, reward, done)``, and ``render()``.
        num_episodes: Number of episodes to play.
        max_steps: Optional cap on the steps played per episode. ``None``
            (the default) plays until the environment reports ``done``.
    """
    # One figure and one image artist are reused for every frame. Calling
    # plt.imshow() per frame would stack a new image on the same axes each
    # time, making every redraw slower than the last.
    fig, ax = plt.subplots()
    ax.axis("off")
    image = None

    try:
        for episode in range(num_episodes):
            state = env.reset()
            done = False
            total_reward = 0
            steps = 0

            while not done and (max_steps is None or steps < max_steps):
                with torch.no_grad():
                    action = policy_net(state).argmax().item()

                state, reward, done = env.step(action)
                total_reward += reward
                steps += 1

                # Render the game, then grab the pygame surface as an array.
                env.render()
                frame = pygame.surfarray.array3d(screen)  # indexed (x, y, channel)
                frame = frame.swapaxes(0, 1)  # -> (row, column, channel) for matplotlib

                if image is None:
                    image = ax.imshow(frame)
                else:
                    image.set_data(frame)

                clear_output(wait=True)
                display(fig)
                time.sleep(0.05)  # Adjust speed

            print(f"Episode {episode + 1} finished with total reward: {total_reward}")
    finally:
        # Close the figure so it is not drawn a second time when the cell ends
        # and does not leak if playback is interrupted.
        plt.close(fig)

# Watch the trained agent play. Uses the `policy_net` and `env` objects
# created by the earlier training / rollout cells.
play_trained_agent(policy_net, env)


KeyboardInterrupt: 

Error in callback <function flush_figures at 0x79dd16babf60> (for post_execute):


KeyboardInterrupt: 

In [None]:
# Save only the policy network's learned parameters (its state_dict) to disk.
torch.save(policy_net.state_dict(), "dodge_blocks_dqn.pth")


In [None]:
import torch

# Restore the saved parameters into the existing network object.
# `policy_net` must already exist (it is created in the training cell);
# running this cell on a fresh kernel raises NameError.
trained_weights = torch.load("dodge_blocks_dqn.pth")
policy_net.load_state_dict(trained_weights)

# Switch the network to evaluation mode for inference.
policy_net.eval()

NameError: name 'policy_net' is not defined

In [None]:
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import time

def play_trained_agent(policy_net, env, num_episodes=3, max_steps=None):
    """Play greedy episodes with a trained network and show every frame inline.

    Args:
        policy_net: Callable mapping a state tensor to per-action values; the
            highest-valued action is taken at each step.
        env: Environment providing ``reset()``, ``step(action)`` returning
            ``(state, reward, done)``, and ``render()``.
        num_episodes: Number of episodes to play.
        max_steps: Optional cap on the steps played per episode. ``None``
            (the default) plays until the environment reports ``done``.
    """
    # One figure and one image artist are reused for every frame. Calling
    # plt.imshow() per frame would stack a new image on the same axes each
    # time, making every redraw slower than the last.
    fig, ax = plt.subplots()
    ax.axis("off")
    image = None

    try:
        for episode in range(num_episodes):
            state = env.reset()
            done = False
            total_reward = 0
            steps = 0

            while not done and (max_steps is None or steps < max_steps):
                with torch.no_grad():
                    action = policy_net(state).argmax().item()

                state, reward, done = env.step(action)
                total_reward += reward
                steps += 1

                # Render the game, then grab the pygame surface as an array.
                env.render()
                frame = pygame.surfarray.array3d(screen)  # indexed (x, y, channel)
                frame = frame.swapaxes(0, 1)  # -> (row, column, channel) for matplotlib

                if image is None:
                    image = ax.imshow(frame)
                else:
                    image.set_data(frame)

                clear_output(wait=True)
                display(fig)
                time.sleep(0.05)  # Adjust speed

            print(f"Episode {episode + 1} finished with total reward: {total_reward}")
    finally:
        # Close the figure so it is not drawn a second time when the cell ends
        # and does not leak if playback is interrupted.
        plt.close(fig)

# Run the trained agent. `policy_net` and `env` must already be defined by
# earlier cells; on a fresh kernel this call raises NameError.
play_trained_agent(policy_net, env)

NameError: name 'policy_net' is not defined