In [1]:
import gym
from random import choices
from gym import spaces
import numpy as np
import itertools

Documentation on Github: https://github.com/openai/gym/blob/master/gym/envs/classic_control/mountain_car.py

In [2]:
env = gym.make("CartPole-v0")

In [3]:
#env.reset()
#for _ in range(1000):
#    env.render()
#    env.step(env.action_space.sample()) # take a random action
#env.close()

In [4]:
class agent:
    
    """
    Includes the methods for action-selection as well as the three RL Algorithms:
    REINFORCE
    REINFORCE with baseline
    one-step Actor-critic
    unfortunately there seem to be some problems with convergence
    """
    
    def __init__(self,environment):
        self.env = gym.make(environment)
        self.env.reset()
        
    def select_action(self,theta):
        action_list = self.env.action_space
        weights = [policy(self.env.state, action, action_list, theta) for action in get_space_list(action_list)]
        return choices([action for action in get_space_list(action_list)],weights = weights)[0]
        
    def Reinforce(self, alpha, gamma, nr_episodes):
        theta = np.zeros(len(feature_vector(self.env.state,self.env.action_space.sample())))
        q_arr = []
        sum_rewards = []
        
        for k in range(nr_episodes):
            q = 0
            self.env.reset()
            S = [self.env.state]
            a = self.select_action(theta)
            A = [a]
            R = []
            new_state, reward, done, info = self.env.step(a)
            S.append(new_state)
            R.append(reward)
            
            while not done:
                q +=1
                a = self.select_action(theta)
                A.append(a)
                new_state, reward, done, info = self.env.step(a)
                S.append(new_state)
                R.append(reward)
            
            sum_rewards.append(sum(R))
            q_arr.append(q)
            
            for t in range(q):
                G = sum([gamma**(k-t)*R[k] for k in range(t,q)])
                
                eligibility_vector = (
                    feature_vector(S[t],A[t])-
                    sum([policy(S[t],b,self.env.action_space,theta)*feature_vector(S[t],b) for b in get_space_list(self.env.action_space)],0)
                )
                theta += alpha*(gamma**t)*G*eligibility_vector
        
        print('average reward = {}'.format(np.average(sum_rewards)))
        return theta, sum_rewards
    
    def Reinforce_baseline(self, alpha_w, alpha_t, gamma, nr_episodes):
        theta = np.zeros(len(feature_vector(self.env.state,self.env.action_space.sample())))
        w = np.zeros(len(feature_vector(self.env.state)))
        q_arr = []
        sum_rewards = []
        
        for k in range(nr_episodes):
            q = 0
            self.env.reset()
            S = [self.env.state]
            a = self.select_action(theta)
            A = [a]
            R = []
            new_state, reward, done, info = self.env.step(a)
            S.append(new_state)
            R.append(reward)
            
            while not done:
                q +=1
                a = self.select_action(theta)
                A.append(a)
                new_state, reward, done, info = self.env.step(a)
                S.append(new_state)
                R.append(reward)
            
            sum_rewards.append(sum(R))
            q_arr.append(q)
            
            for t in range(q):
                G = sum([gamma**(k-t)*R[k] for k in range(t,q)])
                delta = G - w@feature_vector(S[t])
                eligibility_vector = (
                    feature_vector(S[t],A[t])-
                    sum([policy(S[t],b,self.env.action_space,theta)*feature_vector(S[t],b) for b in get_space_list(self.env.action_space)],0)
                )
                
                w += alpha_w*delta*feature_vector(S[t])
                theta += alpha_t*delta*(gamma**t)*G*eligibility_vector
        
        print('average reward = {}'.format(np.average(sum_rewards)))
        return theta, sum_rewards
    
    def actor_critic(self, alpha_w, alpha_t, gamma, nr_episodes):
        theta = np.zeros(len(feature_vector(self.env.state,self.env.action_space.sample())))
        w = np.zeros(len(feature_vector(self.env.state)))
        sum_rewards = []
        q_arr = []
        
        for p in range(nr_episodes):
            self.env.reset()
            old_state = self.env.state
            I = 1
            q = 0
            R = []
            done = False
            
            while not done:
                a = self.select_action(theta)
                new_state, reward, done, info = self.env.step(a)
                q += 1
                R.append(reward)
                
                if done:
                    v_prime = 0
                else:
                    v_prime = w@feature_vector(new_state)
                
                eligibility_vector = (
                    feature_vector(old_state,a)-
                    sum([policy(old_state,b,self.env.action_space,theta)*feature_vector(old_state,b) for b in get_space_list(self.env.action_space)],0)
                )
                
                delta = reward + gamma*v_prime - gamma*w@feature_vector(old_state)
                w += alpha_w*delta*feature_vector(old_state)
                theta += alpha_t*I*delta*eligibility_vector
                I = I*gamma
                old_state = new_state
            
            q_arr.append(q)
            sum_rewards.append(sum(R))
        
        print('average reward = {}'.format(np.average(sum_rewards)))
        
        return theta, sum_rewards

#exponential soft-max distribution
def policy(state,action,action_space,theta):
    denom = sum([np.exp(theta@feature_vector(state,a)) for a in get_space_list(action_space)])
    return np.exp(theta@feature_vector(state,action))/denom   

#polynomial feature vectors for both states and state-action pairs, standard degree 2
def feature_vector(state, action = None, n = 2):
    s = state
    
    if action == None:
        c = np.array(list(itertools.product(range(n), repeat = len(s))))
        
        return np.array(
            [np.prod(
                np.array([s[i] for i in range(len(s))])** c_) for c_ in c])
    else:
        c = np.array(list(itertools.product(range(n), repeat = len(s) + 1)))

        return np.array(
            [np.prod(
                np.array(
                    np.append([s[i] for i in range(len(s))],action))
                ** c_) for c_ in c])

In [5]:
def get_space_list(space):

    """
    Converts gym space, constructed from types, to list space_list
    """

    # -------------------------------- #

    types = [
        gym.spaces.multi_binary.MultiBinary,
        gym.spaces.discrete.Discrete,
        gym.spaces.multi_discrete.MultiDiscrete,
        gym.spaces.dict.Dict,
        gym.spaces.tuple.Tuple,
    ]

    if type(space) not in types:
        raise ValueError(f'input space {space} is not construdted from spaces of types:' + '\n' + str(types))

    # -------------------------------- #

    if type(space) is gym.spaces.multi_binary.MultiBinary:
        return [
            np.reshape(np.array(element), space.n)
            for element in itertools.product(
                *[range(2)] * np.prod(space.n)
            )
        ]

    if type(space) is gym.spaces.discrete.Discrete:
        return list(range(space.n))

    if type(space) is gym.spaces.multi_discrete.MultiDiscrete:
        return [
            np.array(element) for element in itertools.product(
                *[range(n) for n in space.nvec]
            )
        ]

    if type(space) is gym.spaces.dict.Dict:

        keys = space.spaces.keys()
        
        values_list = itertools.product(
            *[get_space_list(sub_space) for sub_space in space.spaces.values()]
        )

        return [
            {key: value for key, value in zip(keys, values)}
            for values in values_list
        ]

        return space_list

    if type(space) is gym.spaces.tuple.Tuple:
        return [
            list(element) for element in itertools.product(
                *[get_space_list(sub_space) for sub_space in space.spaces]
            )
        ]

    # -------------------------------- #

In [6]:
ag = agent("CartPole-v0")

In [7]:
theta_out, rewards = ag.Reinforce(0.1,0.9,100)

average reward = 36.18


In [8]:
theta_2, rewards_2 = ag.Reinforce_baseline(0.1,0.1,0.9,100)

average reward = 51.26


In [9]:
theta_3, rewards_3 = ag.actor_critic(0.1,0.1,0.9,100)

average reward = 46.15
