# OpenAI Gym: CSPB-3202 Introduction to Artificial Intelligence

### Final Semester Project

### Author: Micah Simmerman 

### Date: December 07, 2023

This end-of-semester project will explore agent and environmental design for Reinforcement Learning (RL) design using the pythonic OpenAI Gym library suite of tools.  This project and its' accompanying presentation(s) will seek to explore the finer-point details of RL agents and reinforcement learning environment design. We will first build models with limited degree-of-freedom (DOF) complexity, and then gradually explore the design considerations of more complicated agents and environments. 

Please read all inline documentation as sources will often be cited there. 

Project Itinerary:

    1.) Cartpole: starting example
    2.) Lunar Lander
    3.) Taxi Cab
    4.) HalfCheetah

### Import Gym, Gymnasium, IPython display (in place of Box2D),

In [1]:
import gymnasium as gym
# import gym
from IPython import display  # alternative to Box2D
# from gym import Box2D

import warnings
from collections import namedtuple
warnings.filterwarnings("ignore", category=DeprecationWarning)
partition = namedtuple("partition", ["type", "subtype"])
gym.__version__
e_cartpole = gym.make('CartPole-v1')   # coinstruct the environment 'e'
print("result of gym.make('CartPole-v1'); ", e_cartpole)  # shows the environment wrapper structure.
obs = e_cartpole.reset()  # reset cartpole environment

print("Environment action space: ", e_cartpole.action_space) # discrete action choices: [left, right]
print("~~~~~~~~~~~~~~~~~~~~~~~~")
print("Observed transitions: ", obs)

# The action space is defined in gym.spaces
from gym.spaces.discrete import Discrete
d = Discrete(2) #the Discrete is a class that has methods .sample and .contains
[d.sample() for x in range(10)] #sample generates an action output

print(d.contains(0), d.contains(2)) #with .contains method, you can check whether an integer is a valid action
print(e_cartpole.observation_space) #returns Box class, which represents n-dim tensor
e_cartpole.step(1)
print()

e_cartpole.step(0) 
# see https://github.com/openai/gym/blob/master/gym/envs/classic_control/cartpole.py
# force = self.force_mag if action==1 else -self.force_mag

FileNotFoundError: Could not find module 'C:\Users\jmica\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\atari_py\ale_interface\ale_c.dll' (or one of its dependencies). Try using the full path with constructor syntax.

### Import Packages and Get Ready to use Torch

In [None]:
import numpy as np   # for complex calculations
import matplotlib.pyplot as plt   # for graphs and plots
from tqdm import tqdm  # provides progress bar functionality
from copy import deepcopy  # enables deep and shallow copying of python class objects

from gym.wrappers import RecordVideo  # 
from utils import *

import torch 
from torch import nn
import torch.nn.functional as F
from torch import optim 


%matplotlib inline

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Hardware used: ", device)
# torch.cuda.is_available()  # NEED TO INSTALL CUDA DRIVERS TO ABSTRACT THE GPU

AttributeError: partially initialized module 'gym' has no attribute 'core' (most likely due to a circular import)

The following cells produce an agent that can navigate the different types of OpenAI Gym environments that we will be creating in this notebook. We can also use any of the approaches shown below to complete the assignment.

## Making an agent
Since the `gym` provides us the environment, it's our job to make an agent (policy) that can interact with the environemnt. Here is an example of a random agent.

In [None]:
import gymnasium as gym
# for more information about OpenAI Gym Wrappers visit: https://gymnasium.farama.org/api/wrappers/observation_wrappers/

env = gym.make("CartPole-v1")  # creates a cartpole v1 gym env class object

total_reward = 0.0  # initiate total rewards var
total_steps = 0  # initiate step counter
obs = env.reset()  # reset the env to an initial internal state, returns observation and info. 

while True:  # continuous while loop
    action = env.action_space.sample() #.sample method gives a random action sample
    obs, reward, done, _, _ = env.step(action)  # "update the environment with actions returning the next agent observation" - google
    total_reward += reward  # increment the reward acc var by the earned amount. 
    total_steps += 1  # increment step counter.
    if done:  # not sure where "done" gets set and updated. 
        break  # discontinue while loop.

print("Episode done in %d steps, total reward %.2f" % (total_steps, total_reward))

## Monitoring the agent

Sources: 

    https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.imshow.html
    https://matplotlib.org/3.1.1/api/_as_gen/matplotlib.pyplot.clf.html
    https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.figure.html
    https://matplotlib.org/3.1.1/api/_as_gen/matplotlib.pyplot.title.html
    https://matplotlib.org/stable/api/_as_gen/matplotlib.pyplot.axis.html

The following cell defines a custom function that allows us to monitor the agent's gameplay. The cell also invokes matplotlib and IPython libraries used to render the display.

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
from IPython import display  # IMPORTANT!: this may be a good alternative to the failed "BOx2D" library install.

# The "show_state" function is used to render visual information about the agent's interaction with the environment
# 'show_state' is a user defined function that controls the visual rendering of the 2D Gym environment plus agent
def show_state(env, step=0, info=""):  # user defined function handling environment, step count initialization (?), empty "info" acc string variable.
    plt.figure(3)  # "num" is a int, str, Figure, or SubFigure and an optional variable for the plt library. "figure" creates a new figure, oc.
    plt.clf()  # clears the current figure.
    plt.imshow(env.render())  # Displays data (the agents given environment, in this case) as an image on a regular 2D roster.
    plt.title("%s | Step: %d %s" % (env.spec.id,step, info))  # title(label, fontdict=None, loc='center', pad=None, **kwargs)
    plt.axis('off')  # A "Convenience method to get or set some axis properties." Example: xmin, xmax, ymin, ymax = axis()

    display.clear_output(wait=True)  # removes current output of the cell, enables collection of the latest details
    display.display(plt.gcf())  # plt.gcf collects the current figure. If there is no current figure on the pyplot figure stack, a new (blank one) is created using figure()

In [None]:
# We follow a similar pattern of each time: 
# environment creation => initiate 'reward_sum' and 'step_count' => (re)set the environment => gaurded while loop => display render
# 
env = gym.make("CartPole-v1", render_mode='rgb_array')  # generate the gym environment by invoking the .make class function
total_reward = 0.0  # initiate reward sum
total_steps = 0  # 
obs = env.reset()  # 

while True:  # initiate an infinity while loop controlled by agent decisions and resulting game outcomes.
    action = env.action_space.sample()
    obs, reward, done, _, _ = env.step(action)
    total_reward += reward
    total_steps += 1
    show_state(env,total_steps)
    if done:  # done == True when Gym game environment reaches a terminating state
        break  # then we break out of the loop

print("Episode done in %d steps, total reward %.2f" % (total_steps, total_reward))
env.close()
env.env.close()

## Environment wrappers
`Env` offers a `Wrapper` class that allow you to modify settings in the `Env` class. `Wrapper` has three subclasses `RewardWrapper`, `ObservationWrapper` and `ActionWrapper`.
Let's take an example of an agent that takes a random action e.g. 10% of the time. We'll write a class that overrides `Env`'s action usging `ActionWrapper`. 

In [None]:
import gymnasium as gym
import random

# From doc source: 
# "If you would like to apply a function to the action before passing it to the base environment, you can simply 
# inherit from ActionWrapper and overwrite the method action() to implement that transformation."
# RandomEpsilonWrapper creates a random agent for the cartpole environment that inherits from the gym.ActionWrapper base class. 
class RandomEpsilonWrapper(gym.ActionWrapper):  # gym.ActionWrapper is a Gym Base Class that can modify the action before env.step()
    def __init__(self, env, epsilon=0.1):  # initiates the RandomEpsilonWrapper inheritence object
        super(RandomEpsilonWrapper, self).__init__(env)  # RandomEpsilonWrapper is an epsilon-greedy method
        self.epsilon = epsilon  # set epsilon self value


    def action(self, action):  # overrides base class definition, takes action and acts randomly according to self.epsilon
        if random.random() < self.epsilon:  # make a random stochastic selection and compare it greedily to reduce epsilon
            print("Random!")  # disregard the policy-informed action in exchange for a randomly-selected action 
            return self.env.action_space.sample()  # agent has "chosen" to act randomly (controlled by epsilon) (exploration)
        else:
            print("Policy")  # agent made a decision informed by the current policy (exploitation)
        return action

In [None]:
env = RandomEpsilonWrapper(gym.make("CartPole-v1"),epsilon=0.5)  # modify the environment with RandomEpsilonWrapper
obs = env.reset()  # 
total_reward = 0.0  # 

while True:
    obs, reward, done, _, _= env.step(0) #we have a fixed policy of going to left (action = 0) only
    total_reward += reward
    #show_state(env) #with this line uncommented, you can also monitor
    if done:
        break

print("Reward got: %.2f" % total_reward)

## Implementing simple  reflex agent
Below is the rule defined in the [Cartpole environment](https://github.com/openai/gym/blob/master/gym/envs/classic_control/cartpole.py)
```
   Observation: 
        Type: Box(4)
        Num	Observation                 Min         Max
        0	Cart Position             -4.8            4.8
        1	Cart Velocity             -Inf            Inf
        2	Pole Angle                 -24 deg        24 deg
        3	Pole Velocity At Tip      -Inf            Inf
        
    Actions:
        Type: Discrete(2)
        Num	Action
        0	Push cart to the left
        1	Push cart to the right
        
        Note: The amount the velocity that is reduced or increased is not fixed; it depends on the angle the pole is pointing. This is because the center of gravity of the pole increases the amount of energy needed to move the cart underneath it
    Reward:
        Reward is 1 for every step taken, including the termination step
    Starting State:
        All observations are assigned a uniform random value in [-0.05..0.05]
    Episode Termination:
        Pole Angle is more than 12 degrees
        Cart Position is more than 2.4 (center of the cart reaches the edge of the display)
        Episode length is greater than 200
        Solved Requirements
        Considered solved when the average reward is greater than or equal to 195.0 over 100 consecutive trials.
```

Let's make an agent that act deterministically. Let's say the agent can sense the velocity of the cart vc and the velocity of the pole vp. It will continue going to the same direction until it senses the pole starts falling toward the oposite direction of it moving (Simple reflex agent).

Here, we create an Agent class first, which runs single episode and can run episodes n times so that we can do some stats. Then we'll create subclasses using inheritance (below examples do not use `super` as we used in the wrapper example above)

In [5]:
# deterministic rule
# first choose random action
# monitor vc and vp while keep going to the same direction
# if the sign of the two velocities are different, flip the direction (action)
import gymnasium as gym
# import gym
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

class Agent:  # Agent base class
    def __init__(self):  # agent constructor
        self.env = gym.make("CartPole-v1")  # overwritten by the env handed to the agent 
        self.state = self.env.reset()  #
        self.rewards = []  # a list of rewards
        self.steps = []  # a sequence of chosen steps
        # self.poilcy = []  # list of policy-defined actions at each state (Note that "policy" appears misspelled here, look out for the invocation)
        self.policy = []
        
    def select_action(self, state, action):  # returns policy defined action (a') for each state and action pair (s,a). 
        return action

    def play_episode(self, env):  # contains the while loop that drives the agent through the environment
        self.env = env  # overwrite the __init__ cartpole default environment
        total_reward = 0.0
        state = env.reset()
        action = random.choice([0,1])  # action list depends on the selected environment
        steps = 0
        actions = []
        isInit = 1
        while True:
            if isInit:  # set to '1' above
                state = state[0]  # start at the first state
                isInit = 0  # considered initialized after the first step
            action = self.select_action(state, action)
            new_state, reward, is_done, _, _ = env.step(action)  # collect new_state, reward, is_done status
            total_reward += reward  # 
            steps +=1
            actions.append(action)  # record the action taken
            if is_done:  # 
                break
            state = new_state  # updates Agent's self.state member value
        return total_reward, steps, actions  # return the final gameplay outcome
    
    def repeat(self,n_sample):
        rewards =[]  # a new set of 'rewards', 'steps', and 'policy' lists is generated for every single round of gameplay 
        steps=[]
        policy = []
        for i in range(n_sample):  # forms an effective outer loop by iterating through 'n_sample' runs of agent sgameplay
            reward, step, actions = self.play_episode(self.env)  # drive the game_play using the member function above
            rewards.append(reward)  # appending each item to the list(s)
            steps.append(step)
            policy.append(actions)
        self.rewards = rewards  # update the agent's rewards list
        self.steps = steps
        # self.policy = np.array(policy)  # commented out in the original example, turns policy list into an np.array
        self.policy = policy
        

# WE CAN DEFINE DIFFERENT AGENT TYPES AS SHOWN BELOW
class ReflexAgent(Agent):  # the user-defined class ReflexAgent, extends Agent
    def select_action(self, state, action):  
        if state[1]*state[3]<0:
            action = int(1-action)  
        return action    


AttributeError: partially initialized module 'gym' has no attribute 'core' (most likely due to a circular import)

: 

In [None]:
# 
def replay(policy):
    env = gym.make("CartPole-v1", render_mode='rgb_array')
    obs = env.reset()
    step = 0
    for a in policy:
        env.step(a)
        show_state(env, step)
        step+=1

In [None]:
ra = ReflexAgent()
ra.repeat(1000)
rewards = ra.rewards
plt.hist(rewards,bins=50)
print(np.mean(rewards), np.std(rewards), max(rewards))

In [None]:
class RandomAgent(Agent):       
    def select_action(self,state,action):  
        return random.choice([0,1]) 

In [None]:
rda= RandomAgent()
rda.repeat(1000)
rewards1 = rda.rewards
plt.hist(rewards1,bins=50)
print(np.mean(rewards1), np.std(rewards1), max(rewards1))

In [None]:
# best_reflex = ra.policy[np.argmax(rewards)]
# replay(best_reflex) 
# best_random = rda.policy[np.argmax(rewards1)]
# replay(best_random) 

# # Is this a bug?? It didn't terminate; 
# # it's because everytime the environment is reset, the initial state is different that the policy is not the best for that episode.
# # To make it a proper reply you'll need to also save the initial state, or return the entire env object to reproduce the result.
# # The goal of your algorithm is that no matter which initial state it started with, it behaves optimally.

cart_pole_agent = Agent()  # create an agent class object and extend it with any of the associated class options

## Now let's create special Agent subclasses (extending from the Agent class) to perform in a variety of Gym environments.

### Part I: Atari - Assault

Invoke and Define the Atari Gym "Assault" Environment

In [None]:
# DEFINE THE ENVIRONMENT
# Gym documentation: https://www.gymlibrary.dev/environments/atari/assault/

# Action Space: Discrete(18)
# Observation Space: (210, 160, 3)
# Observation High: 255
# Observation Low: 0
# Import: gym.make("ALE/Assault-v5")
# objective: destroy enemies and maximize the agent's time in the game

# Actions: [0:NOOP, 1:FIRE, 2:UP, 3:RIGHT, 4:LEFT, 5:RIGHTFIRE, 6:LEFTFIRE]
# Observations: Box([0 ... 0], [255 ... 255], (128,), uint8)   # 128 bytes of RAM cionsole space
#               Box([[0 ... 0] ... [0  ... 0]], [[255 ... 255] ... [255  ... 255]], (250, 160), uint8)   # grayscale image

# IMPORT THE NECESSARY ATARI LIBRARIES
import ale_py
import shimmy
import gymnasium as gym

from ale_py import ALEInterface
ale = ALEInterface()
# env = gym.make("ALE/Assault-v5", render_mode="rgb_array")  # test run - PASSES MUSTER

In [None]:
# EXTEND THE AGENT CLASS TO DEFINE A SUCCESSFUL AGENT
# # https://www.youtube.com/watch?v=fnVIgAGhA08 (DQN Algorithm Refresher)
# https://www.tensorflow.org/agents/tutorials/1_dqn_tutorial  (TensorFlow Agent Tutorial)


class AtariAssaultAgent:  #
    def __init__(self):  # agent constructor
        self.env = gym.make("ALE/Assault-v5", render_mode="rgb_array")  # overwritten by the env handed to the agent 
        self.state = self.env.reset()  #
        self.rewards = []  # a list of rewards
        self.steps = []  # a sequence of chosen steps
        # self.poilcy = []  # list of policy-defined actions at each state (Note that "policy" appears misspelled here, look out for the invocation)
        self.policy = []
        
    def select_action(self, state, action):  # returns policy defined action (a') for each state and action pair (s,a). 
        return action

    def play_episode(self, env):  # contains the while loop that drives the agent through the environment
        self.env = env  # overwrite the __init__ cartpole default environment
        total_reward = 0.0
        state = env.reset()
        action = random.choice([0,1])  # 
        steps = 0
        actions = []
        isInit = 1
        while True:
            if isInit:  # set to '1' above
                state = state[0]  # start at the first state
                isInit = 0  # considered initialized after the first step
            action = self.select_action(state, action)
            new_state, reward, is_done, _, _ = env.step(action)  # collect new_state, reward, is_done status
            total_reward += reward  # 
            steps +=1
            actions.append(action)  # record the action taken
            if is_done:  # 
                break
            state = new_state  # updates Agent's self.state member value
        return total_reward, steps, actions  # return the final gameplay outcome
    
    def repeat(self,n_sample):
        rewards =[]  # a new set of 'rewards', 'steps', and 'policy' lists is generated for every single round of gameplay 
        steps=[]
        policy = []
        for i in range(n_sample):  # forms an effective outer loop by iterating through 'n_sample' runs of agent sgameplay
            reward, step, actions = self.play_episode(self.env)  # drive the game_play using the member function above
            rewards.append(reward)  # appending each item to the list(s)
            steps.append(step)
            policy.append(actions)
        self.rewards = rewards  # update the agent's rewards list
        self.steps = steps
        # self.policy = np.array(policy)  # commented out in the original example, turns policy list into an np.array
        self.policy = policy
        

# WE CAN DEFINE DIFFERENT AGENT TYPES AS SHOWN BELOW
class ClassOne(AtariAssaultAgent):  # the user-defined class ReflexAgent, extends Agent
    def select_action(self, state, action):  
        if state[1]*state[3]<0:
            action = int(1-action)  
        return action    



In [None]:
# # begin Python q-learning tutorial: 
# # the goal is to compose a Q-Table, a memoization table
# import gym
# import numpy as np
# import matplotlib.pyplot as plt

# env = gym.make("MountainCar-v0")  # q-learners should perform in other environments also
# env.reset()
# # print(env.observation_space.high)
# # print(env.observation_space.low)
# # print(env.action_space.n)

# num_wins = 0  # win counter

# # define the q-learning parameters (ADJUST THESE)
# LEARNING_RATE = 0.15  # 0.1
# DISCOUNT = 0.95  # reducing this value increases sensitivity to distant rewards
# EPISODES = 10000  # 
# SHOW_EVERY = 1000  # 

# DISCRETE_OBS_SIZE = [8] * len(env.observation_space.high)  # number of discrete states in env
# discrete_os_win_size = (env.observation_space.high - env.observation_space.low) / DISCRETE_OBS_SIZE  # normalization value that we can use
# # print(discrete_os_win_size)

# # define the epsilon control parameter
# epsilon = 0.5  # controls agent stochasticity, decreases by 'epsilon_decay_value' for the first 'END_EPSILON_DECAYING' episodes
# START_EPSILON_DECAYING = 1
# DIV_FACTOR = 30  # try 2,3,4,...  larger value == larger delta_epsilon "chunks"
# END_EPSILON_DECAYING = EPISODES // DIV_FACTOR  # larger divisor => larger delta_epsilon => more exploitation
# epsilon_decay_value = epsilon/(END_EPSILON_DECAYING - START_EPSILON_DECAYING)  # normalized epsilon decay value

# q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OBS_SIZE + [env.action_space.n]))  # Q-table with states in columns and (state,action) cominations in rows
# # print(q_table.shape)
# # print(q_table)  # q-table enumerates every possible observation combination

# ep_rewards = []  # episode rewards
# aggr_ep_rewards = {'ep': [], 'avg': [], 'min': [], 'max': []}  # 'avg' will contain a pooled average


# def get_discrete_state(state):
#     discrete_state = (state - env.observation_space.low) / discrete_os_win_size  # normalized state value
#     return tuple(discrete_state.astype(np.int64))  # returns as a tuple that will eventually comprise the (state,action) pair
# # print(discrete_state)
# # print("initialized q-values:", q_table[discrete_state])  # prints the q-table associated with 'discrete_state'
# # print(np.argmax(q_table[discrete_state]))  # collects the max value among the q-table values in the initialized state


# for episode in range(EPISODES):
#     episode_reward = 0  # keep track of sequences of events that scored higher
#     if episode % SHOW_EVERY == 0:  # give update every SHOW_EVERY episodes
#         print(episode)
#         render = True
#     else: 
#         render = False
        
#     discrete_state = get_discrete_state(env.reset())  # collect the 1st discrete state for the new board
#     done = False
#     while not done:
#         # include eploration/exploitation trade-off here
#         if np.random.random() <= epsilon:  # if epsilon is larger than the random value
#             action = np.random.randint(0, env.action_space.n)  # explore
#         else: 
#             action = np.argmax(q_table[discrete_state])  # exploit
            
#         new_state, reward, done, _ = env.step(action)  # use it to generate 'new_state', 'reward', and 'done'
#         episode_reward += reward  # 
#         new_discrete_state = get_discrete_state(new_state)  # normalize (AKA 'descretize') the continuous new_state value 
#         if render:
#             env.render()
#             # print(reward, new_state)
#         if not done: 
#             max_future_q = np.max(q_table[new_discrete_state])
#             current_q = q_table[discrete_state + (action, )]  # 
#             new_q = ((1-LEARNING_RATE) * current_q) + (LEARNING_RATE * (reward + (DISCOUNT * max_future_q)))  # 
#             q_table[discrete_state + (action, )] = new_q  # update q value for 'discrete_state' in the q-table
#         elif new_state[0] >= env.goal_position:
#             print(f"agent made it to the flag on episode: {episode}")
#             num_wins += 1
#             q_table[discrete_state + (action, )] = 0  # assign a "reward" of 0 (recall that cost of living is -1)
    
#         discrete_state = new_discrete_state  # 
    
#     if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
#         epsilon -= epsilon_decay_value  # update epsilon
        
#     ep_rewards.append(episode_reward)  # append the episode rewards to the table
#     if episode % SHOW_EVERY == 0:
#         average_reward = sum(ep_rewards[-SHOW_EVERY:])/len(ep_rewards[-SHOW_EVERY:])
#         aggr_ep_rewards['ep'].append(episode)
#         aggr_ep_rewards['avg'].append(average_reward)
#         aggr_ep_rewards['min'].append(min(ep_rewards[-SHOW_EVERY:]))
#         aggr_ep_rewards['max'].append(max(ep_rewards[-SHOW_EVERY:]))
        
#         print(f"Episode: {episode}, avg: {average_reward}, min: {min(ep_rewards[-SHOW_EVERY:])}, max: {max(ep_rewards[-SHOW_EVERY:])}")
        
# print(f"Agent won {num_wins} out of {EPISODES} games")
# print("epsilon decay value used: ", epsilon_decay_value, "division factor: ", DIV_FACTOR)
# env.close()

# plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['avg'], label="average rewards")
# plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['min'], label="minimum rewards")
# plt.plot(aggr_ep_rewards['ep'], aggr_ep_rewards['max'], label="maximum rewards")
# plt.legend(loc=4)
# plt.show()

In [None]:
# source: https://www.kaggle.com/code/yaaryan/space-invaders-game-using-deep-q-networks
# !pip install tensorflow==1.14
import tensorflow as tf
tf.__version__

import numpy as np
import gym
from tensorflow.contrib.layers import flatten, conv2d, fully_connected
from collections import deque, Counter
import random
import datetime


In [None]:
############## SAVE THIS CODE ##################
# THIS CODE SECTION SHOWS ACTIVE PYTHON ENV PATH (Local Windows Env.):
# import os 
# import sys
# os.path.dirname(sys.executable)