Sources: https://medium.com/analytics-vidhya/q-learning-is-the-most-basic-form-of-reinforcement-learning-which-doesnt-take-advantage-of-any-8944e02570c5
         https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

In [119]:
import numpy as np
import gymnasium as gym
env = gym.make('CartPole-v1')

import matplotlib.pyplot as plt
import plotly.express as px

from sklearn.preprocessing import PolynomialFeatures

from collections import namedtuple, deque
import random

from plotly.subplots import make_subplots
import plotly.graph_objects as go

import json

# Cart Pole Problem

In [None]:
# Action Space : Discrete(2)
# Observation Shape : (4,)
# Observation High : [4.8 inf 0.42 inf]
# Observation Low : [-4.8 -inf -0.42 -inf]


In [None]:
# Action Space: 0 - go left, 1  - go right

# Observation[0]: Cart Position in [-4.8, 4.8]
# Observation[1]: Cart Velocity in [-inf, inf]
# Observation[2]: Pole Angle in [-0.42, 0.42]
# Observation[3]: Pole Angular Velocity [-inf, inf]

## Linear Function Approximation

In [140]:
class QLinearModel:
    """Q function approximator using linear model."""
    
    def __init__(self, n_features, n_actions, feature_transformer):
        """Initialize the model."""
        
        self.weights = np.random.uniform(-1e-3,1e-3,[n_features, n_actions])
        self.feature_transformer = feature_transformer
        
    def approx(self, obs, action):
        """Approximate the Q value for a given state and action."""
        
        return self.weights[:, action].dot(self.feature_transformer.transform(obs))
    
    def update(self, delta, action):
        """Update the weights of the model."""
        
        self.weights[:, action] = self.weights[:, action] + delta
        
    def max_Q(self, obs):
        """Return the action with the highest Q value."""
        
        q_values = self.weights.T.dot(self.feature_transformer.transform(obs))
        return np.argmax(q_values), np.max(q_values)

## Feature Transformers

In [141]:
class AddInterceptFeatures:
    """Add an intercept."""

    def __init__(self):
        pass

    def transform(self, obs):
        """Add an intercept term to an observation."""
        return np.concatenate([obs, np.ones((1,))])

In [142]:
class RBFFeatures:
    """Radial basis function features."""

    def __init__(self, n_features, centers, widths):
        """Initialize the model."""

        self.n_features = n_features
        self.centers = centers
        self.widths = widths

    def train(self, X):
        """Obtain the centers and widths of the RBFs."""
        
        self.centers = np.mean(X, axis=0)
        self.widths = np.std(X, axis=0)

    def transform(self, obs):
        """Transform an observation into RBF features."""

        return np.exp(-0.5 * np.sum((obs.reshape(-1,1) - self.centers)**2 / self.widths**2, axis=1))

## Experience Replay

In [457]:
Transition = namedtuple('Transition',
                        ('observation', 'action', 'next_observation', 'reward'))

class ReplayMemory(object):
    """Experience replay memory."""

    def __init__(self, capacity):
        """Initialize the memory."""

        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""

        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        """Sample a batch of transitions."""
        
        return random.sample(self.memory, batch_size)


    def __len__(self):
        """Return the length of the memory."""
        
        return len(self.memory)

## Non-sparse reward function

In [465]:
def reward_func(obs):
    """Non-sparse reward function for the cartpole problem."""

    return 1 - obs[2]**2

## Q-learning

In [152]:
class QLearning:
    """Q-learning algorithm."""

    def __init__(self, n_features, n_actions, feature_transformer):
        """Initialize the algorithm."""

        # self.env = env
        self.feature_transformer = feature_transformer
        self.n_features = n_features
        self.n_actions = n_actions


    def train(self, reward_func = None, episodes=5000, penalty=0.95, learning_rate=0.001, timestep=100, epsilon_decay=0.999, regul_strength=1e-4):
        """Train the Q-function approximator."""

        # set parameters
        epsilon = 1     # exploration rate
        lmbda = regul_strength    # l2 regularisation


        # initialise the Q function approximation model
        self.qlm = QLinearModel(n_features=self.n_features, n_actions=self.n_actions, feature_transformer=self.feature_transformer)


        # reset counters
        rewards = 0
        steps = 0 
        self.runs = [0]
        self.avgs = [0]

        # solved flag
        solved = False

        # initialise diagnostics
        self.diagnostics = {'actions': [], 'rewards': [], 'max_Q_for_new': [], 'current_Q': [], 'TD_error': [], 'delta_weights': []}


        for episode in range(1, episodes+1):

            # decay epsilon
            epsilon = epsilon * epsilon_decay
            
            # initialise the environment and obtain the observation
            current_observation = env.reset()[0] 
            
            # reset score and terminated flag
            score = 0
            terminated = False
            
            while not terminated:
                steps += 1
                
        #         if episode % timestep == 0:
        #             env.render()
                
                # epsilon-greedy action selection
                if np.random.uniform(0,1) < epsilon:
                    action = env.action_space.sample() # pick a random action
                else:
                    action = self.qlm.max_Q(current_observation)[0] # pick the best action for a given state
                
                # perform action and obtain reward and a new observation
                next_observation, reward, terminated, truncated, info = env.step(action)

                # augment reward
                if reward_func is not None:
                    reward = reward_func(next_observation)

                score += reward

                # store transition in replay memory
                memory.push(current_observation, action, next_observation, reward)

                if not terminated:
                    
                    # calculate TD error
                    max_Q_for_new_obs = self.qlm.max_Q(next_observation)[1]
                    current_Q = self.qlm.approx(current_observation, action)
                    delta_weights = learning_rate \
                                    * (reward + penalty * max_Q_for_new_obs - current_Q) \
                                    * self.qlm.feature_transformer.transform(current_observation)
                    
                    # add l2 regularisation to delta_weights
                    delta_weights = delta_weights - lmbda * self.qlm.weights[:, action]

                    # update weights
                    self.qlm.update(delta_weights, action)

                    # append diagnostics
                    self.diagnostics['actions'].append(action)
                    self.diagnostics['rewards'].append(reward)
                    self.diagnostics['max_Q_for_new'].append(max_Q_for_new_obs)
                    self.diagnostics['current_Q'].append(current_Q)
                    self.diagnostics['TD_error'].append(reward + penalty * max_Q_for_new_obs - current_Q)
                    self.diagnostics['delta_weights'].append(delta_weights)
                    
                # update current observation
                current_observation = next_observation
                
            else:
                # append score
                rewards += score
                self.runs.append(score)
                
                # check if solved
                if score > 195 and steps >= 100 and solved == False:
                    solved = True
                    print('Solved')
                    
            if episode%timestep == 0:
                    self.avgs.append(rewards/timestep)
                    rewards = 0
      
        env.close()

## Q-learning with Experience Replay (we did not observe convergence for this implementation)

full Q-learning with replay buffer (from CS285):
1. collect dataset $\left\{\left(\mathbf{s}_i, \mathbf{a}_i, \mathbf{s}_i^{\prime}, r_i\right)\right\}$ using some policy, add it to $\mathcal{B}$
Repeat K times:
- 2. sample a batch $\left(\mathbf{s}_i, \mathbf{a}_i, \mathbf{s}_i^{\prime}, r_i\right)$ from $\mathcal{B}$
- 3. $\phi \leftarrow \phi-\alpha \sum_i \frac{d Q_\phi}{d \phi}\left(\mathbf{s}_i, \mathbf{a}_i\right)\left(Q_\phi\left(\mathbf{s}_i, \mathbf{a}_i\right)-\left[r\left(\mathbf{s}_i, \mathbf{a}_i\right)+\gamma \max _{\mathbf{a}^{\prime}} Q_\phi\left(\mathbf{s}_i^{\prime}, \mathbf{a}_i^{\prime}\right)\right]\right)$

In [234]:
class QLearningWithExperienceReplay:
    """Q-learning algorithm with experience replay."""

    def __init__(self, n_features, n_actions, feature_transformer):
        """Initialize the algorithm."""

        # self.env = env
        self.feature_transformer = feature_transformer
        self.n_features = n_features
        self.n_actions = n_actions

         # initialise the Q function approximation model
        self.qlm = QLinearModel(n_features=self.n_features, n_actions=self.n_actions, feature_transformer=self.feature_transformer)


    def train(self, K, batch_size, learning_rate, reward_func=None, penalty=0.95):
        """Train the Q-function approximator using stochastic gradient descent with mini-batches."""

        # initialise the Q function approximation model
        self.qlm = QLinearModel(n_features=self.n_features, n_actions=self.n_actions, feature_transformer=self.feature_transformer)

        # peform K iterations of experience replay
        for _ in range(K):

            # sample a batch of transitions
            batch = memory.sample(batch_size)


            for i in range(batch_size):

                batch_delta_weights = {0: [], 1: []}

                # extract the transition
                current_observation, action, next_observation, reward = batch[i]

                # augment reward
                if reward_func is not None:
                    reward = reward_func(next_observation)

                # calculate TD error
                max_Q_for_new_obs = self.qlm.max_Q(next_observation)[1]
                current_Q = self.qlm.approx(current_observation, action)
                delta_weights = (reward + penalty * max_Q_for_new_obs - current_Q) \
                                * self.qlm.feature_transformer.transform(current_observation)

                batch_delta_weights[action].append(delta_weights)

            # average the delta weights
            mean_delta_weights = {0: np.mean(batch_delta_weights[0], axis=0) if len(batch_delta_weights[0])>0 else 0, 
                                  1: np.mean(batch_delta_weights[1], axis=0) if len(batch_delta_weights[1])>0 else 0}

            # update weights
            self.qlm.update(learning_rate * mean_delta_weights[0], 0)
            self.qlm.update(learning_rate * mean_delta_weights[1], 1)

    def run(self, reward_func=None, episodes=100, epsilon=0):
        """Run the algorithm for a given number of episodes."""

        # reset counters
        rewards = 0
        steps = 0
        runs = []

        # initialise diagnostics
        self.diagnostics = {'actions': [], 'rewards': [], 'max_Q_for_new': [], 'current_Q': [], 'TD_error': [], 'delta_weights': []}

        # solved flag
        solved = False

        for episode in range(1, episodes+1):
            
            # initialise the environment and obtain the observation
            current_observation = env.reset()[0] 
            
            score = 0
            terminated = False

            # epsilon = epsilon * epsilon_decay
            
            while not terminated:
                steps += 1
                
                # epsilon-greedy policy
                if np.random.uniform(0,1) < epsilon:
                    action = env.action_space.sample() # pick a random action
                else:
                    action = self.qlm.max_Q(current_observation)[0] # pick the best action for a given state
                    
                # perform action and obtain reward and a new observation
                next_observation, reward, terminated, truncated, info = env.step(action) 
                score += reward

                # augment reward
                if reward_func is not None:
                    reward = reward_func(next_observation)


        
                # store transition in replay memory
                memory.push(current_observation, action, next_observation, reward)
                    
                current_observation = next_observation
                
            else:
                # append score
                rewards += score
                runs.append(score)
                
                # check if solved
                if score > 195 and steps >= 100 and solved == False: 
                    solved = True
                    print('Solved')
        
                           
        env.close()
        return runs


## Q-learning with experience replay (Combined Q)

Following Zhang S, Sutton R. A Deeper Look at Experience Replay [Internet]. Available from: https://arxiv.org/pdf/1712.01275.pdf

In [479]:
class CombinedQ:

    def __init__(self, n_features, n_actions, feature_transformer):
        """Initialize the algorithm."""

        # self.env = env
        self.feature_transformer = feature_transformer
        self.n_features = n_features
        self.n_actions = n_actions

        # initialise diagnostics
        self.diagnostics = {'actions': [], 'rewards': [], 'max_Q_for_new': [], 'current_Q': [], 'TD_error': [], 'delta_weights': []}

        # initialise the Q function approximation model
        self.qlm = QLinearModel(n_features=self.n_features, n_actions=self.n_actions, feature_transformer=self.feature_transformer)


    def train(self, K, batch_size, learning_rate, reward_func=None, episodes=100, epsilon_decay=0.999, penalty=0.95):
        """Run the algorithm for a given number of episodes."""

        # reset counters
        rewards = 0
        steps = 0
        self.runs = []

        epsilon = 1



        # solved flag
        solved = False

        for episode in range(1, episodes+1):
            # print(episode)
            
            # initialise the environment and obtain the observation
            current_observation = env.reset()[0] 
            
            score = 0
            terminated = False

            epsilon = epsilon * epsilon_decay
            
            while not terminated:
                steps += 1
                
                # epsilon-greedy policy
                if np.random.uniform(0,1) < epsilon:
                    action = env.action_space.sample() # pick a random action
                else:
                    action = self.qlm.max_Q(current_observation)[0] # pick the best action for a given state
                    
                # perform action and obtain reward and a new observation
                next_observation, reward, terminated, truncated, info = env.step(action) 
                
                # augment reward
                if reward_func is not None:
                    reward = reward_func(next_observation)

                score += reward

                # create transition
                transition_t = Transition(current_observation, action, next_observation, reward)

                # store transition in replay memory
                memory.push(current_observation, action, next_observation, reward)

                if not terminated:
                    # sample a batch of transitions from the replay memory and update the Q function approximation model

                    # peform K iterations of experience replay
                    for _ in range(K):

                        # sample a batch of transitions
                        batch = memory.sample(batch_size)
                        batch.append(transition_t)


                        for i in range(len(batch)):

                            # extract the transition
                            sample_current_observation, sample_action, sample_next_observation, sample_reward = batch[i]

                            # augment reward
                            if reward_func is not None:
                                reward = reward_func(sample_next_observation)

                            # calculate TD error
                            max_Q_for_new_obs = self.qlm.max_Q(sample_next_observation)[1]
                            current_Q = self.qlm.approx(sample_current_observation, sample_action)
                            delta_weights = learning_rate * (reward + penalty * max_Q_for_new_obs - current_Q) \
                                            * self.qlm.feature_transformer.transform(sample_current_observation)

                            delta_weights = delta_weights - 1e-3 * self.qlm.weights[:, sample_action]

                            self.qlm.update(delta_weights, sample_action)

                             # append diagnostics
                            self.diagnostics['actions'].append(sample_action)
                            self.diagnostics['rewards'].append(sample_reward)
                            self.diagnostics['max_Q_for_new'].append(max_Q_for_new_obs)
                            self.diagnostics['current_Q'].append(current_Q)
                            self.diagnostics['TD_error'].append(sample_reward + penalty * max_Q_for_new_obs - current_Q)
                            self.diagnostics['delta_weights'].append(delta_weights)

                # S <- S'
                current_observation = next_observation
                
            else:
                # append score
                rewards += score
                self.runs.append(score)
                
                # check if solved
                if score > 195 and steps >= 100 and solved == False: 
                    solved = True
                    print('Solved')
        
                        
        env.close()
    