In [None]:
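#dependencies of this notebook; uncomment and run once in a fresh environment
#(%pip installs into the environment of the running kernel)
#%pip install gym==0.22.0
#%pip install pygame
#%pip install pyglet
#%pip install torch
#%pip install matplotlib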

In [None]:
import gym
import torch
import torch.nn as nn
import numpy as np
import random
import matplotlib.pyplot as plt
from copy import deepcopy
from torch.distributions import Normal

def set_seed(env, seed):
    """Seed every random number generator used by the notebook.

    Args:
        env: environment exposing ``seed(seed)``; if it has an
            ``action_space`` attribute, that space is seeded as well.
        seed: integer seed shared by all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    env.seed(seed)

    #the action space owns a separate generator that env.seed() does not touch;
    #without this, env.action_space.sample() is not reproducible
    action_space = getattr(env, 'action_space', None)
    if action_space is not None:
        action_space.seed(seed)

In [None]:
class SAC(nn.Module):
    """Simplified Soft Actor-Critic agent with a Gaussian policy and twin Q-networks.

    The policy network outputs one mean and one log standard deviation per
    action dimension; the final Tanh bounds both to [-1, 1]. Sampled actions
    are not squashed, so they may fall outside [-1, 1]. Two Q-networks, each
    with a slowly updated target copy, are trained on transitions stored in
    ``self.memory``.

    Args:
        state_dim: size of the state vector.
        action_dim: size of the action vector.
        gamma: discount factor.
        alpha: entropy coefficient.
        tau: interpolation factor of the soft target-network update.
        batch_size: number of transitions sampled per training step.
        pi_lr: learning rate of the policy optimizer.
        q_lr: learning rate of both Q-network optimizers.
    """

    def __init__(self, state_dim, action_dim, gamma=0.99, alpha=1e-3, tau=1e-2,
                 batch_size=64, pi_lr=1e-3, q_lr=1e-3):
        super().__init__()

        #policy network: first half of the output is the means, second half the log-stds
        self.pi_model = nn.Sequential(nn.Linear(state_dim, 128), nn.ReLU(),
                                      nn.Linear(128, 128), nn.ReLU(),
                                      nn.Linear(128, 2 * action_dim), nn.Tanh())

        #first Q-network: maps a concatenated (state, action) pair to a scalar value
        self.q1_model = nn.Sequential(nn.Linear(state_dim + action_dim, 128), nn.ReLU(),
                                      nn.Linear(128, 128), nn.ReLU(),
                                      nn.Linear(128, 1))

        #second Q-network with the same architecture
        self.q2_model = nn.Sequential(nn.Linear(state_dim + action_dim, 128), nn.ReLU(),
                                      nn.Linear(128, 128), nn.ReLU(),
                                      nn.Linear(128, 1))

        #hyperparameters
        self.gamma = gamma #discount
        self.alpha = alpha #entropy coefficient
        self.tau = tau #soft target update factor
        self.batch_size = batch_size #batch size

        #replay memory: a plain list of transitions that is never truncated
        self.memory = []

        #one Adam optimizer per trained network
        self.pi_optimizer = torch.optim.Adam(self.pi_model.parameters(), pi_lr)
        self.q1_optimizer = torch.optim.Adam(self.q1_model.parameters(), q_lr)
        self.q2_optimizer = torch.optim.Adam(self.q2_model.parameters(), q_lr)

        #target networks start as exact copies of the Q-networks
        self.q1_target_model = deepcopy(self.q1_model)
        self.q2_target_model = deepcopy(self.q2_model)


    def predict_actions(self, states):
        """Sample actions from the policy for a batch of states.

        Args:
            states: float tensor of shape (batch, state_dim).

        Returns:
            Tuple ``(actions, log_probs)``: reparameterized samples of shape
            (batch, action_dim) and their log-probabilities summed over the
            action dimensions, shape (batch, 1).
        """
        #split the last axis in two halves; works for any action_dim
        means, log_stds = torch.chunk(self.pi_model(states), 2, dim=-1)
        dists = Normal(means, torch.exp(log_stds))
        actions = dists.rsample()
        #action dimensions are independent, so the joint log-probability is the sum
        log_probs = dists.log_prob(actions).sum(dim=-1, keepdim=True)
        return actions, log_probs


    def get_action(self, state):
        """Sample one action for a single state.

        Args:
            state: array-like of shape (state_dim,).

        Returns:
            numpy array of shape (action_dim,).
        """
        state = torch.as_tensor(np.asarray(state), dtype=torch.float32).unsqueeze(0)
        #acting needs no gradients
        with torch.no_grad():
            action, _ = self.predict_actions(state)
        #drop the batch axis that was added above
        return action.squeeze(0).numpy()

    def update_model(self, loss, optimizer, model=None, target_model=None):
        """Take one gradient step on ``loss`` and optionally soft-update a target network.

        Args:
            loss: scalar tensor to minimize.
            optimizer: optimizer holding the parameters to update.
            model: network whose parameters are blended into ``target_model``.
            target_model: network moved towards ``model`` by a factor ``tau``;
                the blend is skipped unless both networks are given.
        """
        #clear gradients BEFORE backward: the policy loss also back-propagates into
        #the Q-networks, and those leftovers must not leak into the next Q-update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        #soft update of the target network, if one is given
        if model is not None and target_model is not None:
            with torch.no_grad():
                for param, target_param in zip(model.parameters(), target_model.parameters()):
                    target_param.copy_((1 - self.tau) * target_param + self.tau * param)


    def fit(self, state, action, reward, done, next_state):
        """Store one transition and, once enough are stored, run one training step.

        Args:
            state: state in which the action was taken.
            action: action as produced by ``get_action``.
            reward: scalar reward received.
            done: truthy if ``next_state`` is terminal (disables bootstrapping).
            next_state: state reached after the action.
        """
        #store the new transition
        self.memory.append([state, action, reward, done, next_state])

        #learn only once the memory holds more than batch_size transitions
        if len(self.memory) <= self.batch_size:
            return

        #sample a batch and stack every field into one float32 tensor
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, dones, next_states = (
            torch.as_tensor(np.array(column), dtype=torch.float32)
            for column in zip(*batch)
        )
        rewards, dones = rewards.reshape(-1, 1), dones.reshape(-1, 1)
        actions = actions.reshape(self.batch_size, -1)

        #right-hand side of the soft Bellman equation; it is a fixed regression
        #target, so no gradients are tracked
        with torch.no_grad():
            next_actions, next_log_probs = self.predict_actions(next_states)
            next_states_and_actions = torch.cat((next_states, next_actions), dim=1)
            next_q1_values = self.q1_target_model(next_states_and_actions)
            next_q2_values = self.q2_target_model(next_states_and_actions)
            next_min_q_values = torch.min(next_q1_values, next_q2_values)
            targets = rewards + self.gamma * (1 - dones) * (next_min_q_values - self.alpha * next_log_probs)

        #regress both Q-networks onto the targets, then soft-update their target copies
        states_and_actions = torch.cat((states, actions), dim=1)
        q1_loss = torch.mean((self.q1_model(states_and_actions) - targets) ** 2)
        q2_loss = torch.mean((self.q2_model(states_and_actions) - targets) ** 2)
        self.update_model(q1_loss, self.q1_optimizer, self.q1_model, self.q1_target_model)
        self.update_model(q2_loss, self.q2_optimizer, self.q2_model, self.q2_target_model)

        #update the policy so that it maximizes the entropy-regularized minimum Q-value
        pred_actions, log_probs = self.predict_actions(states)
        states_and_pred_actions = torch.cat((states, pred_actions), dim=1)
        q1_values = self.q1_model(states_and_pred_actions)
        q2_values = self.q2_model(states_and_pred_actions)
        min_q_values = torch.min(q1_values, q2_values)
        pi_loss = - torch.mean(min_q_values - self.alpha * log_probs)
        self.update_model(pi_loss, self.pi_optimizer)

In [None]:
from IPython import display

def show(env, agent=None, episode_n=1, rollout_len=200, action_scale=2.0):
    """Render episodes of the environment inline in the notebook.

    Args:
        env: environment supporting ``reset``, ``step`` and
            ``render(mode='rgb_array')``.
        agent: object with ``get_action(state)``; if None, actions are drawn
            with ``env.action_space.sample()``.
        episode_n: number of episodes to render.
        rollout_len: maximum number of steps per episode.
        action_scale: factor applied to the agent's action before it is sent
            to the environment; the default matches the scaling used in training.
    """
    for _ in range(episode_n):
        state = env.reset()
        img = plt.imshow(env.render(mode='rgb_array')) #create the image once per episode
        for _ in range(rollout_len):
            img.set_data(env.render(mode='rgb_array')) #then only swap its pixel data
            display.display(plt.gcf())
            display.clear_output(wait=True)

            if agent is None:
                #sampled actions are passed to the environment as they are
                action = env.action_space.sample()
            else:
                #apply the same scaling the agent was trained with
                action = action_scale * agent.get_action(state)

            state, _reward, done, _info = env.step(action)

            #do not keep stepping an episode that has already finished
            if done:
                break

#create the Pendulum environment and seed the random number generators
env = gym.make('Pendulum-v1')
set_seed(env, seed=21)

#render one episode with randomly sampled actions (no agent yet)
show(env, agent=None)

In [None]:
def train(env, agent, episode_n, total_rewards=None, rollout_len=200, action_scale=2.0):
    """Train the agent for a number of episodes and plot the episode returns.

    Args:
        env: environment with ``reset()`` and ``step(action)`` returning
            ``(next_state, reward, done, info)``.
        agent: object with ``get_action(state)`` and
            ``fit(state, action, reward, done, next_state)``.
        episode_n: number of episodes to run.
        total_rewards: list of earlier episode returns to extend in place;
            a new list is created when omitted.
        rollout_len: maximum number of steps per episode.
        action_scale: factor applied to the agent's action before it is sent
            to the environment (the agent is trained on the unscaled action).

    Returns:
        The list of episode returns, including the episodes run by this call.
    """
    #a fresh list per call; a mutable default would be shared between calls
    if total_rewards is None:
        total_rewards = []

    for _ in range(episode_n):

        #reset the episode return and the environment
        total_reward = 0
        state = env.reset()

        for _ in range(rollout_len):

            #the agent picks an action for the current state
            action = agent.get_action(state)

            #the environment returns the next state, the reward and the episode-end flag
            next_state, reward, done, info = env.step(action_scale * action)

            #an episode cut off by the step limit did not reach a terminal state,
            #so the agent should still bootstrap from next_state
            terminal = done and not info.get('TimeLimit.truncated', False)

            #one training step
            agent.fit(state, action, reward, terminal, next_state)

            #accumulate the episode return and move on
            total_reward += reward
            state = next_state

            #do not keep stepping an episode that has already finished
            if done:
                break

        #report the episode; its index is the number of episodes recorded so far
        print(f'episode: {len(total_rewards)}, total_reward: {total_reward}')
        total_rewards.append(total_reward)

    #plot the returns of all recorded episodes
    plt.plot(total_rewards)
    plt.title('total_rewards')
    plt.grid()
    plt.show()

    return total_rewards

#read the state and action space dimensions from the environment
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]

#create a fresh agent
agent = SAC(state_dim, action_dim)

#train it for 15 episodes; the explicit empty list gives this cell its own reward
#history, so re-running the cell does not append to rewards from an earlier run
total_rewards = train(env, agent, episode_n=15, total_rewards=[])

In [None]:
#render one episode driven by the agent after its first 15 training episodes
show(env, agent=agent)

In [None]:
#continue training for 15 more episodes, extending the existing reward history
total_rewards = train(env=env, agent=agent, episode_n=15, total_rewards=total_rewards)

In [None]:
#render one episode driven by the agent after 30 training episodes in total
show(env, agent=agent)