<a href="https://colab.research.google.com/github/kovika98/reverse_pendulum/blob/dev/Reverse_Pendulum.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!rm -rf ./sample_data

from dataclasses import dataclass, replace as dt_replace
import numpy as np
from copy import copy
import gym
from gym import spaces


# Környezet implementációja :)


## Dataclass - State

In [3]:
@dataclass
class State:
    p_G: float = 0.0
    p_dG: float = 0.0
    c_X: float = 0.0
    c_dX: float = 0.0

    def flatten(self):
        return np.array([self.p_G, self.p_dG, self.c_X, self.c_dX])

## Env

In [4]:
class DoubleCartPoleEnv(gym.Env):
    @property
    def action_space(self):
        return self._action_space

    @property
    def observation_space(self):
        return self._observation_space

    def __init__(self, timeStep=0.1):
        # Boundaries
        self.maxX = 2.4
        self.maxG = 12 * 2 * np.pi / 360
        self.maxT = 0.8

        # Cart data
        self.mC = 1
        self.mP = 0.1
        self.lenP = 1
        self.radW = 0.2
        self.friction = 1
        self.coefR = 0.001

        # Universal constants
        self.g = 9.81

        # Computed values
        self.p_I = 1/3 * self.mP * (self.lenP ** 2)
        self.mTot = self.mC + self.mP
        self.dt = timeStep
        # spaces.Box(-self.maxT, self.maxT, shape=(1,))
        self._action_space = spaces.Discrete(7)
        boundary = np.array([self.maxG,
                             np.finfo(np.float32).max,
                             self.maxX,
                             np.finfo(np.float32).max],
                            dtype=np.float32)
        self._observation_space = spaces.Box(-boundary,
                                             boundary, dtype=np.float32)

        self.viewer = None

    def step(self, action):
        torque = np.linspace(-self.maxT, self.maxT,
                              self.action_space.n)[action]
        F = (2.0*torque - self.coefR*self.mTot*self.g/2.0)/self.radW

        sinG = np.sin(self.state.p_G)
        cosG = np.cos(self.state.p_G)

        _a = (-1 * F - self.mP * 0.5 * self.lenP * (self.state.p_dG ** 2) *
              sinG) / self.mTot
        _b = (4/3 - self.mP * (cosG ** 2) / self.mTot)

        p_ddG = (self.g * sinG + cosG * _a) \
            / (0.5 * self.lenP * _b)

        _c = (self.state.p_dG ** 2) * sinG - \
            p_ddG * cosG
        c_ddX = (F + self.mP * 0.5 * self.lenP * _c) / self.mTot

        self.state.c_dX += self.dt * c_ddX
        self.state.c_X += self.dt * self.state.c_dX

        self.state.p_dG += self.dt * p_ddG
        self.state.p_G += self.dt * self.state.p_dG

        terminate = False
        if np.abs(self.state.p_G) > self.maxG or np.abs(self.state.c_X) > \
          self.maxX:
            terminate = True
        return np.array(dt_replace(self.state).flatten()), 1.0, terminate, \
          {"action": action}

    def reset(self):
        self.state = State(p_dG=0.01)
        return np.array(dt_replace(self.state).flatten())

    def render(self, mode='human'):
        screen_w = 600
        screen_h = 400

        world_width = self.maxX * 2
        scale = screen_w/world_width
        pole_w = 8.0
        polelen = scale * self.lenP
        car_w = 70.0
        car_h = 40.0
        wheel_r = 8.0

        if self.viewer is None:
            from gym.envs.classic_control import rendering
            self.viewer = rendering.Viewer(screen_w, screen_h)

            # Wheel geometry
            wheel = rendering.make_circle(wheel_r)
            self.wheeltrans_l = rendering.Transform(
                translation=(-1/3*car_w, wheel_r))
            wheel.set_color(.6, .6, .6)
            wheel.add_attr(self.wheeltrans_l)
            self.viewer.add_geom(wheel)
            wheel = rendering.make_circle(wheel_r)
            self.wheeltrans_r = rendering.Transform(
                translation=(1/3*car_w, wheel_r))
            wheel.set_color(.6, .6, .6)
            wheel.add_attr(self.wheeltrans_r)
            self.viewer.add_geom(wheel)

            # Car geometry
            l, r, t, b = -car_w / 2, car_w / 2, car_h, 0
            car = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            self.cartrans = rendering.Transform(translation=(0, wheel_r))
            car.add_attr(self.cartrans)
            car.set_color(0.3, 0.3, 1)
            self.viewer.add_geom(car)

            # Pole geometry
            l, r, t, b = -pole_w / 2, pole_w / 2, polelen, 0
            pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
            pole.set_color(0, 0.9, 0.4)
            self.poletrans = rendering.Transform(translation=(0, car_h))
            pole.add_attr(self.poletrans)
            pole.add_attr(self.cartrans)
            self.viewer.add_geom(pole)

            # Axle
            axle = rendering.make_circle(pole_w/2)
            axle.add_attr(self.poletrans)
            axle.add_attr(self.cartrans)
            axle.set_color(.45, .45, .45)
            self.viewer.add_geom(axle)

            self._pole_geom = pole

        if self.state is None:
            return None

        pole = self._pole_geom
        l, r, t, b = -pole_w / 2, pole_w / 2, polelen, 0
        pole.v = [(l, b), (l, t), (r, t), (r, b)]

        s = self.state
        carX = s.c_X * scale + screen_w / 2  # middle of cart
        self.wheeltrans_l.set_translation(carX - 1/3 * car_w, wheel_r)
        self.wheeltrans_r.set_translation(carX + 1/3 * car_w, wheel_r)
        self.cartrans.set_translation(carX, wheel_r)
        self.poletrans.set_rotation(-s.p_G)

        return self.viewer.render(return_rgb_array=mode == 'rgb_array')

    def close(self):
        if self.viewer:
            self.viewer.close()
            self.viewer = None

# Modell implementációja

## Modell

In [5]:
import tensorflow as tf
import tensorflow.keras.layers as kl

class ProbabilityDistribution(tf.keras.Model):
    def call(self, logits, **kwargs):
        return tf.squeeze(tf.random.categorical(logits, 1), axis=-1)


class Model(tf.keras.Model):
    def __init__(self, num_actions):
        super().__init__('mlp_policy')
        # Note: no tf.get_variable(), just simple Keras API!
        self.hidden1 = kl.Dense(128, activation='relu')
        self.hidden2 = kl.Dense(128, activation='relu')
        self.value = kl.Dense(1, name='value')
        # Logits are unnormalized log probabilities.
        self.logits = kl.Dense(num_actions, name='policy_logits')
        self.dist = ProbabilityDistribution()

    def call(self, inputs, **kwargs):
        # Inputs is a numpy array, convert to a tensor.
        x = tf.convert_to_tensor(inputs)
        # Separate hidden layers from the same input tensor.
        hidden_logs = self.hidden1(x)
        hidden_vals = self.hidden2(x)
        return self.logits(hidden_logs), self.value(hidden_vals)

    def action_value(self, obs):
        # Executes `call()` under the hood.
        logits, value = self.predict_on_batch(obs)
        action = self.dist.predict_on_batch(logits)
        # Another way to sample actions:
        #   action = tf.random.categorical(logits, 1)
        # Will become clearer later why we don't use it.
        return np.squeeze(action, axis=-1), np.squeeze(value, axis=-1)

## Ágens

In [6]:
import tensorflow.keras.losses as kls
import tensorflow.keras.optimizers as ko


class A2CAgent:
    def __init__(self, model, lr=7e-3, gamma=0.99, value_c=0.5, entropy_c=1e-4):
        # `gamma` is the discount factor
        self.gamma = gamma
        # Coefficients are used for the loss terms.
        self.value_c = value_c
        self.entropy_c = entropy_c

        self.model = model
        self.model.compile(
            optimizer=ko.RMSprop(lr=lr),
            # Define separate losses for policy logits and value estimate.
            loss=[self._logits_loss, self._value_loss])

    def test(self, env, render=True):
        obs, done, ep_reward = env.reset(), False, 0
        while not done:
            if render:
                env.render()
            action, _ = self.model.action_value(obs[None, :])
            obs, reward, done, _ = env.step(action)
            ep_reward += reward
        return ep_reward

    def _value_loss(self, returns, value):
        # Value loss is typically MSE between value estimates and returns.
        return self.value_c * kls.mean_squared_error(returns, value)

    def _logits_loss(self, actions_and_advantages, logits):
        # A trick to input actions and advantages through the same API.
        actions, advantages = tf.split(actions_and_advantages, 2, axis=-1)

        # Sparse categorical CE loss obj that supports sample_weight arg on `call()`.
        # `from_logits` argument ensures transformation into normalized probabilities.
        weighted_sparse_ce = kls.SparseCategoricalCrossentropy(
            from_logits=True)

        # Policy loss is defined by policy gradients, weighted by advantages.
        # Note: we only calculate the loss on the actions we've actually taken.
        actions = tf.cast(actions, tf.int32)
        policy_loss = weighted_sparse_ce(
            actions, logits, sample_weight=advantages)

        # Entropy loss can be calculated as cross-entropy over itself.
        probs = tf.nn.softmax(logits)
        entropy_loss = kls.categorical_crossentropy(probs, probs)

        # We want to minimize policy and maximize entropy losses.
        # Here signs are flipped because the optimizer minimizes.
        return policy_loss - self.entropy_c * entropy_loss

    def train(self, env, batch_sz=64, updates=250):
        # Storage helpers for a single batch of data.
        actions = np.empty((batch_sz,), dtype=np.int32)
        rewards, dones, values = np.empty((3, batch_sz))
        observations = np.empty((batch_sz,) + env.observation_space.shape)

        # Training loop: collect samples, send to optimizer, repeat updates times.
        ep_rewards = [0.0]
        next_obs = env.reset()
        for update in range(updates):
            for step in range(batch_sz):
                observations[step] = next_obs.copy()
                actions[step], values[step] = self.model.action_value(
                    next_obs[None, :])
                next_obs, rewards[step], dones[step], _ = env.step(
                    actions[step])

                ep_rewards[-1] += rewards[step]
                if dones[step]:
                    ep_rewards.append(0.0)
                    next_obs = env.reset()

            _, next_value = self.model.action_value(next_obs[None, :])

            returns, advs = self._returns_advantages(
                rewards, dones, values, next_value)
            # A trick to input actions and advantages through same API.
            acts_and_advs = np.concatenate(
                [actions[:, None], advs[:, None]], axis=-1)

            # Performs a full training step on the collected batch.
            # Note: no need to mess around with gradients, Keras API handles it.
            losses = self.model.train_on_batch(
                observations, [acts_and_advs, returns])

        return ep_rewards

    def _returns_advantages(self, rewards, dones, values, next_value):
        # `next_value` is the bootstrap value estimate of the future state (critic).
        returns = np.append(np.zeros_like(rewards), next_value, axis=-1)

        # Returns are calculated as discounted sum of future rewards.
        for t in reversed(range(rewards.shape[0])):
            returns[t] = rewards[t] + self.gamma * \
                returns[t + 1] * (1 - dones[t])
        returns = returns[:-1]

        # Advantages are equal to returns - baseline (value estimates in our case).
        advantages = returns - values

        return returns, advantages

# Execution

In [7]:
env = DoubleCartPoleEnv()
model = Model(num_actions=env.action_space.n)
agent = A2CAgent(model)
rewards_history = agent.train(env)
print("Finished training, testing...")

Finished training, testing...


In [16]:
for i in range(5):
    print("Alive for %.2f seconds" % (agent.test(env, False)/10.0))

Alive for 44.60 seconds
Alive for 35.80 seconds
Alive for 123.90 seconds
Alive for 13.30 seconds
Alive for 13.00 seconds
