<a href="https://colab.research.google.com/github/Harrow-Enigma/ai-lecture-series-summer21/blob/main/Reinforcement_Learning_(Complete).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Reinforcement Learning

Train a policy-gradient agent that learns to play CartPole, the OpenAI Gym pole-balancing environment!

Copyright 2021 Team Enigma

In [None]:
# Copyright 2021 Team Enigma

#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

In [None]:
import tensorflow as tf
from tensorflow import keras
import gym
import pickle as pkl
import time, os
import numpy as np
from numpy.random import choice as sample

## Make the environment

In [None]:
# Create the CartPole environment and keep module-level handles on its spaces
env = gym.make('CartPole-v0')
ACTION_SPACE, OBSERVATION_SPACE = env.action_space, env.observation_space

# Report the number of discrete actions and the shape of one observation
print('Action space: ', ACTION_SPACE.n)
print('Observation space: ', OBSERVATION_SPACE.shape)

## Helper functions

In [None]:
def normalize(arr, eps=1e-8):
  """Standardise `arr` to zero mean and (approximately) unit variance.

  Args:
    arr: Array-like of numbers.
    eps: Small constant added to the standard deviation so that an input
      whose entries are all equal (for example a single-element array)
      maps to zeros instead of NaN.

  Returns:
    A float32 ndarray with the same shape as `arr`.
  """
  arr = np.asarray(arr, dtype=np.float32)
  if arr.size == 0:
    # Nothing to standardise; mean/std of an empty array would only warn.
    return arr
  centred = arr - arr.mean()
  scaled = centred / (arr.std() + eps)
  return scaled.astype(np.float32)

def discounted_rewards(r, gamma=0.95):
  """Return the normalised discounted return for every step of an episode.

  Walks the reward sequence `r` from the last step to the first, keeping a
  running total that is multiplied by `gamma` before each reward is added,
  then passes the resulting float32 array through `normalize`.
  """
  returns = np.zeros_like(r, dtype=np.float32)
  running = 0
  # Iterate backwards so each step's return includes all later rewards
  for step in range(len(r) - 1, -1, -1):
    running = running * gamma + r[step]
    returns[step] = running
  return normalize(returns)

In [None]:
class History(object):
  """Per-episode buffer of rewards, observations and actions.

  The three attributes are kept in step: entry i of each one belongs to the
  same call to `write`.
  """

  def __init__(self):
    # A new history starts out exactly like a freshly restarted one
    self.restart()

  def restart(self):
    """Drop everything recorded so far and start with empty lists."""
    self.rewards, self.observations, self.actions = [], [], []

  def write(self, observation, action, reward):
    """Append one (observation, action, reward) record to the buffers."""
    records = (
        (self.rewards, reward),
        (self.observations, observation),
        (self.actions, action),
    )
    for buffer, item in records:
      buffer.append(item)

  def solidify(self):
    """Replace each buffer with a NumPy array built from its contents."""
    for field in ('rewards', 'observations', 'actions'):
      setattr(self, field, np.array(getattr(self, field)))

## Keras model for our agent

In [None]:
class Agent(keras.Model):
    """Policy network: two 100-unit ReLU layers followed by a linear layer.

    `call` returns one raw score (logit) per action; `act` turns the scores
    for a single observation into probabilities and samples an action.

    Args:
        action_space: Number of discrete actions, i.e. the width of the
            output layer.
    """

    def __init__(self, action_space):
        super(Agent, self).__init__()
        self.output_dim = action_space
        self.dense0 = keras.layers.Dense(100, activation='relu')
        self.dense1 = keras.layers.Dense(100, activation='relu')
        # No activation here: the softmax is applied by the consumers
        # of the logits (see `act`).
        self.dense2 = keras.layers.Dense(self.output_dim)

    def call(self, inputs):
        """Return the action logits for a batch of observations."""
        x = self.dense0(inputs)
        x = self.dense1(x)
        return self.dense2(x)

    def act(self, observations):
        """Sample one action for a single observation.

        Args:
            observations: One observation without a batch dimension.

        Returns:
            The index of the sampled action, drawn according to the softmax
            of the network's logits.
        """
        # Add the batch axis the network expects.
        batch = np.expand_dims(
            np.asarray(observations, dtype=np.float32), axis=0)
        # Call the model directly: `predict` sets up a batched inference
        # loop on every invocation, which is wasteful for one sample per
        # environment step.
        logits = self(batch)
        probs = np.asarray(tf.nn.softmax(logits), dtype=np.float64)
        probs = np.squeeze(probs, 0)
        # Renormalise in float64 so that float32 rounding in the softmax
        # cannot trip np.random.choice's "probabilities do not sum to 1"
        # check.
        probs = probs / probs.sum()
        # Scratch values stay local instead of being stored on the model.
        return np.random.choice(self.output_dim, 1, p=probs)[0]

In [None]:
# Initialise the policy network with one output per discrete action
agent = Agent(ACTION_SPACE.n)

In [None]:
# Run one initial observation through the agent as a quick check that it
# produces an action (presumably this first call is also what lets Keras
# create the layer weights - confirm if relying on it).
obs = env.reset()
agent.act(obs)

## Training Steps

In [None]:
# Policy optimization loss
def loss(actions, logits, rewards):
  """Policy-optimisation loss for one batch of steps.

  Computes the sparse softmax cross-entropy between `logits` and the taken
  `actions` (the negative log-probability of each action), weights it
  element-wise by `rewards`, and returns the mean.
  """
  per_step_nll = tf.nn.sparse_softmax_cross_entropy_with_logits(
      labels=actions, logits=logits)
  weighted = per_step_nll * rewards
  return tf.reduce_mean(weighted)

In [None]:
# Adam optimizer with a learning rate of 1e-3, used by train_step
optimizer=tf.keras.optimizers.Adam(1e-3)

In [None]:
# One training step - replaying from memory
def train_step(history, agent):
  """Apply one gradient update to `agent` from a recorded episode.

  Uses the module-level `optimizer`. `history` must expose `rewards`,
  `observations` and `actions`; the rewards are converted with
  `discounted_rewards` before being passed to `loss`.

  Returns:
    The episode loss (passed through `tf.reduce_sum`).
  """
  returns = discounted_rewards(history.rewards)

  # Record the forward pass so the loss can be differentiated
  with tf.GradientTape() as tape:
    episode_loss = loss(history.actions, agent(history.observations), returns)

  grads = tape.gradient(episode_loss, agent.trainable_variables)
  grads_and_vars = zip(grads, agent.trainable_variables)
  optimizer.apply_gradients(grads_and_vars)
  return tf.reduce_sum(episode_loss)

## Training!!!

In [None]:
# Train for 500 episodes: play each episode to completion while recording it,
# then perform one policy update from the recorded history.
history = History()

try:
    for i_episode in range(500):
        print('Starting episode {}'.format(i_episode))

        observation = env.reset()
        history.restart()
        t = 0
        done = False

        while not done:
            action = agent.act(observation)
            obs, reward, done, info = env.step(action)
            # Store the observation the action was chosen from, not the
            # one that resulted from it.
            history.write(observation, action, reward)
            t += 1
            observation = obs

        # Episode over: convert the buffers to arrays and update the policy
        history.solidify()
        losses = train_step(history, agent)
        print("Episode finished after {} timesteps, with a loss of {}\n".format(t,losses))
finally:
    # Release the environment even if training fails or is interrupted
    env.close()

In [None]:
# Save the agent's weights to 'weights.h5' (relative path)
agent.save_weights('weights.h5')