# CS5756 Final Project: Safe Reinforcement Learning with Behavioral Cloning

## Setup

In [18]:
# Set Up:
import sys

import numpy as np
import gymnasium as gym
import random
import matplotlib.pyplot as plt
from copy import deepcopy

from torch.utils.data import DataLoader
from torch import nn
import torch
import cv2
from tqdm import tqdm, trange

# Seed handed to reseed() to initialise the torch / random / numpy generators.
seed = 24
# NOTE(review): not referenced in the cells visible here; presumably a separate
# seed for data collection — confirm against the cells that use it.
data_seed = 700


In [19]:
# Set seeds
# Setting the seed to ensure reproducibility
def reseed(seed, env=None):
    '''
        Seed every random number generator used here, for reproducibility.

        PyTorch, Python's `random` module and NumPy's global generator
        are all seeded with the same value.

        @param seed: value passed to each library's seeding function
        @param env: optional gym environment; when provided, the
            `_np_random` attribute of `env.unwrapped` is replaced by a
            generator freshly created from `seed`
    '''
    # Same order as always: torch first, then random, then numpy.
    for seed_fn in (torch.manual_seed, random.seed, np.random.seed):
        seed_fn(seed)

    # Nothing else to do when no environment was supplied.
    if env is None:
        return

    # Only the first element of what seeding.np_random returns is kept
    # as the environment's new generator.
    fresh_rng = gym.utils.seeding.np_random(seed)[0]
    env.unwrapped._np_random = fresh_rng

# Seed the global torch / random / numpy generators once; no environment is
# passed here, so no environment generator is touched.
reseed(seed)

In [10]:
# Visualize
def visualize(env_name='LunarLander-v2', algorithm=None, video_name="test", env_args={}):
    """Roll out a single episode and save the rendered frames as an mp4 video.

        Args:
            env_name: Name of the gym environment to roll out `algorithm` in; it is
            instantiated with gym.make.
            algorithm: Object exposing `select_action(obs)`. If no algorithm is passed
            in, actions are drawn with `env.action_space.sample()` instead.
            video_name (str): Name for the mp4 file of the episode that will be saved
            (omit .mp4).
            env_args (dict): Extra keyword arguments forwarded to gym.make. The mapping
            is copied and never modified; `render_mode` is always forced to 'rgb_array'.

        Notes:
            At most 500 steps are taken. The rollout stops as soon as the environment
            reports termination or truncation, and the frame following that final step
            is not written. The video writer and the environment are released even if
            the rollout raises.
    """

    # Work on a copy: writing into `env_args` directly would leak 'render_mode'
    # into the caller's dict and into the shared default argument.
    make_kwargs = dict(env_args) if env_args else {}
    make_kwargs['render_mode'] = 'rgb_array'

    def get_action(obs):
        if not algorithm:
            return env.action_space.sample()
        else:
            return algorithm.select_action(obs)

    # NOTE(review): the writer is opened at 24 fps with a hard-coded 600x400 size;
    # presumably rendered frames must match that size — confirm for other envs.
    video = cv2.VideoWriter(f"{video_name}.mp4", cv2.VideoWriter_fourcc(*'mp4v'), 24, (600,400))
    env = None

    try:
        env = gym.make(env_name, **make_kwargs)
        obs, info = env.reset()

        for _ in range(500):
            action = get_action(obs)
            obs, reward, terminated, truncated, info = env.step(action)

            # An episode is over on either flag; stepping past a truncated
            # episode without a reset is not valid.
            if terminated or truncated:
                break

            frame = env.render()
            # Reverse the channel axis (presumably RGB -> BGR, the order OpenCV
            # writes — TODO confirm).
            video.write(frame[:,:,::-1])
    finally:
        # Always finalise the file and free the environment, even on error.
        video.release()
        if env is not None:
            env.close()

    print(f"Video saved as {video_name}.mp4")

In [11]:
# Evaluate Policy
def evaluate_policy(actor, environment, num_episodes=100, progress=True): 
    '''
        Returns the mean trajectory reward of rolling out `actor` on `environment`

        Parameters 
        - actor: object exposing `select_action(obs)` 
        - environment: object whose `reset()` returns an observation and whose
          `step(action)` returns `(obs, reward, done, info)`. Rewards and done
          flags may be plain scalars or arrays; array-valued done flags are
          reduced with `any`. 
          NOTE(review): presumably a stable_baselines3 VecEnv wrapping a single
          environment — the final conversion to a Python number only succeeds
          when the accumulated reward holds exactly one element.
        - num_episodes: total number of trajectories to collect and average over
        - progress: when True, iterate with a tqdm `trange` progress bar

        Returns 
        - the summed reward over all episodes divided by `num_episodes`, as a
          Python number

        Raises 
        - ValueError: if `num_episodes` is not positive
    '''
    if num_episodes <= 0:
        raise ValueError(f"num_episodes must be positive, got {num_episodes}")

    total_rew = 0 

    iterate = (trange(num_episodes) if progress else range(num_episodes))
    for _ in iterate: 
        obs = environment.reset() 
        done = False

        while not done: 
            action = actor.select_action(obs)

            obs, reward, done, _ = environment.step(action) 
            total_rew += reward

            # Collapse array-valued flags explicitly instead of relying on the
            # implicit truth value of an ndarray.
            done = bool(np.any(done))

    # np.asarray lets plain-float totals go through the same .item() path as
    # array totals (a bare float has no .item()).
    return np.asarray(total_rew / num_episodes).item() 
        

In [17]:
# No algorithm is passed, so actions come from env.action_space.sample();
# the episode is written to the default file name, test.mp4.
visualize(env_name='CarRacing-v2')

Video saved as test.mp4
