# COMP 579 - ASSIGNMENT 3
[Ling Fei Zhang](https://github.com/Ling01234), 260985358

Sevag Baghdassarian, ID

Brandon Ma, ID

In [1]:
# imports
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt 
import gymnasium as gym
from tqdm import tqdm, trange
import random
import time
import matplotlib.colors as mcolors
from scipy.stats import sem
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.neural_network import MLPRegressor


  from .autonotebook import tqdm as notebook_tqdm


# Q-Learning Agent

In [2]:
# Actions:
# 0: left
# 1: right

# best params initialization:
ALPHA = 1/4
EPSILON = 0.25
GAMMA = 0.95
BINS = 10
EPISODES = 1000
RUNS = 10
SEED = 123
random.seed(SEED)


class Qlearning:
    def __init__(self, env, alpha, gamma, epsilon, num_episodes, num_bins, seed) -> None:
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.num_episodes = num_episodes
        self.num_bins = num_bins
        self.lowerbound = env.observation_space.low
        self.lowerbound[1] = -3.5
        self.lowerbound[3] = -10
        self.upperbound = env.observation_space.high
        self.upperbound[1] = 3.5
        self.upperbound[3] = 10
        # self.env.seed(seed)
        self.seed = seed
        random.seed(self.seed)
        self.num_action = env.action_space.n
        self.reward = []
        self.Qvalues = np.random.uniform(low=-0.001, high=0.001,
                                         size=(num_bins, num_bins, num_bins, num_bins, self.num_action))
        self.behavior_episodes = []
        self.random_episodes = []
        self.bins = []
        for i in range(4):
            self.bins.append(np.linspace(
                self.lowerbound[i], self.upperbound[i], self.num_bins))
            
    # def get_behavior_episodes(self):
    #     return np.apply_along_axis(np.argmax, 4, self.Qvalues) #shape (10, 10, 10, 10)
        

    def discritize_state(self, state):
        """
        Discritize continuous state into a discrete state

        Args:
            state (list of length 4): Current continuous state of agent

        Returns:
            state (4-tuple): Current discritized state of agent
        """
        new_state = []
        for i in range(4):
            index = np.maximum(np.digitize(state[i], self.bins[i]) - 1, 0)
            new_state.append(index)

        return tuple(new_state)

    def select_action(self, state, episode):
        """
        Select action given a state

        Args:
            state (4-tuple): Current state of the agent, continuous
            episode (int): Current episode of the run

        Returns:
            int: Action chosen by the agent
        """
        random.seed(self.seed)

        # lower exploration rate as we run many episodes
        if episode > 700:
            self.epsilon *= 0.99

        # epsilon greedy
        number = np.random.random()
        if number < self.epsilon:  # uniformly choose action
            return np.random.choice(self.num_action)

        # greedy selection
        state = self.discritize_state(state)
        best_actions = np.where(
            self.Qvalues[state] == np.max(self.Qvalues[state]))[0]
        return np.random.choice(best_actions)

    def simulate_episodes(self):
        """
        Simulate a specified number of episodes
        """
        for episode in range(1, self.num_episodes+1):
            # reset env
            (state, _) = self.env.reset()
            state = list(state)

            # run episode
            episode_reward = 0
            terminal = False
            while not terminal:
                discritized_state = self.discritize_state(state)
                action = self.select_action(state, episode)
                (next_state, reward, terminal, _, _) = self.env.step(action)
                episode_reward += reward

                next_discritized_state = self.discritize_state(
                    list(next_state))

                q_max = np.max(self.Qvalues[next_discritized_state])
                self.qlearning_update(
                    terminal, reward, action, discritized_state, q_max)

                state = next_state

            self.reward.append(int(episode_reward))

    def qlearning_update(self, terminal, reward, action, state, q_max):
        """
        Qlearning update rule

        Args:
            terminal (bool): True if at terminal state, False otherwise
            reward (int): Reward of the agent at current state
            action (int): Action taken by agent
            state (4-tuple): Discrete state of the agent
            q_max (float): Max Q value of the next state
        """
        if not terminal:
            loss = reward + self.gamma * q_max - \
                self.Qvalues[state + (action,)]
        else:
            loss = reward - self.Qvalues[state + (action,)]

        self.Qvalues[state + (action,)] += self.alpha * loss

    def visualize(self, games):
        """
        Visualize the game played for a specified number of games.
        Prints out the reward for each game.

        Args:
            games (int): Number of games to be played
        """
        random.seed(self.seed)
        env = gym.make("CartPole-v1", render_mode="human")
        for game in range(games):
            (state, _) = env.reset()
            env.render()
            rewards = 0

            for _ in range(500):
                discritized_state = self.discritize_state(state)
                best_actions = np.where(self.Qvalues[discritized_state] == np.max(
                    self.Qvalues[discritized_state]))[0]
                action = np.random.choice(best_actions)
                (state, reward, terminal, _, _) = env.step(action)
                rewards += int(reward)
                time.sleep(0.05)

                if terminal:
                    time.sleep(1)
                    break
            print(f"reward for game {game}: {rewards}")
        env.close()
        
    def gather_episodes_agent(self, num_episodes):
        """
        Gather num_episodes behavior episodes for simple imitation learning

        Args:
            num_episodes (int): number of behavior episodes desired

        Returns:
            int: return of simple imitation learning using Q-Learning Agent
            as expert.
        """
        self.simulate_episodes()
        print(f"reward after simulate_episode: {self.reward[-20:]}")
        for episode in trange(1, num_episodes+1):
            # if episode % 10 == 0:
            #     print(f"gather episode {episode}")
            state, _ = self.env.reset()
            terminal = False
            while not terminal:
                discritized_state = self.discritize_state(state)
                best_actions = np.where(self.Qvalues[discritized_state] == np.max(
                    self.Qvalues[discritized_state]))[0]
                action = np.random.choice(best_actions)
                (next_state, reward, terminal, _, _) = self.env.step(action)
                self.behavior_episodes.append((state, action, reward, next_state, terminal))
                state = next_state
                
        # preprocess data
        x = np.array([data[0] for data in self.behavior_episodes])
        y = np.array([data[1] for data in self.behavior_episodes])
                
        episode_reward = simple_imitation(x, y)
        return episode_reward
        
        

    
    def gather_episodes_random(self, num_episodes):
        """
        Gather num_episodes behavior episodes with a random agent

        Args:
            num_episodes (int): number of behavior episodes desired for simple imitation

        Returns:
            int: return by a random agent. 
        """
        # print(f"reward after simulate_episode: {self.reward[-20:]}")
        for episode in trange(1, num_episodes+1):
            # if episode % 10 == 0:
            #     print(f"random agent episode {episode}")
            state, _ = self.env.reset()
            terminal = False
            while not terminal:
                discritized_state = self.discritize_state(state)
                action = self.env.action_space.sample()
                (next_state, reward, terminal, _, _) = self.env.step(action)
                self.random_episodes.append((state, action, reward, next_state, terminal))
                state = next_state
                
        # preprocess data
        x = np.array([data[0] for data in self.random_episodes])
        y = np.array([data[1] for data in self.random_episodes])
        
        episode_reward = simple_imitation(x, y)
        return episode_reward
        

def simple_imitation(x, y):
    env = gym.make("CartPole-v1")
    model = LogisticRegression()
    model.fit(x, y)
    
    state, _ = env.reset()
    episode_reward = 0
    terminal = False
    while not terminal:
        action = model.predict(state.reshape(1, -1))[0]
        state, reward, terminal, *_ = env.step(action)
        episode_reward += reward
    env.close()
    return episode_reward


Here, we will run a test run on the model, and see how it performs

In [3]:
def test_model():
    env = gym.make("CartPole-v1")
    qlearning = Qlearning(env, ALPHA, GAMMA, EPSILON, EPISODES, BINS, SEED)
    model_rewards = qlearning.gather_episodes_agent(500)
    random_rewards = qlearning.gather_episodes_random(500)
    print(f"Simple imitation with expert agent reward: {model_rewards}")
    print(f"Simple imitation with random agent reward: {random_rewards}")
    env.close()
    return qlearning.behavior_episodes, qlearning.random_episodes
    
behavior_episode, random_episodes = test_model()    

reward after simulate_episode: [197, 165, 172, 207, 197, 310, 241, 283, 222, 169, 269, 209, 215, 174, 280, 151, 149, 222, 183, 248]


100%|██████████| 500/500 [00:03<00:00, 157.87it/s]
100%|██████████| 500/500 [00:00<00:00, 1856.79it/s]

Simple imitation with expert agent reward: 401.0
Simple imitation with random agent reward: 36.0





We have pre-trained our Q-Learning agent with 1000 episodes, as we did in the previous assignment. We then use the trained Q-Learning agent as our expert in simple imitation learning and used logisitc regression to imitate the action observed in each state. The results above were produced using 500 behavior episodes. We can see that we can get decent results from simple imitation learning. On the other hand, we can see the returns received by the random agent. Without surprise, the returns are very low. 

# Datasets
Below, we will create the 9 datasets we need to perform our analysis. 

In [5]:
# expert Q-learning data
data1 = behavior_episode[:100]
x1 = np.array([data[0] for data in data1])
y1 = np.array([data[1] for data in data1])

data2 = behavior_episode[:250]
x2 = np.array([data[0] for data in data2])
y2 = np.array([data[1] for data in data2])

data3 = behavior_episode
x3 = np.array([data[0] for data in data3])
y3 = np.array([data[1] for data in data3])

# random agent data
data4 = random_episodes[:100]
x4 = np.array([data[0] for data in data4])
y4 = np.array([data[1] for data in data4])

data5 = random_episodes[:250]
x5 = np.array([data[0] for data in data5])
y5 = np.array([data[1] for data in data5])

data6 = random_episodes
x6 = np.array([data[0] for data in data6])
y6 = np.array([data[1] for data in data6])

# Shuffled data
shuffled_data = behavior_episode + random_episodes
random.shuffle(shuffled_data)

data7 = shuffled_data[:100]
x7 = np.array([data[0] for data in data7])
y7 = np.array([data[1] for data in data7])

data8 = shuffled_data[:250]
x8 = np.array([data[0] for data in data8])
y8 = np.array([data[1] for data in data8])

data9 = shuffled_data[:500]
x9 = np.array([data[0] for data in data9])
y9 = np.array([data[1] for data in data9])


# Fitted Q-Learning

In [39]:
class FittedQLearning:
    def __init__(self, env, alpha, gamma, epsilon, num_episodes, batch_size, buffer_size, buffer, mlp) -> None:
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.epsilon = epsilon
        self.num_episodes = num_episodes
        self.batch_size = batch_size
        self.buffer_size = buffer_size
        self.mlp = mlp
        self.state_space = env.observation_space.shape[0]
        self.action_space = env.action_space.n
        self.buffer = buffer
        self.counter = 0
        
        if mlp:
            self.q_estimator = MLPRegressor(hidden_layer_sizes=(64,), activation="relu", solver="adam", max_iter=1000)
        else:
            self.q_estimator = LinearRegression()
        
        # fit function approximator with buffer data
        x = np.array([data[0] for data in buffer])
        print(f"x[0]: {x[0]}")
        y = np.array([data[1] for data in buffer])
        self.q_estimator.fit(x,y)
        
            
    def select_action(self, state):
        # predict(state.reshape(1, -1))[0]
        # print(f"select_action: {self.counter}")
        qvalues = self.q_estimator.predict(np.array([state]).reshape(1, -1))
        action = np.argmax(qvalues)
        self.counter += 1
        return action
    
    def update(self):
        if len(self.buffer) > self.batch_size:
            # sample a batch
            # print(f"buffer len: {len(self.buffer)}")
            batch_index = np.random.choice(len(self.buffer), size = self.batch_size)
            # print(f"batch index: {batch_index}")
            batch = [self.buffer[i] for i in batch_index]
            # print(f"batch: {batch}")
            states, actions, rewards, next_states, terminals = zip(*batch) # transforms into a tuple of lists

            # compute targets
            # next_qvalues = []
            # for next_state in next_states:
            #     qvalues = self.q_estimator.predict(np.array([next_state]))
            #     next_qvalues.append(qvalues)
                
                
            # next_qvalues = np.max(next_qvalues)
            # print(f"next state {next_states[:2]}")
            next_qvalues = np.max(self.q_estimator.predict(np.array(next_states)))
            targets = rewards + self.gamma * (1 - np.array(terminals)) * next_qvalues 
            
            # update qvalues using least squares regressions
            # x = np.zeros((self.batch_size, self.state_space + self.action_space))
            x = np.zeros((self.batch_size, self.state_space + self.action_space))
            # print(f"here x: {x[0]}")
            for i in range(self.batch_size):
                x[i][:self.state_space] = states[i]
                x[i][self.state_space + actions[i]] = 1
            y = targets.reshape(-1,1)
            
            # update q_estimator from least squares
            self.q_estimator.fit(x,y)
            
    def train(self):
        for episode in trange(self.num_episodes):
            state, _ = self.env.reset()
            terminal = False
            total_reward = 0
            
            while not terminal:
                action = self.select_action(state)
                next_state, reward, terminal, *_ = self.env.step(action)
                total_reward += reward
                
                # add experience to buffer
                self.buffer.append((state, action, reward, next_state, terminal))
                
                # check buffer max size
                if len(self.buffer) > self.buffer_size:
                    self.buffer.pop(0) 
                    
                self.update()
                state = next_state
            
            # print(f"Train episode {episode} with reward {total_reward}")
            if (episode % 10 == 0):
                print(f"Train episode {episode} with reward {total_reward}")
                
    def test(self):
        state, _ = self.env.reset()
        episode_reward = 0
        terminal = False
        while not terminal:
            action = self.q_estimator.predict(np.array([state]).reshape(1, -1))[0]
            print(f"action in test {action}")
            state, reward, terminal, *_ = self.env.step(action)
            episode_reward += reward
        return episode_reward
        
                    
def test_model_fitted():
    env = gym.make("CartPole-v1")
    fitted = FittedQLearning(env, ALPHA, GAMMA, EPSILON, 500, 64, 100000, behavior_episode, False)
    fitted.train()
    reward = fitted.test()
    print(f"Fitted Q learning agent reward: {reward}")
    env.close()
    
test_model_fitted()
            

x[0]: [-0.38129348 -0.42148933  0.02191253  0.5119449 ]


  0%|          | 0/500 [00:00<?, ?it/s]


ValueError: X has 4 features, but LinearRegression is expecting 6 features as input.