In [1]:
import gym
env = gym.make('CartPole-v1')

In [2]:
env.observation_space

Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)

In [3]:
env.action_space

Discrete(2)

In [4]:
import numpy as np

class LogisticPolicy:

    def __init__(self, θ, α, γ):
        # Initialize paramters θ, learning rate α and discount factor γ

        self.θ = θ
        self.α = α
        self.γ = γ

    def logistic(self, y):
        # definition of logistic function

        return 1/(1 + np.exp(-y))

    def probs(self, x):
        # returns probabilities of two actions

        y = x @ self.θ
        prob0 = self.logistic(y)

        return np.array([prob0, 1-prob0])

    def act(self, x):
        # sample an action in proportion to probabilities

        probs = self.probs(x)
        action = np.random.choice([0, 1], p=probs)

        return action, probs[action]

    def grad_log_p(self, x):
        # calculate grad-log-probs

        y = x @ self.θ
        grad_log_p0 = x - x*self.logistic(y)
        grad_log_p1 = - x*self.logistic(y)

        return grad_log_p0, grad_log_p1

    def grad_log_p_dot_rewards(self, grad_log_p, actions, discounted_rewards):
        # dot grads with future rewards for each action in episode

        return grad_log_p.T @ discounted_rewards

    def discount_rewards(self, rewards):
        # calculate temporally adjusted, discounted rewards

        discounted_rewards = np.zeros(len(rewards))
        cumulative_rewards = 0
        for i in reversed(range(0, len(rewards))):
            cumulative_rewards = cumulative_rewards * self.γ + rewards[i]
            discounted_rewards[i] = cumulative_rewards

        return discounted_rewards

    def update(self, rewards, obs, actions):
        # calculate gradients for each action over all observations
        grad_log_p = np.array([self.grad_log_p(ob)[action] for ob,action in zip(obs,actions)])

        assert grad_log_p.shape == (len(obs), 4)

        # calculate temporaly adjusted, discounted rewards
        discounted_rewards = self.discount_rewards(rewards)

        # gradients times rewards
        dot = self.grad_log_p_dot_rewards(grad_log_p, actions, discounted_rewards)

        # gradient ascent on parameters
        self.θ += self.α*dot

In [5]:
def run_episode(env, policy, render=False):

    observation = env.reset()
    totalreward = 0

    observations = []
    actions = []
    rewards = []
    probs = []

    done = False

    while not done:
        if render:
            env.render()

        observations.append(observation)

        action, prob = policy.act(observation)
        observation, reward, done, info = env.step(action)

        totalreward += reward
        rewards.append(reward)
        actions.append(action)
        probs.append(prob)

    return totalreward, np.array(rewards), np.array(observations), np.array(actions), np.array(probs)

In [6]:
def train(θ, α, γ, Policy, MAX_EPISODES=1000, seed=None, evaluate=False):

    # initialize environment and policy
    env = gym.make('CartPole-v1')
    if seed is not None:
        env.seed(seed)
    episode_rewards = []
    policy = Policy(θ, α, γ)

    # train until MAX_EPISODES
    for i in range(MAX_EPISODES):

        # run a single episode
        total_reward, rewards, observations, actions, probs = run_episode(env, policy)

        # keep track of episode rewards
        episode_rewards.append(total_reward)

        # update policy
        policy.update(rewards, observations, actions)
        print("EP: " + str(i) + " Score: " + str(total_reward) + " ",end="\r", flush=False)

    # evaluation call after training is finished - evaluate last trained policy on 100 episodes
    if evaluate:
        env = RecordVideo(env, 'pg_cartpole/')
        for _ in range(100):
            run_episode(env, policy, render=False)
        env.env.close()

    return episode_rewards, policy

In [7]:
# additional imports for saving and loading a trained policy
from gym.wrappers import RecordVideo

# for reproducibility
GLOBAL_SEED = 0
np.random.seed(GLOBAL_SEED)

episode_rewards, policy = train(θ=np.random.rand(4),
                                α=0.002,
                                γ=0.99,
                                Policy=LogisticPolicy,
                                MAX_EPISODES=2000,
                                seed= None,
                                evaluate=True)

  y = x @ self.θ


ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 4 is different from 2)

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
#from google.colab import files

plt.plot(episode_rewards);
plt.savefig('Cartpole_Main.png')
#files.download('Cartpole_Main.png')

In [None]:
np.var(episode_rewards)

In [None]:
import numpy as np

class LogisticPolicy:

    def __init__(self, θ, α, γ):
        # Initialize paramters θ, learning rate α and discount factor γ

        self.θ = θ
        self.α = α
        self.γ = γ

    def logistic(self, y):
        # definition of logistic function

        return 1/(1 + np.exp(-y))

    def probs(self, x):
        # returns probabilities of two actions

        y = x @ self.θ
        prob0 = self.logistic(y)

        return np.array([prob0, 1-prob0])

    def act(self, x):
        # sample an action in proportion to probabilities

        probs = self.probs(x)
        action = np.random.choice([0, 1], p=probs)

        return action, probs[action]

    def grad_log_p(self, x):
        # calculate grad-log-probs

        y = x @ self.θ
        grad_log_p0 = x - x*self.logistic(y)
        grad_log_p1 = - x*self.logistic(y)

        return grad_log_p0, grad_log_p1

    """def baseline(self, rewards):
        # calculate baseline
        baseline = np.mean(rewards)
        return baseline"""

    """def baseline(self, rewards):
        baseline = np.cumsum(rewards) / (np.arange(len(rewards)) + 1)
        return baseline"""

    """def td_error_baseline(self, rewards):
        baseline = np.zeros_like(rewards)
        for i in range(len(rewards) - 1, 0, -1):
            baseline[i] = baseline[i + 1] + rewards[i] - self.γ * rewards[i + 1]
        return baseline"""


    def td_error_baseline(self, rewards):
        baseline = np.zeros_like(rewards)
        bonus = 0.1 * rewards  # Simple bonus function

        for i in range(len(rewards) - 1, -1, -1):
            if i == len(rewards) - 1:
                baseline[i] = rewards[i] + bonus[i]  # Handle the last state separately
            else:
                baseline[i] = baseline[i + 1] + rewards[i] - self.γ * rewards[i + 1] + bonus[i]

        return baseline

    """def td_error_baseline(self, rewards):
        baseline = np.zeros_like(rewards)
        for i in range(len(rewards) - 1, -1, -1):
            if i == len(rewards) - 1:
                baseline[i] = rewards[i]  # Handle the last state separately
            else:
                baseline[i] = baseline[i + 1] + rewards[i] - self.γ * rewards[i + 1]

        return baseline"""

    """def monte_carlo_baseline(self, rewards, num_simulations = 3):
        baseline = np.zeros_like(rewards)

        for i in range(len(rewards)):
            simulated_rewards = []
            for _ in range(num_simulations):
                simulated_rewards.append(np.sum(rewards[i:]))

            baseline[i] = np.mean(simulated_rewards)

        return baseline"""


    """def running_average_baseline(self, rewards, alpha=0.9):
        baseline = np.zeros_like(rewards)
        for i in range(len(rewards)):
            baseline[i] = alpha * baseline[i - 1] + (1 - alpha) * rewards[i]
        return baseline"""



    def grad_log_p_dot_rewards(self, grad_log_p, actions, discounted_rewards):
        # dot grads with future rewards for each action in episode\
        #baseline = self.baseline(discounted_rewards)
        #discounted_rewards = discounted_rewards - baseline

        return grad_log_p.T @ discounted_rewards

    def discount_rewards(self, rewards):
        # calculate temporally adjusted, discounted rewards

        baseline = self.td_error_baseline(rewards)
        discounted_rewards = np.zeros(len(rewards))
        cumulative_rewards = 0
        for i in reversed(range(0, len(rewards))):
            cumulative_rewards = cumulative_rewards * self.γ + rewards[i]
            discounted_rewards[i] = cumulative_rewards - baseline[i]

        return discounted_rewards


    def update(self, rewards, obs, actions):
        # calculate gradients for each action over all observations
        grad_log_p = np.array([self.grad_log_p(ob)[action] for ob,action in zip(obs,actions)])

        assert grad_log_p.shape == (len(obs), 4)

        # calculate temporaly adjusted, discounted rewards
        discounted_rewards = self.discount_rewards(rewards)

        # gradients times rewards
        dot = self.grad_log_p_dot_rewards(grad_log_p, actions, discounted_rewards)

        # gradient ascent on parameters
        self.θ += self.α*dot


    def get_learning_rate(self, episode_num):
        return self.α / (1 + episode_num * 0.001)

In [None]:
def run_episode(env, policy, render=False):

    observation = env.reset()
    totalreward = 0

    observations = []
    actions = []
    rewards = []
    probs = []

    done = False

    while not done:
        if render:
            env.render()

        observations.append(observation)

        action, prob = policy.act(observation)
        observation, reward, done, info = env.step(action)

        totalreward += reward
        rewards.append(reward)
        actions.append(action)
        probs.append(prob)

    return totalreward, np.array(rewards), np.array(observations), np.array(actions), np.array(probs)

In [None]:
from gym.wrappers import RecordVideo

def train(θ, α, γ, Policy, MAX_EPISODES=1000, seed=None, evaluate=False):

    # initialize environment and policy
    env = gym.make('CartPole-v1')
    if seed is not None:
        env.seed(seed)
    episode_rewards = []
    policy = Policy(θ, α, γ)

    # train until MAX_EPISODES
    for i in range(MAX_EPISODES):

        # run a single episode
        total_reward, rewards, observations, actions, probs = run_episode(env, policy)

        # keep track of episode rewards
        episode_rewards.append(total_reward)

        # update policy
        policy.update(rewards, observations, actions)
        print("EP: " + str(i) + " Score: " + str(total_reward) + " ",end="\r", flush=False)

    # evaluation call after training is finished - evaluate last trained policy on 100 episodes
    if evaluate:
        env = RecordVideo(env, 'pg_cartpole/')
        for _ in range(100):
            run_episode(env, policy, render=False)
        env.env.close()

    return episode_rewards, policy

In [None]:
# additional imports for saving and loading a trained policy


# for reproducibility
GLOBAL_SEED = 0
np.random.seed(GLOBAL_SEED)

episode_rewards, policy = train(θ=np.random.rand(4),
                                α=0.002,
                                γ=0.99,
                                Policy=LogisticPolicy,
                                MAX_EPISODES=2000,
                                seed=None,
                                #num_simulations = 4,
                                evaluate=True)

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
#from google.colab import files

plt.plot(episode_rewards);
plt.savefig('Cartpole_TD.png')
#files.download('Cartpole_TD.png')

In [None]:
np.var(episode_rewards)

In [None]:
import gym
import numpy as np
from gym.wrappers import RecordVideo
import matplotlib.pyplot as plt
#from google.colab import files

class LogisticPolicy:
    def __init__(self, θ, α, γ):
        self.θ = θ
        self.α = α
        self.γ = γ

    def logistic(self, y):
        return 1 / (1 + np.exp(-y))

    def probs(self, x):
        y = x @ self.θ
        prob0 = self.logistic(y)
        return np.array([prob0, 1 - prob0])

    def act(self, x):
        probs = self.probs(x)
        action = np.random.choice([0, 1], p=probs)
        return action, probs[action]

    def grad_log_p(self, x):
        y = x @ self.θ
        grad_log_p0 = x - x * self.logistic(y)
        grad_log_p1 = -x * self.logistic(y)
        return grad_log_p0, grad_log_p1

    def grad_log_p_dot_rewards(self, grad_log_p, actions, discounted_rewards):
        return grad_log_p.T @ discounted_rewards

    def calculate_baseline(self, rewards):
        baseline = np.cumsum(rewards) / (np.arange(len(rewards)) + 1)
        return baseline

    def discount_rewards(self, rewards):
        discounted_rewards = np.zeros(len(rewards))
        cumulative_rewards = 0
        baseline = self.calculate_baseline(rewards)
        for i in reversed(range(0, len(rewards))):
            cumulative_rewards = cumulative_rewards * self.γ + rewards[i]
            discounted_rewards[i] = cumulative_rewards - baseline[i]
        return discounted_rewards

    def update(self, rewards, obs, actions):
        grad_log_p = np.array([self.grad_log_p(ob)[action] for ob, action in zip(obs, actions)])
        assert grad_log_p.shape == (len(obs), 4)

        discounted_rewards = self.discount_rewards(rewards)
        #baseline = self.calculate_baseline(rewards)
        advantages = discounted_rewards

        dot = self.grad_log_p_dot_rewards(grad_log_p, actions, advantages)

        self.θ += self.α * dot

def run_episode(env, policy, render=False):
    observation = env.reset()
    totalreward = 0
    observations, actions, rewards, probs = [], [], [], []
    done = False

    while not done:
        if render:
            env.render()
        observations.append(observation)
        action, prob = policy.act(observation)
        observation, reward, done, _ = env.step(action)
        totalreward += reward
        rewards.append(reward)
        actions.append(action)
        probs.append(prob)

    return totalreward, np.array(rewards), np.array(observations), np.array(actions), np.array(probs)

def train(θ, α, γ, Policy, MAX_EPISODES=5000, seed=None, evaluate=False):
    env = gym.make('CartPole-v1')
    if seed is not None:
        env.seed(seed)
    episode_rewards = []
    policy = Policy(θ, α, γ)

    for i in range(MAX_EPISODES):
        total_reward, rewards, observations, actions, probs = run_episode(env, policy)
        episode_rewards.append(total_reward)
        policy.update(rewards, observations, actions)
        print("EP: " + str(i) + " Score: " + str(total_reward) + " ", end="\r", flush=False)

    if evaluate:
        env = RecordVideo(env, 'pg_cartpole/')
        for _ in range(100):
            run_episode(env, policy, render=False)
        env.env.close()

    return episode_rewards, policy

# For reproducibility
GLOBAL_SEED = 0
np.random.seed(GLOBAL_SEED)

episode_rewards, policy = train(θ=np.random.rand(4),
                                α=0.002,
                                γ=0.99,
                                Policy=LogisticPolicy,
                                MAX_EPISODES=2000,
                                seed=None,
                                evaluate=True)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Training Progress')
plt.show()
plt.savefig('Cartpole_Cumsum.png')
#files.download('Cartpole_Cumsum.png')

In [None]:
np.var(episode_rewards)