In [1]:
import cv2
import numpy as np
import time
import mss
import mss.tools
from gymnasium import Env
from gymnasium import spaces
import numpy as np
import matplotlib.pyplot as plt
from paddleocr import PaddleOCR
import pydirectinput
import pygetwindow as gw
import ctypes
import logging
import os
import gymnasium

In [2]:
# Intel OpenMP switch: tolerate more than one copy of the OpenMP runtime being
# loaded into this process instead of aborting.
# NOTE(review): this is set after the imports in the first cell — confirm it
# still takes effect before the conflicting runtime is initialised.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'

In [3]:
# Only let CRITICAL records through on the 'ppocr' logger to keep cell output quiet.
logging.getLogger('ppocr').setLevel(logging.CRITICAL)

In [4]:


class FlappyBirdEnv(Env):
    """Gymnasium environment that drives an on-screen Flappy Bird game.

    Frames are read by grabbing fixed screen regions with ``mss`` and actions
    are sent as synthetic input through ``pydirectinput``. Every coordinate in
    this class is a hard-coded pixel position, so the game window has to be
    placed where these regions expect it.

    Observation
        ``uint8`` array of shape ``(1, OBS_HEIGHT, OBS_WIDTH)``: a binary mask
        that is 0 inside the filled outer contours of dark pixels and 255
        everywhere else.
    Actions
        ``0`` = do nothing, ``1`` = press the jump key.
    Reward
        ``+1`` per step, reduced by 2 when the detected bird is outside the
        vertical bounds (see the review note in :meth:`step`).
    """

    # Size of the down-scaled observation; shared by the observation space
    # declaration and the resize in get_observation() so they cannot drift.
    OBS_WIDTH = 192
    OBS_HEIGHT = 144

    # Key pressed for each non-idle action (action 0 sends no input).
    ACTION_KEYS = {1: 'd'}

    def __init__(self, monitor_index=1):
        """Create the capture handle and declare the spaces.

        Args:
            monitor_index: index into ``mss``'s monitor list, stored on
                ``self.monitor``.
        """
        super().__init__()
        self.observation_space = spaces.Box(
            low=0,
            high=255,
            shape=(1, self.OBS_HEIGHT, self.OBS_WIDTH),
            dtype=np.uint8,
        )
        self.action_space = spaces.Discrete(2)  # 0: No Jump, 1: Jump

        # 1 until the first reset() has brought the game window to the front.
        self.cnt = 1
        self.cap = mss.mss()
        self._closed = False
        self.monitor = self.cap.monitors[monitor_index]
        # Screen region holding the playfield (absolute screen pixels).
        self.roi = {'top': 150, 'left': 820, 'width': 960, 'height': 720}
        # Screen region probed for the game-over banner colour.
        self.done_location = {'top': 170, 'left': 750, 'width': 430, 'height': 110}
        # NOTE(review): no method of this class uses the OCR engine; it is kept
        # so the `ocr` attribute stays available — confirm whether it is needed.
        self.ocr = PaddleOCR(use_angle_cls=True, lang='en')

        self.screen_top_boundary = 150
        self.screen_bottom_boundary = 720

    def capture_screen(self):
        """Grab the playfield region and return it as a NumPy array."""
        return np.array(self.cap.grab(self.roi))

    def capture_done(self):
        """Grab the game-over probe region and return it as a NumPy array."""
        return np.array(self.cap.grab(self.done_location))

    def detect_contours(self, frame):
        """Threshold ``frame`` and return ``(binary_image, external_contours)``.

        Pixels with a grey level of 150 or less become foreground
        (inverse binary threshold).
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        _, thresh = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY_INV)
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        return thresh, contours

    def reset(self, seed=None, return_info=False, options=None):
        """Start a new episode and return ``(observation, info)``.

        On the first call the 'Flappy Bird' window is brought to the
        foreground (Windows-only, via ``user32``). Every call then waits 1.5 s
        and clicks a fixed screen position — presumably the in-game
        start/restart button.

        Args:
            seed: forwarded to ``Env.reset`` for RNG seeding.
            return_info: accepted for backward compatibility; not used.
            options: accepted for API compatibility; not used.
        """
        super().reset(seed=seed)

        if self.cnt == 1:
            game_window = gw.getWindowsWithTitle('Flappy Bird')[0]
            ctypes.windll.user32.SetForegroundWindow(game_window._hWnd)
            self.cnt = self.cnt + 1

        time.sleep(1.5)
        # Move the mouse to the fixed coordinates and click.
        pydirectinput.moveTo(970, 805)
        pydirectinput.click()
        return self.get_observation(), {}

    def step(self, action):
        """Apply ``action`` and return ``(obs, reward, done, truncated, info)``.

        The observation and the reward are derived from the *same* screen
        grab, so the reward always describes the frame that is returned.

        Args:
            action: 0 (no input) or 1 (jump). Python ints, NumPy integer
                scalars and size-1 arrays are accepted.

        Raises:
            KeyError: if ``action`` is non-zero and has no key mapping.
        """
        # Normalise NumPy scalars / size-1 arrays so the dict lookup is safe.
        action = int(np.asarray(action).item())
        if action != 0:
            pydirectinput.press(self.ACTION_KEYS[action])

        done, _ = self.get_done()

        frame = self.capture_screen()
        new_observation = self.get_observation(frame)

        reward = 1
        _, contours = self.detect_contours(frame)
        bird_position = self.get_bird_position(contours)
        if bird_position is not None:
            _, bird_y, _, _ = bird_position
            # NOTE(review): bird_y is measured inside the grabbed ROI, whose
            # height is 720, so the `> screen_bottom_boundary` test appears
            # unable to fire, and the top test penalises the upper 150 px of
            # the ROI. Confirm the intended thresholds before changing them —
            # existing checkpoints were trained with this reward.
            if bird_y < self.screen_top_boundary or bird_y > self.screen_bottom_boundary:
                reward -= 2

        truncated = False
        info = {}  # Stable-Baselines3 expects an info dict on every step.
        return new_observation, reward, done, truncated, info

    def get_bird_position(self, contours):
        """Return ``(x, y, w, h)`` of the first bird-like contour, else ``None``.

        A contour qualifies when its bounding box is at most 100x100 px and
        its left edge lies in the 50..250 px column range.
        """
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            if w <= 100 and h <= 100 and 50 <= x <= 250:
                return (x, y, w, h)
        return None

    def render(self):
        """No-op: the game renders itself on screen."""
        pass

    def close(self):
        """Release the ``mss`` capture handle; safe to call more than once."""
        if getattr(self, '_closed', False):
            return
        self.cap.close()
        self._closed = True

    def get_observation(self, frame=None):
        """Build the ``(1, OBS_HEIGHT, OBS_WIDTH)`` mask observation.

        Args:
            frame: an already captured playfield frame. When omitted, a fresh
                frame is grabbed with :meth:`capture_screen`.
        """
        if frame is None:
            frame = self.capture_screen()
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # cv2.resize takes (width, height).
        small = cv2.resize(gray, (self.OBS_WIDTH, self.OBS_HEIGHT))
        _, thresh = cv2.threshold(small, 160, 255, cv2.THRESH_BINARY_INV)
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        # Start from an all-white mask and paint every contour solid black.
        mask = np.ones_like(small) * 255
        cv2.drawContours(mask, contours, -1, (0), thickness=cv2.FILLED)
        return np.expand_dims(mask, axis=0)  # add the channel axis

    def get_done(self):
        """Return ``(done, done_cap)`` from a two-pixel colour probe.

        The episode is considered over when both probe pixels of the captured
        region equal the target BGR colour exactly.
        """
        done_cap = self.capture_done()
        target_bgr = [82, 159, 250]
        probe_points = ((50, 50), (350, 50))  # (x, y) inside done_cap
        # This delay runs after the grab, so it does not affect the pixels
        # being tested; it effectively paces step().
        time.sleep(0.1)

        done = all(
            np.array_equal(done_cap[y, x][:3], target_bgr)
            for x, y in probe_points
        )
        return done, done_cap






In [5]:
# Build the environment; the constructor opens an mss capture handle and
# instantiates PaddleOCR.
env = FlappyBirdEnv()

In [12]:
# Smoke test: play 10 episodes with uniformly random actions and report the
# return of each one.
for episode in range(10):
    # reset() returns an (observation, info) pair.
    obs, info = env.reset()
    done = truncated = False
    total_reward = 0
    # An episode ends on termination *or* truncation.
    while not (done or truncated):
        obs, reward, done, truncated, info = env.step(env.action_space.sample())
        total_reward += reward
    print('Total Reward for episode {} is {}'.format(episode+1, total_reward))

Total Reward for episode 1 is 0
Total Reward for episode 2 is 11
Total Reward for episode 3 is 0
Total Reward for episode 4 is 11
Total Reward for episode 5 is 6
Total Reward for episode 6 is 1
Total Reward for episode 7 is 2
Total Reward for episode 8 is 2
Total Reward for episode 9 is 2
Total Reward for episode 10 is 2


# Model


In [6]:
import torch

# Query the CUDA state once and reuse it for the report below.
cuda_available = torch.cuda.is_available()
device_name = torch.cuda.get_device_name(0) if cuda_available else "No GPU detected"

print("CUDA Available:", cuda_available)
print("CUDA Version:", torch.version.cuda)
print("Device Name:", device_name)

CUDA Available: True
CUDA Version: 11.8
Device Name: NVIDIA GeForce GTX 1650


In [7]:
from stable_baselines3.common.callbacks import BaseCallback

In [8]:

from stable_baselines3.common import env_checker

In [9]:
# Validate the custom environment with Stable-Baselines3's checker before training.
# NOTE(review): this presumably calls reset()/step() on the live game — confirm
# the game window is up before running this cell.
env_checker.check_env(env)

In [10]:
class TrainAndLoggingCallback(BaseCallback):
    """Save a model checkpoint every ``check_freq`` callback calls.

    Checkpoints are written to ``save_path`` as ``best_model_<n_calls>``.
    Passing ``save_path=None`` disables checkpointing entirely.

    Args:
        check_freq: number of callback calls between two checkpoints.
        save_path: directory that receives the checkpoints, or ``None``.
        verbose: verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq, save_path, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.save_path = save_path

    def _init_callback(self):
        """Create the checkpoint directory if checkpointing is enabled."""
        if self.save_path is not None:
            os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Write a checkpoint when due; always return True to keep training."""
        # Mirror _init_callback: with no save_path there is nowhere to write,
        # and os.path.join(None, ...) would raise a TypeError.
        if self.save_path is not None and self.n_calls % self.check_freq == 0:
            model_path = os.path.join(self.save_path, 'best_model_{}'.format(self.n_calls))
            self.model.save(model_path)

        return True
    
# Directory passed to TrainAndLoggingCallback as its save_path.
CHECKPOINT_DIR = './train/'
# Directory passed to DQN as tensorboard_log.
LOG_DIR = './logs/'
 

In [11]:

# Checkpoint the model into CHECKPOINT_DIR every 1000 callback calls.
callback = TrainAndLoggingCallback(check_freq=1000, save_path=CHECKPOINT_DIR)

In [12]:
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack



In [13]:
# Fresh environment for training. This rebinds `env`; the instance created
# earlier in the notebook is not closed first.
env = FlappyBirdEnv()

In [14]:
# DQN with the CNN policy on the single-channel mask observations.
# buffer_size caps the replay buffer at 60000 transitions; learning_starts
# collects 1500 steps before learning begins.
# NOTE(review): device is hard-coded to 'cuda' — this cell needs a CUDA GPU.
model = DQN('CnnPolicy', env,device = 'cuda', tensorboard_log=LOG_DIR, verbose=1, buffer_size=60000, learning_starts=1500)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


# Learning


In [None]:
# Train for 80000 environment steps; `callback` writes periodic checkpoints.
model.learn(total_timesteps=80000, callback=callback)

# Testing


In [None]:
# reset() returns an (observation, info) tuple, so `obs` is a tuple here and
# obs[0] is the observation array.
obs = env.reset()
obs[0].shape
obs = np.expand_dims(obs[0], axis=0)  # Adds a leading batch axis: (1, 144, 192) -> (1, 1, 144, 192)

In [None]:



# Predict the action for the batched observation built in the previous cell.
action, _ = model.predict(obs)
# A bare expression in the middle of a cell is not displayed; only the last
# expression of the cell is.
action
# Same prediction on the observation with the batch axis stripped again.
model.predict(obs[0])

In [15]:
# Load the checkpoint written at callback call 52000 and attach the live env.
# NOTE(review): the path is spelled out here instead of being built from
# CHECKPOINT_DIR — keep the two in sync.
model = DQN.load("train/best_model_52000", env=env, device='cuda')

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [16]:
# Number of transitions currently in the replay buffer. It prints 0 right after
# loading — presumably because the checkpoint does not include the buffer.
print(model.replay_buffer.size())


0


In [17]:
# Evaluate the loaded policy for 5 episodes and report the return of each one.
for episode in range(5):
    # reset() returns an (observation, info) pair. The observation is passed
    # to predict() un-batched, the same shape env.step() returns, so every
    # predict() call in the episode sees a consistently shaped input.
    obs, info = env.reset()
    done = truncated = False
    total_reward = 0
    # An episode ends on termination *or* truncation.
    while not (done or truncated):
        # deterministic=True takes the greedy action; without it DQN.predict
        # still mixes in epsilon-random exploration actions.
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = env.step(int(action))
        total_reward += reward
    print('Total Reward for episode {} is {}'.format(episode+1, total_reward))


Total Reward for episode 1 is 2
Total Reward for episode 2 is 25
Total Reward for episode 3 is 37
Total Reward for episode 4 is 56
Total Reward for episode 5 is 176
