# Actor Critic

In [1]:
from copy import deepcopy
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
class CliffWalkingEnv:

    def __init__(self):
        self.num_of_rows = 6
        self.num_of_cols = 10
        self.start_state = (5, 0)
        self.terminal_state = (5, 9)
        self.cliff_states = [(5, i) for i in range(1, 8)]
        self.current_state = None
        self.directions = [
            ((+0, +1), 'up', '⬆'),
            ((+1, +0), 'down', '⬇'),
            ((+0, -1), 'left', '⬅'),
            ((-1, +0), 'right', '➡')
        ]

    def step(self, action):
        x, y = self.current_state
        dx, dy = self.directions[action][0]
        nx, ny = x + dx, y + dy
        next_state = (nx, ny)
        if 0 <= nx < self.num_of_rows and 0 <= ny < self.num_of_cols:
            self.current_state = next_state

        reward = -5.0
        is_terminal = False
        if self.current_state in self.cliff_states:
            reward = -100.0
            self.current_state = deepcopy(self.start_state)
        elif self.current_state == self.terminal_state:
            is_terminal = True
        next_state_id = self.cords_to_state_id(self.current_state)
        return next_state_id, reward, is_terminal

    def reset_state(self):
        self.current_state = self.start_state
        return self.cords_to_state_id(self.current_state)

    def cords_to_state_id(self, state):
        r, c = state
        return r * self.num_of_cols + c

    def print_policy_table(self, actor_model):
        print(f'{f"State/Action":^12}', end='\t')
        for curr_action in self.directions:
            print(f'{curr_action[2]:^10}', end='\t')
        print()

        for r in range(0, self.num_of_rows):
            for c in range(0, self.num_of_cols):
                curr_state_cords = (r, c)
                curr_state = self.cords_to_state_id(curr_state_cords)
                probs = actor_model(curr_state)
                print(f"{f'({r}, {c})':^10}: ", end='\t')
                for prob in probs:
                    print(f"{prob:^10.3f}", end='\t')
                print()

In [3]:
class NeuralNetwork(nn.Module):
    def __init__(self, alpha, in_dims, hl1_dims, hl2_dims, hl3_dims, out_dims):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(in_dims, hl1_dims),
            nn.ReLU(),
            nn.Linear(hl1_dims, hl2_dims),
            nn.ReLU(),
            nn.Linear(hl2_dims, hl3_dims),
            nn.ReLU(),
            nn.Linear(hl3_dims, out_dims),
            nn.Softmax(dim=0)
        )
        self.optimizer = optim.Adam(self.parameters(), lr=alpha)

    def forward(self, x):
        x = torch.FloatTensor(np.array([x]))
        return self.layers(x)

In [4]:
class Agent(object):
    def __init__(self, alpha1, alpha2, in_dims=1, hl1_dims=256, hl2_dims=512, hl3_dims=128, out_dims=2, gamma=0.7):
        self.gamma = gamma
        self.actor = NeuralNetwork(alpha1, in_dims, hl1_dims, hl2_dims, hl3_dims, out_dims)
        self.critic = NeuralNetwork(alpha2, in_dims, hl1_dims, hl2_dims, hl3_dims, 1)
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.log_probs = None

    def next_action(self, curr_state, random=False):
        probs = self.actor(curr_state)
        if random:
            probs = torch.FloatTensor([0.25] * 4)
        action_cat_probs = torch.distributions.Categorical(probs)
        action = action_cat_probs.sample()
        self.log_probs = action_cat_probs.log_prob(action)
        return action.item()

    def train(self, curr_state, reward, next_state, done):
        self.actor.optimizer.zero_grad()
        self.critic.optimizer.zero_grad()

        critic_curr = self.critic.forward(curr_state)
        critic_next = self.critic.forward(next_state)

        reward = torch.tensor(reward, dtype=torch.float).to(self.device)
        delta = reward + self.gamma * critic_next * (1 - int(done)) - critic_curr

        critic_loss = delta ** 2
        actor_loss = -self.log_probs * delta

        (actor_loss + critic_loss).backward()
        self.actor.optimizer.step()
        self.critic.optimizer.step()

        return actor_loss.item(), critic_loss.item()

In [5]:
env = CliffWalkingEnv()
num_episodes = 100
agent = Agent(alpha1=0.01, alpha2=0.01, in_dims=1, hl1_dims=256, hl2_dims=512, hl3_dims=128, out_dims=4, gamma=0.7)
loss_per_episode = []
eps_init = 0.9
epsilon = 1.0
reward_per_episode = []

print(f"Initial Policy Table")
env.print_policy_table(agent.actor)

Initial Policy Table
State/Action	    ⬆     	    ⬇     	    ⬅     	    ➡     	
  (0, 0)  : 	  0.239   	  0.259   	  0.260   	  0.242   	
  (0, 1)  : 	  0.245   	  0.254   	  0.264   	  0.237   	
  (0, 2)  : 	  0.252   	  0.248   	  0.270   	  0.230   	
  (0, 3)  : 	  0.251   	  0.248   	  0.276   	  0.224   	
  (0, 4)  : 	  0.250   	  0.250   	  0.280   	  0.220   	
  (0, 5)  : 	  0.247   	  0.252   	  0.285   	  0.216   	
  (0, 6)  : 	  0.245   	  0.253   	  0.291   	  0.211   	
  (0, 7)  : 	  0.242   	  0.254   	  0.296   	  0.208   	
  (0, 8)  : 	  0.239   	  0.256   	  0.301   	  0.204   	
  (0, 9)  : 	  0.236   	  0.258   	  0.306   	  0.200   	
  (1, 0)  : 	  0.232   	  0.259   	  0.311   	  0.197   	
  (1, 1)  : 	  0.229   	  0.261   	  0.317   	  0.193   	
  (1, 2)  : 	  0.225   	  0.262   	  0.323   	  0.189   	
  (1, 3)  : 	  0.222   	  0.263   	  0.329   	  0.185   	
  (1, 4)  : 	  0.219   	  0.264   	  0.335   	  0.181   	
  (1, 5)  : 	  0.215   	  0.265   	  0.341   	  0.1

In [6]:
for i in range(num_episodes):
    if i % 10 == 0:
      print(f"Iteration: {i}")
    episode_reward = 0
    terminal = False
    curr_state = env.reset_state()
    actor_loss, critic_loss = 0, 0

    if epsilon > 0.1:
        epsilon = eps_init ** (i + 1)
    while not terminal:
        rand_num = np.random.random(1)[0]
        random = True
        # if rand_num > epsilon:
        #     random = False
        action = agent.next_action(curr_state, random=random)

        next_state, reward, terminal = env.step(action)
        a_loss, c_loss = agent.train(curr_state, reward, next_state, terminal)
        actor_loss += a_loss
        critic_loss += c_loss
        episode_reward += reward
        curr_state = next_state

    reward_per_episode.append(episode_reward)
    loss_per_episode.append((actor_loss, critic_loss))

Iteration: 0
Iteration: 10
Iteration: 20
Iteration: 30
Iteration: 40
Iteration: 50
Iteration: 60
Iteration: 70
Iteration: 80
Iteration: 90


In [7]:
print(f"Final Policy Table")
env.print_policy_table(agent.actor)

Final Policy Table
State/Action	    ⬆     	    ⬇     	    ⬅     	    ➡     	
  (0, 0)  : 	  0.239   	  0.259   	  0.260   	  0.242   	
  (0, 1)  : 	  0.245   	  0.254   	  0.264   	  0.237   	
  (0, 2)  : 	  0.252   	  0.248   	  0.270   	  0.230   	
  (0, 3)  : 	  0.251   	  0.248   	  0.276   	  0.224   	
  (0, 4)  : 	  0.250   	  0.250   	  0.280   	  0.220   	
  (0, 5)  : 	  0.247   	  0.252   	  0.285   	  0.216   	
  (0, 6)  : 	  0.245   	  0.253   	  0.291   	  0.211   	
  (0, 7)  : 	  0.242   	  0.254   	  0.296   	  0.208   	
  (0, 8)  : 	  0.239   	  0.256   	  0.301   	  0.204   	
  (0, 9)  : 	  0.236   	  0.258   	  0.306   	  0.200   	
  (1, 0)  : 	  0.232   	  0.259   	  0.311   	  0.197   	
  (1, 1)  : 	  0.229   	  0.261   	  0.317   	  0.193   	
  (1, 2)  : 	  0.225   	  0.262   	  0.323   	  0.189   	
  (1, 3)  : 	  0.222   	  0.263   	  0.329   	  0.185   	
  (1, 4)  : 	  0.219   	  0.264   	  0.335   	  0.181   	
  (1, 5)  : 	  0.215   	  0.265   	  0.341   	  0.178