In [None]:
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchsummary import summary

In [None]:
class G_NN(nn.Module):
    """Fully connected network mapping an observation to ``n_actions`` outputs.

    Architecture: input -> 1024 -> 512 -> 256 -> 128 -> 56 -> n_actions, with a
    ReLU after every layer except the last one, whose raw values are returned.
    The module creates its own Adam optimizer (``self.optimizer``) and moves
    itself to ``self.device`` during construction.

    Args:
        alpha: Learning rate passed to Adam.
        ip_dims: Sequence unpacked into the first ``nn.Linear``; it must hold
            the number of input features (e.g. ``[8]``).
        n_actions: Width of the output layer.
    """

    def __init__(self, alpha, ip_dims, n_actions):
        super().__init__()
        self.ip_dims = ip_dims
        self.n_actions = n_actions

        self.fc1 = nn.Linear(*self.ip_dims, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 128)
        self.fc5 = nn.Linear(128, 56)
        self.fc6 = nn.Linear(56, self.n_actions)

        # Fall back to the CPU when CUDA is missing; the previous fallback was
        # another CUDA device, which cannot work on a machine without CUDA.
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

        self.optimizer = optim.Adam(self.parameters(), lr=alpha)

    def forward(self, observation):
        """Run the network on ``observation`` and return the last layer's output.

        Args:
            observation: Anything ``T.Tensor(...)`` accepts whose last
                dimension matches ``ip_dims`` (a single observation or a batch).

        Returns:
            Tensor on ``self.device`` whose last dimension is ``n_actions``.
        """
        x = T.Tensor(observation).to(self.device)
        # No batch-norm layers are applied: the b1..b5 modules were never
        # created, so calling them raised AttributeError on every forward pass.
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4, self.fc5):
            x = F.relu(layer(x))
        return self.fc6(x)

In [None]:
class AC_agent(nn.Module):
    """Actor-critic network with a shared trunk and two output heads.

    Trunk: input -> 1024 -> 512 -> 256 -> 128 -> 56, ReLU after each layer.
    Heads: ``pi`` (56 -> n_actions, raw action scores) and ``v`` (56 -> 1).
    The module creates its own Adam optimizer (``self.optimizer``) over all
    parameters and moves itself to ``self.device`` during construction.

    Args:
        alpha: Learning rate passed to Adam.
        ip_dims: Sequence unpacked into the first ``nn.Linear``; it must hold
            the number of input features (e.g. ``[8]``).
        n_actions: Width of the ``pi`` head.
    """

    def __init__(self, alpha, ip_dims, n_actions):
        super().__init__()
        self.ip_dims = ip_dims
        self.n_actions = n_actions

        self.fc1 = nn.Linear(*self.ip_dims, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 256)
        self.fc4 = nn.Linear(256, 128)
        self.fc5 = nn.Linear(128, 56)

        self.pi = nn.Linear(56, self.n_actions)
        self.v = nn.Linear(56, 1)

        # Fall back to the CPU when CUDA is missing; the previous fallback was
        # another CUDA device, which cannot work on a machine without CUDA.
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

        self.optimizer = optim.Adam(self.parameters(), lr=alpha)

    def forward(self, observation):
        """Return ``(pi, v)`` for ``observation``.

        Args:
            observation: Anything ``T.Tensor(...)`` accepts whose last
                dimension matches ``ip_dims`` (a single observation or a batch).

        Returns:
            Tuple ``(pi, v)``: ``pi`` holds the raw (pre-softmax) outputs of
            the policy head, ``v`` the output of the value head.
        """
        x = T.Tensor(observation).to(self.device)
        for layer in (self.fc1, self.fc2, self.fc3, self.fc4, self.fc5):
            x = F.relu(layer(x))

        pi = self.pi(x)
        v = self.v(x)
        return (pi, v)

In [None]:
class Agent(object):
    """One-step actor-critic agent using two separate ``G_NN`` networks.

    Args:
        alpha: Learning rate of the actor network.
        beta: Learning rate of the critic network.
        ip_dims: Input dimensions forwarded to both networks.
        gamma: Discount factor applied to the value of the next state.
        n_actions: Number of discrete actions (width of the actor output).
    """

    def __init__(self, alpha, beta, ip_dims, gamma=0.99, n_actions=2):
        self.gamma = gamma

        self.actor = G_NN(alpha, ip_dims, n_actions)
        # The critic gets its own learning rate; it used to be built with
        # ``alpha``, which left the ``beta`` argument silently unused.
        self.critic = G_NN(beta, ip_dims, 1)

        # Log-probability of the most recently sampled action: written by
        # choose_action() and consumed by the next learn() call.
        self.log_prob = None

    def choose_action(self, observations):
        """Sample an action from the actor's softmax policy.

        Stores the log-probability of the sampled action in ``self.log_prob``.

        Args:
            observations: A single observation accepted by the actor network.

        Returns:
            The sampled action as a Python int.
        """
        logits = self.actor.forward(observations)
        # Normalise over the action dimension explicitly; omitting ``dim``
        # relies on a deprecated implicit choice and emits a warning.
        prob = F.softmax(logits, dim=-1)
        action_prob = T.distributions.Categorical(prob)
        action = action_prob.sample()
        self.log_prob = action_prob.log_prob(action)

        return action.item()

    def learn(self, state, reward, new_state, done):
        """Perform one TD(0) update of actor and critic for a single transition.

        Must be called after choose_action(), which provides ``self.log_prob``.

        Args:
            state: Observation the last action was chosen from.
            reward: Scalar reward received for that action.
            new_state: Observation that followed the action.
            done: Whether the episode ended; if so the next-state value is
                excluded from the TD target.
        """
        self.actor.optimizer.zero_grad()
        self.critic.optimizer.zero_grad()

        critic_val_ = self.critic.forward(new_state)
        critic_val = self.critic.forward(state)

        reward = T.tensor(reward, dtype=T.float).to(self.actor.device)

        # TD error: r + gamma * V(s') * (1 - done) - V(s)
        delta = reward + self.gamma * critic_val_ * (1 - int(done)) - critic_val

        # The TD error is only a weight for the policy term, so it is detached
        # there; otherwise the policy loss would also push gradients into the
        # critic and skew its update away from minimising delta**2.
        actor_loss = -self.log_prob * delta.detach()
        critic_loss = delta ** 2

        (actor_loss + critic_loss).backward()

        self.actor.optimizer.step()
        self.critic.optimizer.step()

In [None]:
class NewAgent(object):
    """One-step actor-critic agent backed by a single shared ``AC_agent`` network.

    Args:
        alpha: Learning rate of the shared network's optimizer.
        ip_dims: Input dimensions forwarded to the network.
        gamma: Discount factor applied to the value of the next state.
        n_actions: Number of discrete actions (width of the policy head).
    """

    def __init__(self, alpha, ip_dims, gamma=0.99, n_actions=2):
        self.gamma = gamma
        self.AC = AC_agent(alpha, ip_dims, n_actions)
        # Log-probability of the most recently sampled action: written by
        # choose_action() and consumed by the next learn() call.
        self.log_probs = None

    def choose_action(self, observations):
        """Sample an action from the softmax of the policy head.

        Stores the log-probability of the sampled action in ``self.log_probs``.

        Args:
            observations: A single observation accepted by the network.

        Returns:
            The sampled action as a Python int.
        """
        logits, _ = self.AC.forward(observations)
        # Normalise over the action dimension explicitly; omitting ``dim``
        # relies on a deprecated implicit choice and emits a warning.
        prob = F.softmax(logits, dim=-1)

        action_prob = T.distributions.Categorical(prob)
        action = action_prob.sample()

        self.log_probs = action_prob.log_prob(action)

        return action.item()

    def learn(self, state, reward, new_state, done):
        """Perform one TD(0) update of the shared network for a single transition.

        Must be called after choose_action(), which provides ``self.log_probs``.

        Args:
            state: Observation the last action was chosen from.
            reward: Scalar reward received for that action.
            new_state: Observation that followed the action.
            done: Whether the episode ended; if so the next-state value is
                excluded from the TD target.
        """
        self.AC.optimizer.zero_grad()

        _, critic_val_ = self.AC.forward(new_state)
        _, critic_val = self.AC.forward(state)

        reward = T.tensor(reward, dtype=T.float).to(self.AC.device)

        # TD error: r + gamma * V(s') * (1 - done) - V(s)
        delta = reward + self.gamma * critic_val_ * (1 - int(done)) - critic_val

        # The TD error is only a weight for the policy term, so it is detached
        # there; otherwise the policy loss would also push gradients through
        # the value estimates and skew them away from minimising delta**2.
        actor_loss = -self.log_probs * delta.detach()
        critic_loss = delta ** 2

        (actor_loss + critic_loss).backward()

        self.AC.optimizer.step()

In [None]:
import gym 
import matplotlib.pyplot as plt 
import numpy as np

def plotLearning(scores, filename, x=None, window=5):
    """Plot a trailing running average of ``scores`` and save it to ``filename``.

    The value plotted for game ``t`` is the mean of ``scores[t - window]``
    through ``scores[t]`` (fewer entries near the start), i.e. up to
    ``window + 1`` scores.

    Args:
        scores: Sequence of per-game scores.
        filename: Path handed to ``plt.savefig``.
        x: Optional x-axis values; defaults to the game indices 0..N-1.
        window: How many preceding games are averaged together with game ``t``.
    """
    n_games = len(scores)
    running_avg = np.array(
        [np.mean(scores[max(0, end - window):end + 1]) for end in range(n_games)],
        dtype=float,
    )
    if x is None:
        x = list(range(n_games))

    plt.ylabel('Score')
    plt.xlabel('Game')
    plt.plot(x, running_avg)
    plt.savefig(filename)
    
# Training entry point: runs a NewAgent on gym's 'LunarLander-v2' for num_ep
# episodes, updating the agent after every environment step, then saves a
# running-average plot of the episode scores.
if __name__ == '__main__':
    # NewAgent wraps one AC_agent network; 8 input features, 4 discrete actions.
    agent = NewAgent(alpha=0.000001, ip_dims=[8], gamma=0.9, n_actions=4)
    env = gym.make('LunarLander-v2')
    # Total reward of every finished episode, in order.
    score_his = []
    num_ep = 2000
    
    for i in range(num_ep):
        done = False
        score = 0
        # NOTE(review): reset() is used as returning the observation directly
        # and step() as returning four values; confirm this matches the
        # installed gym version.
        obs = env.reset()
        
        while not done:
            # Sample an action, apply it, and learn from this single transition.
            a = agent.choose_action(obs)
            obs_, r, done, info = env.step(a)
            agent.learn(obs, r, obs_, done)
            obs = obs_
            score += r
            # The bare string below is disabled rendering code kept as a no-op
            # expression statement; it has no effect at run time.
            '''if i > 1990:
                env.render()'''
        score_his.append(score)
        # Mean over (at most) the 100 most recent episodes.
        avg_score= np.mean(score_his[-100:])
        print('episode:', i, 'score: %.2f' %score, 'avg_score: %.2f' %avg_score)
        
        
    # NOTE(review): the file name mentions 'alpha00001', 'beta00005' and
    # '2048x512fc', which do not match the settings used above
    # (alpha=0.000001, no beta argument, 1024-wide first layer in AC_agent).
    filename = 'Lunar-Lander-actor-critic-new-agent-alpha00001-beta00005-2048x512fc-2000games.png'
    plotLearning(score_his, filename=filename, window=50)