In [29]:
from mss import mss # used for screen capture and faster than cv in this work
import pydirectinput # sends command to chrome/ alternative of selenium
import cv2 # frame processing
import numpy as np # transformation of framework
import pytesseract # extracts text from image
from matplotlib import pyplot as plt # vissualize captured frames
import time # for pauses
from gymnasium import Env # environment components
from gymnasium.spaces import Box, Discrete
import torch.nn as nn
import os
from datetime import datetime, timedelta
import matplotlib
import torch
import itertools
import torch.nn.functional as F

# custom game environment

In [30]:
class MrDino(Env):
    """Gymnasium environment that plays the Chrome dino game via screen capture.

    Frames are grabbed from fixed screen regions with ``mss``, key presses are
    sent with ``pydirectinput`` and the game-over banner is detected with
    Tesseract OCR. Every region is given in absolute screen pixels.

    Actions: 0 = press space, 1 = press down, 2 = no key press.

    Observations are a tuple ``(frame, obstacle_flag)``: ``frame`` is a
    binarized ``uint8`` array of shape ``(1, 83, 100)`` and ``obstacle_flag``
    is an ``np.int8`` 0/1 value.
    NOTE(review): ``observation_space`` only describes the frame part of that
    tuple, not the flag.
    """

    # Grayscale intensity above which a pixel is turned white (255).
    BINARY_THRESHOLD = 170
    # Tesseract executable used for the game-over OCR.
    TESSERACT_CMD = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
    # First four OCR characters that count as the game-over banner
    # ('GAHE' looks like a common OCR misread of 'GAME' -- TODO confirm).
    DONE_STRINGS = ('GAME', 'GAHE')

    def __init__(self, render_mode):
        """Set up spaces, the screen grabber and the capture regions.

        Args:
            render_mode: ``"human"`` shows a thresholded preview window after
                every ``reset``/``step``; any other value disables rendering.
        """
        super().__init__()
        self.render_mode = render_mode
        self.observation_space = Box(low=0, high=255, shape=(1, 83, 100), dtype=np.uint8)
        self.action_space = Discrete(3)

        self.cap = mss()

        # Configure OCR once instead of on every get_done() call.
        pytesseract.pytesseract.tesseract_cmd = self.TESSERACT_CMD

        # Screen regions in absolute pixels.
        self.obstacle_passed_region = {'top': 245, 'left': 570, 'width': 38, 'height': 135}
        self.obstacle_coming = {'top': 205, 'left': 647, 'width': 273, 'height': 175}
        self.game_location = {'top': 205, 'left': 580, 'width': 330, 'height': 175}
        self.done_location = {'top': 240, 'left': 820, 'width': 280, 'height': 50}

    def _grab_binary(self, region):
        """Grab ``region`` and return it as a binary (0/255) grayscale image."""
        frame = np.array(self.cap.grab(region))[:, :, :3]  # drop the 4th channel
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        _, binary = cv2.threshold(gray, self.BINARY_THRESHOLD, 255, cv2.THRESH_BINARY)
        return binary

    def step(self, action):
        """Apply ``action`` and return ``(obs, reward, done, truncated, info)``.

        Reward: -10 when the game-over banner is detected, otherwise +1 for
        the step; an extra +0.1 is added whenever ``obstacle_passed()`` is
        true (this bonus is applied in both cases).
        """
        action_map = {
            0: 'space',
            1: 'down',
            2: 'no_op'
        }

        # Action 2 is a no-op: nothing is sent to the game.
        if action != 2:
            pydirectinput.press(action_map[action])

        _, done, _ = self.get_done()
        new_observation = self.get_observation()

        reward = -10 if done else +1      # game-over penalty / per-step reward
        if self.obstacle_passed():
            reward += 0.1                 # bonus for the passed-obstacle check

        info = {}
        truncated = False

        if self.render_mode == "human":
            self.render()

        return new_observation, reward, done, truncated, info

    def render(self):
        """Show the thresholded game region in an OpenCV window."""
        thresh = self._grab_binary(self.game_location)
        cv2.imshow('Thresholded', thresh)
        # waitKey(1) only pumps the GUI event loop briefly. A delay of 0 waits
        # for a key press indefinitely, which would stall every step while the
        # game keeps running.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            # Only the preview windows are dismissed; the capture handle stays
            # open so that stepping can continue.
            cv2.destroyAllWindows()

    def reset(self, *, seed=None, options=None):
        """Restart the game and return ``(observation, info)``."""
        super().reset(seed=seed)
        time.sleep(0.1)
        # Click inside the screen (presumably to focus the game window), then
        # press space to start a new run.
        pydirectinput.click(x=150, y=300)
        pydirectinput.press('space')
        obs = self.get_observation()
        info = {}
        if self.render_mode == "human":
            self.render()
        return obs, info

    def close(self):
        """Destroy the preview windows and release the screen-capture handle."""
        cv2.destroyAllWindows()
        cap = getattr(self, 'cap', None)
        if cap is not None:
            cap.close()

    def get_observation(self):
        """Return ``(frame, obstacle_flag)`` for the current screen contents.

        ``frame`` is the binarized game region resized to 100x83 (width x
        height) with a leading channel axis, i.e. shape ``(1, 83, 100)``.
        """
        binary = self._grab_binary(self.game_location)
        resized = cv2.resize(binary, (100, 83))  # cv2 takes (width, height)
        channel = np.reshape(resized, (1, 83, 100))
        return channel, self.obstacle_ahead()

    def obstacle_ahead(self):
        """Return ``np.int8`` 1 if more than 25 white pixels are in the look-ahead region, else 0."""
        resized = cv2.resize(self._grab_binary(self.obstacle_coming), (100, 83))
        obstacle_pixels = np.sum(resized == 255)
        return (obstacle_pixels > 25).astype(np.int8)

    def get_done(self):
        """OCR the banner region and report whether the game is over.

        Returns:
            ``(res, done, done_cap)``: the first four OCR characters, whether
            they match a known game-over string, and the captured image.
        """
        done_cap = np.array(self.cap.grab(self.done_location))[:, :, :3]
        res = pytesseract.image_to_string(done_cap)[:4]
        done = res in self.DONE_STRINGS
        return res, done, done_cap

    def obstacle_passed(self):
        """Return True if more than 350 white pixels are in the passed-obstacle region."""
        thresh = self._grab_binary(self.obstacle_passed_region)
        obstacle_pixels = np.sum(thresh == 255)
        return obstacle_pixels > 350  # empirical threshold; adjust to the screen setup

In [31]:
# Instantiate the environment with rendering disabled (no OpenCV preview window is shown).
env= MrDino(render_mode=None)

In [32]:
# The frame part of the observation is a 2-D single-channel binary image, so it
# is displayed directly with a grayscale colormap. A BGR->RGB conversion expects
# a 3- or 4-channel input and does not apply to this array.
plt.imshow(env.get_observation()[0][0], cmap='gray')

<matplotlib.image.AxesImage at 0x236eb8060d0>

In [33]:
# Shape of the image part of the observation (element 0 of the returned tuple).
env.get_observation()[0].shape

(1, 83, 100)

# replay memory

In [34]:
from collections import deque
import random
class ReplayMemory():
    """Fixed-capacity FIFO buffer of transitions with uniform random sampling.

    Once ``maxlen`` transitions are stored, appending a new one discards the
    oldest.
    """

    def __init__(self, maxlen, seed=None):
        """Create an empty buffer.

        Args:
            maxlen: maximum number of transitions kept.
            seed: optional seed that makes ``sample`` reproducible.
        """
        self.memory = deque([], maxlen=maxlen)

        # A seeded buffer draws from its own generator. Reseeding the
        # module-level `random` here would silently alter every other consumer
        # of the global RNG (e.g. epsilon-greedy draws), and two seeded
        # buffers would clobber each other's sequences.
        # Without a seed the shared module-level generator is used, so any
        # global seeding done by the caller still applies.
        self._rng = random.Random(seed) if seed is not None else random

    def append(self, transition):
        """Store one transition, evicting the oldest if the buffer is full."""
        self.memory.append(transition)

    def sample(self, sample_size):
        """Return ``sample_size`` distinct stored transitions chosen at random.

        Raises:
            ValueError: if ``sample_size`` exceeds the number stored.
        """
        return self._rng.sample(self.memory, sample_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

# model

In [35]:

class CNN_DQN(nn.Module):
    """Convolutional Q-network that takes an image plus one scalar flag.

    Three conv layers encode the single-channel image. The flattened features
    are concatenated with the flag and passed through two shared
    fully-connected layers, followed by either a dueling head (value +
    advantage streams) or a plain Q-value head.
    """

    def __init__(self, image_height, image_width, action_dim=3,  # 3 actions for dino
                 hidden_dim=128, enable_dueling_dqn=True):
        """Build the network.

        Args:
            image_height: height of the input image in pixels.
            image_width: width of the input image in pixels.
            action_dim: number of Q-values produced (one per action).
            hidden_dim: width of the first shared FC layer; the second is
                ``hidden_dim // 2``.
            enable_dueling_dqn: use the dueling head when True, otherwise a
                standard head.
        """
        super(CNN_DQN, self).__init__()

        self.enable_dueling_dqn = enable_dueling_dqn

        # Convolutional encoder for single-channel frames.
        self.conv1 = nn.Conv2d(1, 16, kernel_size=8, stride=4, padding=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=4, stride=2, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)

        # Flattened encoder width, measured with a dummy forward pass.
        self.cnn_output_size = self._get_cnn_output_size(1, image_height, image_width)

        # Shared FC trunk; the "+ 1" input is the scalar flag.
        self.fc1 = nn.Linear(self.cnn_output_size + 1, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim // 2)

        self.dropout = nn.Dropout(0.2)

        if self.enable_dueling_dqn:
            # Dueling streams: state value V(s) and per-action advantages A(s, a).
            self.fc_value = nn.Linear(hidden_dim // 2, 64)
            self.value = nn.Linear(64, 1)

            self.fc_advantages = nn.Linear(hidden_dim // 2, 64)
            self.advantages = nn.Linear(64, action_dim)
        else:
            # Standard DQN head.
            self.fc3 = nn.Linear(hidden_dim // 2, 64)
            self.output = nn.Linear(64, action_dim)

    def _encode(self, image_input):
        """Run the conv stack and flatten everything but the batch axis."""
        x = F.relu(self.conv1(image_input))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        return x.flatten(1)

    def _get_cnn_output_size(self, channels, height, width):
        """Return the flattened conv-stack output size for one image."""
        with torch.no_grad():
            dummy_input = torch.zeros(1, channels, height, width)
            return self._encode(dummy_input).size(1)

    @staticmethod
    def _prepare_flag(integer_input, batch_size, device):
        """Return ``integer_input`` as a float32 tensor shaped for concatenation.

        Scalars and 1-D inputs become ``(batch_size, 1)``. 2-D inputs keep
        their feature width. When the leading size does not match
        ``batch_size``, the first element/row is broadcast over the batch.
        The result is placed on ``device`` so that Python numbers, lists or
        CPU tensors can be combined with features living on another device.
        """
        if isinstance(integer_input, torch.Tensor):
            flag = integer_input.to(device=device, dtype=torch.float32)
        else:
            flag = torch.tensor(integer_input, dtype=torch.float32, device=device)

        if flag.dim() == 0:
            return flag.reshape(1, 1).expand(batch_size, 1)
        if flag.dim() == 1:
            if flag.size(0) == batch_size:
                return flag.unsqueeze(1)
            # Single value or mismatched length: broadcast the first element.
            # NOTE(review): a genuine length mismatch is masked here rather
            # than reported; kept for backward compatibility.
            return flag[:1].unsqueeze(1).expand(batch_size, 1)
        if flag.dim() == 2 and flag.size(0) != batch_size:
            return flag[0:1].expand(batch_size, -1)
        return flag

    def forward(self, image_input, integer_input):
        """Compute Q-values.

        Args:
            image_input: float tensor of shape ``(batch, 1, H, W)``.
            integer_input: the flag as a Python number, sequence or tensor
                (scalar, ``(batch,)`` or ``(batch, 1)``).

        Returns:
            Tensor of shape ``(batch, action_dim)``.
        """
        features = self._encode(image_input)
        flag = self._prepare_flag(integer_input, features.size(0), features.device)

        combined_input = torch.cat([features, flag], dim=1)

        x = F.relu(self.fc1(combined_input))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))

        if self.enable_dueling_dqn:
            V = self.value(F.relu(self.fc_value(x)))
            A = self.advantages(F.relu(self.fc_advantages(x)))
            # Subtract the mean advantage so V and A are identifiable.
            Q = V + A - torch.mean(A, dim=1, keepdim=True)
        else:
            x = F.relu(self.fc3(x))
            Q = self.output(x)

        return Q



In [36]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Timestamp layout used in log messages.
DATE_FORMAT = "%m-%d %H:%M:%S"

# Output folder for the run's log, checkpoint and graph; created up front.
RUNS_DIR = "runs"
os.makedirs(RUNS_DIR, exist_ok=True)

# The 'Agg' backend writes figures to image files rather than opening a window.
matplotlib.use('Agg')

class Agent():
    """DQN agent (optionally double and dueling) for the ``MrDino`` environment.

    ``run`` plays episodes in an endless loop. In training mode it stores
    every transition in a replay memory and performs one optimisation step
    after each episode. It also writes a log file, the best-scoring
    checkpoint and a reward/epsilon graph into ``RUNS_DIR``.
    """

    def __init__(self):
        """Set the hyperparameters and output file paths."""
        self.learning_rate_a    = 0.001     # learning rate (alpha)
        self.discount_factor_g  = 0.99      # discount rate (gamma)
        self.network_sync_rate  = 100       # env steps between policy -> target syncs
        self.replay_memory_size = 100000    # capacity of the replay memory
        self.mini_batch_size    = 64        # transitions sampled per optimisation step
        self.epsilon_init       = 1         # 1 = 100% random actions
        self.epsilon_decay      = 0.9995    # multiplicative epsilon decay
        self.epsilon_min        = 0.01      # floor for epsilon
        self.enable_double_dqn  = True
        self.enable_dueling_dqn = True
        self.stop_on_reward     = 10000     # end an episode once this return is reached

        self.loss_fn = nn.MSELoss()         # loss between current and target Q-values
        self.optimizer = None               # created in run() when training

        self.LOG_FILE   = os.path.join(RUNS_DIR, 'MrDino.log')
        self.MODEL_FILE = os.path.join(RUNS_DIR, 'MrDino.pt')
        self.GRAPH_FILE = os.path.join(RUNS_DIR, 'MrDino.png')

    @staticmethod
    def _to_tensors(observation):
        """Convert an ``(image, flag)`` observation into float tensors on ``device``."""
        image, flag = observation
        return (
            torch.tensor(image, dtype=torch.float, device=device),  # binary image
            torch.tensor(flag, dtype=torch.float, device=device),   # 0 or 1 flag
        )

    def run(self, is_training=True, render=False):
        """Play episodes forever, training the policy network if requested.

        Args:
            is_training: when True, explore with epsilon-greedy, fill the
                replay memory and optimise after each episode; when False,
                load ``MODEL_FILE`` and act greedily.
            render: currently unused; the environment is always created with
                ``render_mode=None``.

        The episode loop has no exit condition, so this method only returns
        when interrupted (e.g. ``KeyboardInterrupt``).
        """
        if is_training:
            start_time = datetime.now()
            last_graph_update_time = start_time

            log_message = f"{start_time.strftime(DATE_FORMAT)}: Training starting..."
            print(log_message)
            # 'w' starts a fresh log for every training run.
            with open(self.LOG_FILE, 'w') as file:
                file.write(log_message + '\n')

        env = MrDino(render_mode=None)
        num_actions = env.action_space.n
        rewards_per_episode = []

        policy_dqn = CNN_DQN(83, 100, num_actions, 256, self.enable_dueling_dqn).to(device)

        if is_training:
            epsilon = self.epsilon_init
            memory = ReplayMemory(self.replay_memory_size)

            # The target network starts as an exact copy of the policy network.
            target_dqn = CNN_DQN(83, 100, num_actions, 256, self.enable_dueling_dqn).to(device)
            target_dqn.load_state_dict(policy_dqn.state_dict())

            self.optimizer = torch.optim.Adam(policy_dqn.parameters(), lr=self.learning_rate_a)

            epsilon_history = []
            step_count = 0
            best_reward = -9999999
        else:
            # map_location lets a checkpoint saved on a GPU be loaded on a
            # CPU-only machine (and vice versa).
            policy_dqn.load_state_dict(torch.load(self.MODEL_FILE, map_location=device))
            policy_dqn.eval()

        for episode in itertools.count():

            state, _ = env.reset()  # reset returns (state, info)
            state = self._to_tensors(state)

            terminated = False
            episode_reward = 0.0

            while not terminated and episode_reward < self.stop_on_reward:

                # Epsilon-greedy action selection.
                if is_training and random.random() < epsilon:
                    action = torch.tensor(env.action_space.sample(), dtype=torch.int64, device=device)
                else:
                    image, flag = state
                    # Add a batch axis when the state is a single sample.
                    if image.dim() == 3:
                        image = image.unsqueeze(0)
                    if flag.dim() == 1:
                        flag = flag.unsqueeze(0)
                    with torch.no_grad():
                        action = policy_dqn(image, flag).argmax()

                # truncated and info are not used.
                new_state, reward, terminated, truncated, info = env.step(action.item())

                episode_reward += reward

                new_state = self._to_tensors(new_state)
                reward = torch.tensor(reward, dtype=torch.float, device=device)

                if is_training:
                    memory.append((state, action, new_state, reward, terminated))
                    step_count += 1

                state = new_state

            rewards_per_episode.append(episode_reward)

            if is_training:
                # Checkpoint whenever an episode beats the best return so far.
                if episode_reward > best_reward:
                    if best_reward != 0:
                        # Divide by |best_reward| so that an improvement over
                        # a negative best is reported with a positive sign.
                        change = (episode_reward - best_reward) / abs(best_reward) * 100
                        change_text = f"{change:+.1f}%"
                    else:
                        change_text = "0%"
                    log_message = (
                        f"{datetime.now().strftime(DATE_FORMAT)}: New best reward "
                        f"{episode_reward:0.1f} ({change_text}) at episode {episode}, saving model..."
                    )
                    print(log_message)
                    with open(self.LOG_FILE, 'a') as file:
                        file.write(log_message + '\n')

                    torch.save(policy_dqn.state_dict(), self.MODEL_FILE)
                    best_reward = episode_reward

                # Refresh the graph at most once every 10 seconds.
                current_time = datetime.now()
                if current_time - last_graph_update_time > timedelta(seconds=10):
                    self.save_graph(rewards_per_episode, epsilon_history)
                    last_graph_update_time = current_time

                # Learn once enough experience has been collected.
                if len(memory) > self.mini_batch_size:
                    mini_batch = memory.sample(self.mini_batch_size)
                    self.optimize(mini_batch, policy_dqn, target_dqn)

                    epsilon = max(epsilon * self.epsilon_decay, self.epsilon_min)
                    epsilon_history.append(epsilon)

                    # Sync the target network after enough environment steps.
                    if step_count > self.network_sync_rate:
                        target_dqn.load_state_dict(policy_dqn.state_dict())
                        step_count = 0

    def save_graph(self, rewards_per_episode, epsilon_history):
        """Save a two-panel figure (mean rewards, epsilon history) to ``GRAPH_FILE``."""
        fig = plt.figure(1)

        # Mean over a trailing window of up to 100 episodes.
        mean_rewards = np.array([
            np.mean(rewards_per_episode[max(0, x - 99):(x + 1)])
            for x in range(len(rewards_per_episode))
        ])
        plt.subplot(121)  # 1 row x 2 cols, cell 1
        plt.ylabel('Mean Rewards')
        plt.plot(mean_rewards)

        plt.subplot(122)  # 1 row x 2 cols, cell 2
        plt.ylabel('Epsilon Decay')
        plt.plot(epsilon_history)

        plt.subplots_adjust(wspace=1.0, hspace=1.0)

        fig.savefig(self.GRAPH_FILE)
        plt.close(fig)

    def optimize(self, mini_batch, policy_dqn, target_dqn):
        """Run one gradient step on ``policy_dqn`` from a batch of transitions.

        Args:
            mini_batch: iterable of ``(state, action, new_state, reward,
                terminated)`` where each state is an ``(image, flag)`` tensor
                pair and ``action``/``reward`` are 0-dim tensors.
            policy_dqn: network being trained.
            target_dqn: network used to evaluate the bootstrap target.
        """
        states, actions, new_states, rewards, terminations = zip(*mini_batch)

        # Split every state into its image and flag parts, then batch each part.
        state_imgs, state_flags = zip(*states)
        new_state_imgs, new_state_flags = zip(*new_states)

        state_imgs = torch.stack(state_imgs)
        state_flags = torch.stack(state_flags)
        new_state_imgs = torch.stack(new_state_imgs)
        new_state_flags = torch.stack(new_state_flags)

        actions = torch.stack(actions)
        rewards = torch.stack(rewards)
        terminations = torch.tensor(terminations).float().to(device)

        with torch.no_grad():
            target_out = target_dqn(new_state_imgs, new_state_flags)
            if self.enable_double_dqn:
                # Double DQN: the policy net picks the action, the target net scores it.
                best_actions_from_policy = policy_dqn(new_state_imgs, new_state_flags).argmax(dim=1)
                next_q = target_out.gather(dim=1, index=best_actions_from_policy.unsqueeze(dim=1)).squeeze(dim=1)
            else:
                # Vanilla DQN: the target net both picks and scores the action.
                next_q = target_out.max(dim=1)[0]

            # (1 - terminations) drops the bootstrap term for terminal transitions.
            target_q = rewards + (1 - terminations) * self.discount_factor_g * next_q

        # Q-values of the actions that were actually taken. squeeze(dim=1)
        # keeps the batch axis even for a batch of one.
        current_q = policy_dqn(state_imgs, state_flags).gather(dim=1, index=actions.unsqueeze(dim=1)).squeeze(dim=1)

        loss = self.loss_fn(current_q, target_q)

        self.optimizer.zero_grad()  # clear gradients
        loss.backward()             # compute gradients
        self.optimizer.step()       # update parameters



In [37]:
# Create the agent and run it in evaluation mode: the saved weights are loaded
# from the model file and episodes are played in an endless loop, so this cell
# only stops when the kernel is interrupted.
dql = Agent()

dql.run(is_training=False)


KeyboardInterrupt: 