In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random

# Compute device shared by the notebook: CUDA when torch reports it as available, CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Uncomment the next line to force CPU execution regardless of CUDA availability.
# device = torch.device("cpu")


In [2]:
from typing import List, Tuple


class RecurrentIQN(nn.Module):
    """ Recurrent implicit-quantile network: an LSTM encoder modulated by a cosine quantile embedding. """

    def __init__(self, input_size: int, output_size: int, hidden_size: int, n_quantiles=32) -> None:
        """ Initialize the Recurrent IQN model

        Args:
            input_size (int): The size of the input vector fed to the LSTM at each time step
            output_size (int): The number of actions (size of the last linear layer)
            hidden_size (int): The size of the LSTM hidden state and of the quantile embedding
            n_quantiles (int, optional): The number of quantiles sampled in ``act``. Defaults to 32.
        """

        super().__init__()
        self.n_quantiles = n_quantiles
        self.input_size = input_size
        self.hidden_size = hidden_size

        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.quantile_embed = nn.Linear(hidden_size, hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(
        self,
        x: torch.FloatTensor,
        quantiles: torch.Tensor,
        hidden: Tuple[torch.Tensor, torch.Tensor],
    ) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
        """ Forward pass of the Recurrent IQN model

        Args:
            x (torch.FloatTensor): The input sequence, batch first (the LSTM uses ``batch_first=True``)
            quantiles (torch.Tensor): The quantile fractions; a trailing axis is appended internally,
                so (batch, n_quantiles) is the layout used by the callers in this file
            hidden (Tuple[torch.Tensor, torch.Tensor]): The (h, c) LSTM state

        Returns:
            torch.Tensor: One value per quantile and action, (batch, n_quantiles, output_size)
                for the input layout above
            Tuple: The updated (h, c) LSTM state
        """

        lstm_out, hidden = self.lstm(x, hidden)

        # Cosine embedding cos(pi * tau * i) for i = 1..hidden_size, built directly on x's device.
        taus = quantiles.unsqueeze(-1)
        frequencies = torch.arange(1, self.hidden_size + 1, device=x.device)
        quantile_feats = torch.cos(np.pi * taus * frequencies)
        quantile_feats = F.relu(self.quantile_embed(quantile_feats))

        # Only the last time step of the LSTM output is used; it is broadcast over the quantile axis.
        last_step = lstm_out[:, -1, :].unsqueeze(1)
        features = last_step * quantile_feats

        return self.fc(features), hidden

    def act(self, state: List, hidden: Tuple[torch.Tensor, torch.Tensor], epsilon: float) -> Tuple[int, Tuple[torch.Tensor, torch.Tensor]]:
        """ Acting function of the Recurrent IQN model (epsilon-greedy)

        The network is evaluated on every call, including exploration steps, so the
        returned recurrent state always reflects the observation that was just seen.
        Previously the state was returned unchanged whenever a random action was drawn.

        Args:
            state (List): The state of the environment (a single flat observation)
            hidden (Tuple[torch.Tensor, torch.Tensor]): The hidden state of the model
            epsilon (float): The probability threshold for taking a random action

        Returns:
            int: The action to take
            Tuple: The updated hidden state of the model
        """

        # Draw the exploration decision first, as before, so the `random` stream is consumed in the same order.
        greedy = random.random() > epsilon

        with torch.no_grad():
            model_device = next(self.parameters()).device
            # (input_size,) -> (batch=1, seq=1, input_size)
            observation = torch.as_tensor(state, dtype=torch.float32, device=model_device).unsqueeze(0).unsqueeze(0)
            quantiles = torch.rand(1, self.n_quantiles, device=model_device)
            q_values, hidden = self.forward(observation, quantiles, hidden)

        if greedy:
            print("Model acting")
            # Average over the quantile axis, then pick the best action.
            action = q_values.mean(dim=1).argmax(dim=1).item()
        else:
            action = random.randrange(self.fc.out_features)
        return action, hidden


In [3]:
from typing import Any
from numpy import dtype, ndarray


class ReplayBuffer:
    """ Fixed-capacity ring buffer of (state, action, reward, next_state, done) tuples.

    Invariant kept by ``push`` and ``pop``: while the buffer holds fewer than
    ``capacity`` items it is in insertion order and ``position == len(buffer)``;
    once full, ``position`` is the index of the oldest item (the next one overwritten).
    """

    def __init__(self, capacity: int) -> None:
        """ Initialize the ReplayBuffer

        Args:
            capacity (int): The capacity of the buffer
        """

        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def push(self, state: list, action: int, reward: float, next_state: list, done: int) -> None:
        """ Push a new experience to the buffer, overwriting the oldest one when full

        Args:
            state (list): State of the environment
            action (int): The action taken
            reward (float): The reward received
            next_state (list): The next state of the environment
            done (int): The done flag
        """

        experience = (state, action, reward, next_state, done)
        if len(self.buffer) < self.capacity:
            # Still growing: append so no placeholder entry is ever left in the list.
            self.buffer.append(experience)
        else:
            self.buffer[self.position] = experience
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size: int) -> Tuple[ndarray, ndarray, ndarray, ndarray, ndarray]:
        """ Sample a batch from the buffer without replacement

        Args:
            batch_size (int): The size of the batch

        Returns:
            Tuple[ndarray, ndarray, ndarray, ndarray, ndarray]: The stacked states, actions,
                rewards, next states and done flags of the sampled experiences

        Raises:
            ValueError: If ``batch_size`` is larger than the number of stored experiences
                (raised by ``random.sample``)
        """

        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = map(np.stack, zip(*batch))
        return state, action, reward, next_state, done

    def pop(self, count=1) -> None:
        """ Remove the oldest experiences from the buffer

        Removing more items than are stored simply empties the buffer instead of raising.

        Args:
            count (int, optional): The number of elements to pop. Defaults to 1.
        """

        count = min(count, len(self.buffer))
        if count <= 0:
            return

        if len(self.buffer) == self.capacity and self.position:
            # The ring has wrapped: rotate it back into chronological order so the
            # oldest items sit at the front before they are removed.
            self.buffer = self.buffer[self.position:] + self.buffer[:self.position]

        del self.buffer[:count]
        # The buffer is now in insertion order and below capacity, so the next write goes at the end.
        self.position = len(self.buffer)

    def __len__(self):
        """ Get the length of the buffer

        Returns:
            int: The length of the buffer
        """

        return len(self.buffer)
    

class IQNAgent:
    def __init__(self, input_size, output_size, hidden_size, replay_buffer_capacity, batch_size, gamma, epsilon_start, epsilon_end, epsilon_decay, n_quantiles=32) -> None:
        """ Initialize the IQN Agent. The IQN agent is a recurrent IQN model with a replay buffer.

        Args:
            input_size (int): The size of the input matrix
            output_size (int): The size of the actions
            hidden_size (int): The size of the hidden layer
            replay_buffer_capacity (int): The capacity of the replay buffer
            batch_size (int): The size of the batch for training
            gamma (float): The discount factor
            epsilon_start (float): The starting epsilon value
            epsilon_end (float): The ending epsilon value
            epsilon_decay (float): The decay rate of epsilon
            n_quantiles (int, optional): The number of quantiles. Defaults to 32.
        """
        
        self.input_size = input_size
        self.output_size = output_size
        self.hidden_size = hidden_size
        self.replay_buffer_capacity = replay_buffer_capacity
        self.batch_size = batch_size
        self.gamma = gamma
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.n_quantiles = n_quantiles

        self.model = RecurrentIQN(input_size, output_size, hidden_size, n_quantiles).to(device)
        self.model_target = RecurrentIQN(input_size, output_size, hidden_size, n_quantiles).to(device)
        self.model_target.load_state_dict(self.model.state_dict())

        self.hidden = (torch.zeros(1, 1, hidden_size).to(device),
                       torch.zeros(1, 1, hidden_size).to(device))

        self.replay_buffer = ReplayBuffer(replay_buffer_capacity)
        self.optimizer = optim.Adam(self.model.parameters())

        self.steps_done = 0

        self.update_counter = 0
        self.target_update_freq = 1000

    def select_action(self, state: list) -> int:
        """ Select an action based on the state

        Args:
            state (list): The state of the environment

        Returns:
            int: The action to take
        """
        
        epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * np.exp(-1. * self.steps_done / self.epsilon_decay)
        self.steps_done += 1
        action, self.hidden = self.model.act(state, self.hidden, epsilon)
        return action

    def optimize_model(self) -> None | float:
        """ Optimize the model

        Returns:
            float: The loss of the model
        """
        
        if len(self.replay_buffer) < self.batch_size:
            return
        
        self.model.train()
        print("Optimizing model...")
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)

        states = torch.FloatTensor(states).unsqueeze(1).to(device)
        actions = torch.LongTensor(actions).to(device)
        rewards = torch.FloatTensor(rewards).to(device)
        next_states = torch.FloatTensor(next_states).unsqueeze(1).to(device)
        dones = torch.FloatTensor(dones).to(device)
        
        quantiles = torch.rand(self.batch_size, self.n_quantiles).to(device)

        hidden = (torch.zeros(1, self.batch_size, self.hidden_size).to(device),
                  torch.zeros(1, self.batch_size, self.hidden_size).to(device))

        current_q, _ = self.model(states, quantiles, hidden)
        current_q = current_q.gather(2, actions.unsqueeze(-1).unsqueeze(-1).expand(-1, self.n_quantiles, -1)).squeeze(-1)

        with torch.no_grad():
            next_hidden = (torch.zeros(1, self.batch_size, self.hidden_size).to(device),
                           torch.zeros(1, self.batch_size, self.hidden_size).to(device))
            next_quantiles = torch.rand(self.batch_size, self.n_quantiles).to(device)
            next_q, _ = self.model_target(next_states, next_quantiles, next_hidden)
            next_q = next_q.max(2)[0]
            target_q = rewards.unsqueeze(1) + self.gamma * next_q * (1 - dones.unsqueeze(1))

        td_errors = target_q.unsqueeze(1) - current_q
        huber_loss = F.smooth_l1_loss(current_q, target_q.unsqueeze(1), reduction='none')
        quantile_loss = (torch.abs(quantiles.unsqueeze(-1) - (td_errors.detach() < 0).float()) * huber_loss).mean()

        self.optimizer.zero_grad()
        quantile_loss.backward()
        self.optimizer.step()

        self.update_counter += 1
        if self.update_counter % self.target_update_freq == 0:
            self.update_target_network()

        return quantile_loss.item()

    def update_target_network(self) -> None:
        """ Update the target network """
        
        self.model_target.load_state_dict(self.model.state_dict())

    def push(self, state: list, action: int, reward: float, next_state: list, done: int) -> None:
        """ Push a new experience to the replay buffer

        Args:
            state (list): The state of the environment
            action (int): The action taken
            reward (float): The reward received
            next_state (list): The next state of the environment
            done (int): The done flag
        """
        
        self.replay_buffer.push(state, action, reward, next_state, done)

    def save(self, path: str) -> None:
        """ Save the model to a file

        Args:
            path (str): The path to save the model
        """
        torch.save(self.model.state_dict(), path)

    def load(self, path: str) -> None:
        """ Load the model from a file

        Args:
            path (str): The path to load the model from
        """ 
        
        self.model.load_state_dict(torch.load(path))
        self.model.eval()

    def reset(self) -> None:
        """ Reset the agent """
        self.steps_done = 0
    
    def pop(self, count=1):
        """ Pop the first element from the replay buffer
        
        Args:
            count (int, optional): The number of elements to pop. Defaults to 1.
        """
        
        self.replay_buffer.pop(count)


In [4]:
def simplify_state(state: np.ndarray) -> Tuple[Tuple[int, int], Tuple, Tuple, Tuple, Tuple]:
    """ Simplify the state of the environment

    Each position group is de-duplicated and returned in sorted order, so the same
    set of occupied cells always yields the same tuple. Plain ``tuple(set(...))``
    ordering depends on hashing and would shuffle positions between frames.

    Args:
        state (np.ndarray): The grid state of the environment

    Returns:
        Tuple[Tuple[int, int], Tuple, Tuple, Tuple, Tuple]: The agent position followed by
            the (x, y) positions of the obstacles, timbers, cars and water cells
    """

    agent_x, agent_y = get_agent_position(state)
    obstacles = get_obstacles(state)
    timbers = get_timbers(state)
    cars = get_cars(state)
    water = get_water(state)

    simplified_state = (
        (agent_x, agent_y),
        tuple(sorted(set(obstacles))),
        tuple(sorted(set(timbers))),
        tuple(sorted(set(cars))),
        tuple(sorted(set(water))),
    )

    return simplified_state

def get_agent_position(state: np.ndarray) -> Tuple[int, int]:
    """ Locate the agent in the grid state

    The grid is scanned row by row and the first cell equal to 2 wins. When no
    such cell exists the bottom-centre cell is returned as a fallback.

    NOTE(review): get_obstacles matches the same cell value (2); confirm which
    class the value is meant to represent.

    Args:
        state (np.ndarray): The state of the environment

    Returns:
        tuple[int, int]: The (x, y) position of the agent, i.e. (column, row)
    """

    matches = (
        (col_idx, row_idx)
        for row_idx, line in enumerate(state)
        for col_idx, value in enumerate(line)
        if value == 2
    )
    found = next(matches, None)
    if found is not None:
        return found

    n_rows, n_cols = state.shape[0], state.shape[1]
    return n_cols // 2, n_rows - 1
    
def get_cars(state: np.ndarray) -> list:
    """ Collect the positions of the cars in the grid state

    Args:
        state (np.ndarray): The state of the environment

    Returns:
        list: (x, y) positions, in row-major scan order, of every cell equal to 1
    """

    return [
        (col_idx, row_idx)
        for row_idx, line in enumerate(state)
        for col_idx, value in enumerate(line)
        if value == 1
    ]
            
def get_obstacles(state: np.ndarray) -> list:
    """ Collect the positions of the obstacles in the grid state

    NOTE(review): get_agent_position searches for the same cell value (2);
    confirm which class the value is meant to represent.

    Args:
        state (np.ndarray): The state of the environment

    Returns:
        list: (x, y) positions, in row-major scan order, of every cell equal to 2
    """

    return [
        (col_idx, row_idx)
        for row_idx, line in enumerate(state)
        for col_idx, value in enumerate(line)
        if value == 2
    ]

def get_timbers(state: np.ndarray) -> list:
    """ Collect the positions of the timbers in the grid state

    Args:
        state (np.ndarray): The state of the environment

    Returns:
        list: (x, y) positions, in row-major scan order, of every cell equal to 4
    """

    return [
        (col_idx, row_idx)
        for row_idx, line in enumerate(state)
        for col_idx, value in enumerate(line)
        if value == 4
    ]

def get_water(state: np.ndarray) -> list:
    """ Collect the positions of the water cells in the grid state

    Args:
        state (np.ndarray): The state of the environment

    Returns:
        list: (x, y) positions, in row-major scan order, of every cell equal to 5
    """

    return [
        (col_idx, row_idx)
        for row_idx, line in enumerate(state)
        for col_idx, value in enumerate(line)
        if value == 5
    ]



In [None]:
from typing import Literal
from numpy._typing._array_like import NDArray
import pyautogui
import cv2
import time
import keyboard
import torchvision
from ultralytics import YOLO


# Screen resolution in pixels. Not referenced by the code visible in this notebook.
RES_X = 1920
RES_Y = 1080

# Screen region handed to pyautogui.screenshot by get_screen — presumably (left, top, width, height); confirm.
GAME_REGION = (405, 210, 850, 480)
# Grayscale template image that is_game_over matches against the bottom of the frame.
# NOTE(review): cv2.imread presumably yields None when the file cannot be read — confirm the file ships with the notebook.
restart_button = cv2.imread('restart_button.png', cv2.IMREAD_GRAYSCALE)


def get_screen(region: Tuple[int, int , int, int]) -> Tuple:
    """ Capture the game region and return two BGR views of it

    Args:
        region (Tuple): The region of the screen to capture, passed to pyautogui.screenshot

    Returns:
        Tuple: ``(screen, non_crop)`` — the rotated, centre-cropped and resized frame,
            and the unrotated frame resized with ``cv2.resize(..., (425, 240))``
    """

    capture = pyautogui.screenshot(region=(region[0], region[1], region[2], region[3]))
    untouched = capture.copy()

    # Rotation range (14, 14) has equal bounds, i.e. a fixed angle; then crop and shrink.
    pipeline = torchvision.transforms.Compose([
        torchvision.transforms.RandomRotation((14, 14)),
        torchvision.transforms.CenterCrop((320, 566)),
        torchvision.transforms.Resize((240, 425)),
    ])

    # The captures are converted RGB -> BGR for the OpenCV-based consumers.
    processed_bgr = cv2.cvtColor(np.array(pipeline(capture)), cv2.COLOR_RGB2BGR)

    full_bgr = cv2.cvtColor(np.array(untouched), cv2.COLOR_RGB2BGR)
    full_bgr = cv2.resize(full_bgr, (425, 240))

    return processed_bgr, full_bgr

import numpy as np

def map_to_grid(image_size, grid_size, boxes, class_labels) -> np.ndarray:
    """
    Map detected bounding boxes to a grid representation.

    Every grid cell touched by a box is set to ``label + 1`` (0 means empty).
    Boxes are painted in the order given, so later boxes overwrite earlier ones.
    Boxes reaching outside the image are clipped to the grid; boxes entirely
    outside it are ignored.

    Args:
        image_size: Tuple (width, height) of the image.
        grid_size: Tuple (columns, rows) of the grid dimensions.
        boxes: List of bounding boxes [(x_min, y_min, x_max, y_max)].
        class_labels: List of class labels corresponding to the boxes.

    Returns:
        grid: 2D numpy array of shape (rows, columns) with object class labels.
    """

    width, height = image_size
    grid_width, grid_height = grid_size
    grid = np.zeros((grid_height, grid_width), dtype=int)

    cell_width = width / grid_width
    cell_height = height / grid_height

    for (x_min, y_min, x_max, y_max), label in zip(boxes, class_labels):
        # Clamp to the grid: a negative start would otherwise wrap around via negative
        # indexing, and an end past the edge would raise IndexError.
        x_start = max(int(x_min // cell_width), 0)
        y_start = max(int(y_min // cell_height), 0)
        x_end = min(int(np.ceil(x_max / cell_width)), grid_width)
        y_end = min(int(np.ceil(y_max / cell_height)), grid_height)

        if x_start >= x_end or y_start >= y_end:
            # Box lies completely outside the image (or is degenerate).
            continue

        grid[y_start:y_end, x_start:x_end] = label + 1

    return grid


def get_state(screen) -> ndarray[Any, dtype[Any]]:
    """ Run the detector on a frame and rasterise its boxes into the grid state

    Detections are ordered by descending class id before being painted, so when
    boxes overlap the lowest class id is written last and stays visible.

    Args:
        screen (MatLike): The screenshot of the game

    Returns:
        ndarray[Any, dtype[Any]]: The grid state of the environment
    """

    results = cv_model(screen, verbose=False)

    detections = []
    for box in results[0].boxes:
        x_min, y_min, x_max, y_max = box.xyxy[0].tolist()
        detections.append(((x_min, y_min, x_max, y_max), int(box.cls[0].item())))

    # Stable sort: equal class ids keep the detector's original order.
    detections.sort(key=lambda detection: -detection[1])

    boxes = [coords for coords, _ in detections]
    labels = [class_id for _, class_id in detections]

    # Image size is (width, height); the 36 x 32 grid has 1152 cells.
    return map_to_grid((425, 240), (36, 32), boxes, labels)

def is_game_over(image, score_threshold=0.5, scale=0.5):
    """ Check if the game is over

    The restart-button template is searched for in a strip at the bottom centre
    of the frame (rows from 87% of the height down, columns between 43% and 57%
    of the width) using normalised cross-correlation.

    Args:
        image (MatLike): The screenshot of the game (BGR)
        score_threshold (float, optional): The score threshold. Defaults to 0.5.
        scale (float, optional): The scale applied to the template image. Defaults to 0.5.

    Returns:
        bool: True if the game is over, False otherwise

    Raises:
        RuntimeError: If the restart-button template was not loaded
    """

    if restart_button is None:
        # Fail with a readable message rather than an opaque OpenCV error inside cv2.resize.
        raise RuntimeError("restart button template is not loaded; check that 'restart_button.png' exists")

    grey_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    resized_template = cv2.resize(restart_button, (0, 0), fx=scale, fy=scale)
    h, w = grey_image.shape

    cropped_search_box = grey_image[int(h * 0.87):, int(w * 0.43):int(w * 0.57)]

    result = cv2.matchTemplate(cropped_search_box, resized_template, cv2.TM_CCOEFF_NORMED)

    # Only the best score matters, so take the maximum directly without sorting the score map.
    return bool(result.max() > score_threshold)

def process_state(state, max_obstacles=1152, max_timbers=1152, max_cars=1152, max_water=1152) -> list:
    """ Flatten a simplified state into a fixed-length vector

    The vector starts with the agent position, followed by one section per object
    group (obstacles, timbers, cars, water). Each section holds exactly ``max_*``
    coordinate pairs: surplus positions are dropped and missing ones are
    zero-padded.

    Args:
        state (Tuple): The state of the environment
        max_obstacles (int, optional): The maximum number of obstacles. Defaults to 1152.
        max_timbers (int, optional): The maximum number of timbers. Defaults to 1152.
        max_cars (int, optional): The maximum number of cars. Defaults to 1152.
        max_water (int, optional): The maximum number of water. Defaults to 1152.

    Returns:
        list: The processed state of the environment
    """

    vector = list(state[0])

    sections = (
        (list(state[1]), max_obstacles),
        (list(state[2]), max_timbers),
        (list(state[3]), max_cars),
        (list(state[4]), max_water),
    )

    for positions, limit in sections:
        slots = max(limit, 0)
        kept = positions[:slots]
        for position in kept:
            vector.extend(position)
        # Zero-fill whatever slots this section has left.
        vector.extend([0, 0] * (slots - len(kept)))

    return vector

def compute_reward(reward_state) -> float | Literal[-100]:
    """ Compute the reward of the environment

    Rules, in order:
      * -100 immediately when the uncropped frame shows the game-over screen;
      * a per-action bonus (action 0: 2, actions 2 and 3: 0.25, action 4: 0.2, others: 0);
      * a penalty of 10 when the action repeats the previous one and the agent
        position stays in the same 5x5 block of grid cells.

    Args:
        reward_state (dict): Must provide 'state', 'next_state', 'action', 'prev_action'
            and 'non_crop_state'. Other keys are ignored.

    Returns:
        float: The reward of the environment
    """

    if is_game_over(reward_state['non_crop_state']):
        return -100

    state = reward_state['state']
    next_state = reward_state['next_state']
    action = reward_state['action']
    prev_action = reward_state['prev_action']

    action_bonus = {0: 2, 2: 0.25, 3: 0.25, 4: 0.2}
    reward = action_bonus.get(action, 0)

    if action == prev_action:
        # state[0] / next_state[0] hold the agent's (x, y); compare them at 5-cell granularity.
        same_x_block = state[0][0] // 5 == next_state[0][0] // 5
        same_y_block = state[0][1] // 5 == next_state[0][1] // 5
        if same_x_block and same_y_block:
            reward -= 10

    return round(reward, 2)
    
    
actions = [0, 1, 2, 3, 4]

# 2 agent coordinates + 4 object groups * 1152 padded slots * 2 coordinates (see process_state defaults).
input_size = 9218
agent = IQNAgent(input_size, 5, 128, 10000, 32, 0.99, 1.0, 0.1, 1000)
cv_model = YOLO('best_cv.pt')
episodes = 1000
episode_length = 1000
losses = []
rewards = []

print("Model is ready to train")

# Block until 'q' is pressed — presumably so the game window can be focused before key presses are sent.
keyboard.wait('q')

for episode in range(episodes):
    screenshot, non_crop_state = get_screen(GAME_REGION)
    state_raw = get_state(screenshot)

    total_reward = 0
    total_loss = 0
    action = 0

    state = simplify_state(state_raw)
    done = False

    start_time = time.time()
    for step in range(episode_length):
        prev_action = action
        time.sleep(0.025)

        state_vector = process_state(state)

        action = agent.select_action(state_vector)

        # Actions 0-3 map to arrow keys; action 4 sends no key press.
        if action < 4:
            pyautogui.press(['up', 'down', 'left', 'right'][action])

        next_screenshot, non_crop_state = get_screen(GAME_REGION)
        next_state_raw = get_state(next_screenshot)

        next_state = simplify_state(next_state_raw)
        next_state_vector = process_state(next_state)

        reward_state = {
            'state': state,
            'action': action,
            'prev_action': prev_action,
            'next_state': next_state,
            'time': time.time() - start_time,
            'total_reward': total_reward,
            'non_crop_state': non_crop_state
        }

        reward = compute_reward(reward_state)
        done = 1 if is_game_over(non_crop_state) else 0

        agent.push(state_vector, action, reward, next_state_vector, done)
        # Count the reward before a possible break so the terminal penalty shows up in the episode total.
        total_reward += reward

        if done:
            break

        state = next_state

        loss = agent.optimize_model()
        # optimize_model returns None until the buffer holds a full batch.
        if loss is not None:
            total_loss += loss

        print(f"Step: {step}, Action: {action}, Reward: {reward}, Total Reward: {total_reward}, Loss: {loss}")

    keyboard.press_and_release('space')
    losses.append(total_loss)
    rewards.append(total_reward)
    print('\nepisode: {}, reward: {}\n'.format(episode, total_reward))

    # Drop up to four of the first stored transitions; never ask for more than the buffer holds.
    agent.pop(min(4, len(agent.replay_buffer)))

    if episode % 10 == 0:
        agent.save('dqn.pth')
        print("Model saved")

    time.sleep(3.25)
    keyboard.press_and_release('space')

