In [1]:
import os
from math import sin, cos, pi, sqrt
from random import randrange

import numpy as np
import gym
from gym import spaces

import pygame
from pygame.locals import *

pygame 2.5.1 (SDL 2.28.2, Python 3.9.13)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
class droneEnv(gym.Env):
    def __init__(self, render_frame):
        super(droneEnv, self).__init__()
        self.render_frame = render_frame

        pygame.init()
        self.Frame_per_second = pygame.time.Clock()
        self.display = pygame.display.set_mode((900, 900))
        
        player_width = 80
        player_length = 24
        self.player = []
        self.player_animation_speed = 0.3
        for i in range(1, 5):
            image = pygame.image.load(os.path.join("assets/balloon-flat-asset-pack/png/objects/drone-sprites/drone-"
            + str(i)
            + ".png"))
            image.convert()
            self.player.append(pygame.transform.scale(image, (player_width, int(player_width * 0.30))))
            
        target_width = 30
        target_length = 30
        target_animation_speed = 0.1
        #self.target = []
        #for i in range(1, 8):
        #    image = pygame.image.load(os.path.join(
        #    "assets/balloon-flat-asset-pack/png/balloon-sprites/red-plain/red-plain-"
        #    + str(i)
        #    + ".png"))
        #    image.convert()
        #    self.target.append(pygame.transform.scale(image, (target_width, int(target_width * 1.73))))
        self.target = pygame.image.load(os.path.join("assets/sprites/target_old.png"))
        self.target.convert()
        
        pygame.font.init()
        self.myfont = pygame.font.SysFont("Comic Sans MS", 20)
        
        # coordinates and constraints
        (self.a, self.ad, self.add, self.x, self.xd, self.xdd, self.y, self.yd, self.ydd) = (0, 0, 0, 400, 0, 0, 400, 0, 0)
        self.xt = randrange(200, 600)
        self.yt = randrange(200, 600)
        self.FPS = 60
        self.gravity = 0.08
        self.thruster_amplitude = 0.04
        self.diff_amplitude = 0.0006
        self.thruster_mean = 0.04
        self.mass = 1
        self.arm = 25
        self.target_counter = 0
        self.reward = 0
        self.time = 0
        self.time_limit = 30
        self.stepno = 0;
        

        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(7,))
        
    def step(self, action):
        self.reward = 0.0
            
        for i in range(5):
            self.stepno += 1
            self.time += 1/60
            self.xdd = 0
            self.ydd = self.gravity
            self.add = 0
            thruster_left = self.thruster_mean
            thruster_right = self.thruster_mean
            if int(action) == 0:
                pass
            elif int(action) == 1:
                thruster_left += self.thruster_amplitude
                thruster_right += self.thruster_amplitude
            elif int(action) == 2:
                thruster_left -= self.thruster_amplitude
                thruster_right -= self.thruster_amplitude
            elif int(action) == 3:
                thruster_left += self.diff_amplitude
                thruster_right -= self.diff_amplitude
            elif int(action) == 4:
                thruster_left -= self.diff_amplitude
                thruster_right += self.diff_amplitude
                    
            self.xdd += (
                -(thruster_left + thruster_right) * sin(self.a * pi / 180) / self.mass
            )
            self.ydd += (
                -(thruster_left + thruster_right) * cos(self.a * pi / 180) / self.mass
            )
            self.add += self.arm * (thruster_right - thruster_left) / self.mass

            self.xd += self.xdd
            self.yd += self.ydd
            self.ad += self.add
            self.x += self.xd
            self.y += self.yd
            self.a += self.ad    
            
            dist = sqrt((self.x - self.xt) ** 2 + (self.y - self.yt) ** 2)

            self.reward += 1 / 60
            self.reward -= dist / (100 * 60)

            if dist < 50:
                self.xt = randrange(200, 600)
                self.yt = randrange(200, 600)
                self.reward += 100
                self.target_counter += 1

            if self.time > self.time_limit:
                done = True
                break

            elif dist > 1000:
                self.reward -= 1000
                done = True
                break

            else:
                done = False

            if self.render_frame is True:
                self.render(i)
            
        info = {}    
        return (
            self.obs(),
            self.reward,
            done,
            info,
        )
    
    def reset(self):
        (self.a, self.ad, self.add, self.x, self.xd, self.xdd, self.y, self.yd, self.ydd) = (0, 0, 0, 400, 0, 0, 400, 0, 0)
        
        self.xt = randrange(200, 600)
        self.yt = randrange(200, 600)

        self.target_counter = 0
        self.reward = 0
        self.time = 0
        self.stepno = 0
        return self.obs()
    
    def obs(self) -> np.ndarray:
        
        velocity = sqrt(self.xd**2 + self.yd**2)
        dist_target = (
            sqrt((self.xt - self.x) ** 2 + (self.yt - self.y) ** 2) / 500
        )
        angle_withtarget = np.arctan2(self.yt - self.y, self.xt - self.x)
        angle_target_and_velocity = np.arctan2(self.yt - self.y, self.xt - self.x) - np.arctan2(self.yd, self.xd)
        distance_to_target = (sqrt((self.xt - self.x) ** 2 + (self.yt - self.y) ** 2) / 500)
        return np.array(
            [
                self.a / 180 * pi,
                velocity,
                self.ad,
                dist_target,
                angle_withtarget,
                angle_target_and_velocity,
                distance_to_target,
            ]
        ).astype(np.float32)
            
        
    def render(self, i):
        # Pygame rendering
        pygame.event.get()
        self.display.fill((173, 220, 255))
        #self.target = pygame.transform.scale(self.target, (target_width, int(player_width * 0.30)))
        self.display.blit(
            self.target,
            (
                self.xt - int(self.target.get_width() / 2),
                self.yt - int(self.target.get_height() / 2),
            ),
        )
        player_sprite = self.player[int(self.stepno * self.player_animation_speed) % len(self.player)]
        player_copy = pygame.transform.rotate(player_sprite, self.a)
        #player_copy = pygame.transform.scale(player_copy, (player_width, int(player_width * 0.30)))
        self.display.blit(
            player_copy,
            (
                self.x - int(player_copy.get_width() / 2),
                self.y - int(player_copy.get_height() / 2),
            ),
        )

        textsurface = self.myfont.render(
            "Collected: " + str(self.target_counter), False, (255, 255, 255)
        )
        self.display.blit(textsurface, (20, 20))
        textsurface3 = self.myfont.render(
            "Time left: " + str(int(self.time_limit - self.time)), False, (255, 255, 255)
        )
        self.display.blit(textsurface3, (20, 50))

        pygame.display.update()
        self.Frame_per_second.tick(self.FPS)

    def close(self):
        pass        
    

In [3]:
import os
import gym
import numpy as np
import torch as th
from stable_baselines3 import DQN

MODEL_PATH = "models/DQN_v2/model"

# Create and wrap the environment
env = droneEnv(True)

# Load the trained agent
model = DQN.load(MODEL_PATH, env=env)

# Evaluate the agent
for i in range(5):
    obs = env.reset()
    done = False
    episode_reward = 0
    while not done:
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, done, info = env.step(action)
        episode_reward += reward
    print("Episode reward", episode_reward)
    env.render(i)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Episode reward -334.2516901752995
Episode reward 959.8442203445431
Episode reward 1048.5096992535637
Episode reward 855.0100192623484
Episode reward 1064.9729997738489


In [1]:
###### training

from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import CheckpointCallback
import wandb
from wandb.integration.sb3 import WandbCallback


run = wandb.init(
    project="quadai",
    sync_tensorboard=True,
    monitor_gym=True,
)

log_dir = "tmp/"
os.makedirs(log_dir, exist_ok=True)


env = droneEnv(False)
env = Monitor(env, log_dir)


model = DQN("MlpPolicy", env, verbose=1, tensorboard_log=log_dir)


checkpoint_callback = CheckpointCallback(
    save_freq=100000, save_path=log_dir, name_prefix="rl_model_v0"
)

model.learn(
    total_timesteps=10000000,
    callback=[
        checkpoint_callback,
        WandbCallback(
            gradient_save_freq=100000,
            model_save_path=f"models/{run.id}",
            model_save_freq=100000,
            verbose=2,
        ),
    ],
)
env.close()
run.finish()

[34m[1mwandb[0m: Currently logged in as: [33msamyak_j[0m ([33msolorn[0m). Use [1m`wandb login --relogin`[0m to force relogin


NameError: name 'os' is not defined