In [1]:
import gymnasium as gym
import pickle
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from helper_functions import reward_print, print_Qtable
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "cpu"
)
device = "cpu"
print(device)
CUDA_LAUNCH_BLOCKING=1

cpu


In [2]:
# AC2 algorithm 
class AC2:
    def __init__(self, env):
        self.actor = Actor(env)
        self.critic = Critic(self.actor.env.observation_space.shape[0])
    
    # Main training loop
    def train(self, episodes, gamma, greedy=False):
        total_reward = [0] * episodes
        for i in range(episodes):
            step = rewards = 0
            done = False     
            state, info = self.actor.env.reset()
            state = torch.tensor(state)
            while not done:
                # Actor makes decision 
                # Environment returns state and reward
                next_state, reward, done, action = self.actor.act(state, greedy)
                next_state = torch.tensor(next_state)#.todevice
                # Critic evaluates action 
                adv = self.critic.evaluate(state, next_state, reward, gamma ** step)
                # Pass that value to the Actor
                self.actor.evaluation(action, adv, state)
                
                state = next_state
                step += 1
                rewards += reward
                
                # Before it is done
                if done:
                    reward = 0
                    next_state = None
                    adv = self.critic.evaluate(state, next_state, reward, gamma ** step)
                    self.actor.evaluation(action, adv, state)
                
            total_reward[i] = rewards
            print("Episode:", i, " Reward", rewards)
        self.actor.env.close()
        return total_reward

               
    def save(self, filename):
        with open("pickles/" + filename + "actor.pickle", 'wb') as file:
            pickle.dump(self.actor.policy_net.state_dict(), file)
        with open("pickles/" + filename + "critic.pickle", 'wb') as file:
            pickle.dump(self.critic.policy_net.state_dict(), file)

In [3]:
# Actor thread
class ActorNet(nn.Module):  
    def __init__(self, obs, act):
        super(ActorNet, self).__init__()
        self.layer1 = nn.Linear(obs, 32)
        self.layer2 = nn.Linear(32, act)

    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.softmax(self.layer2(x), dim=-1)
        return x
        

class Actor:
    def __init__(self, env):
        self.env_type = env
        self.env = gym.make(env)
        self.policy_net = ActorNet(self.env.observation_space.shape[0], self.env.action_space.n)
        self.optimizer = optim.AdamW(self.policy_net.parameters(), amsgrad=True)
    
    def act(self, state, greedy):
        # Get the weights from the policy net
        weights = self.policy_net(state)
        # if greedy get max-arg 
        if greedy: 
            action = torch.argmax(weights)
        # Use multinomial to select probability / action
        else:
            action = torch.multinomial(weights, 1)
        # Run and return the action 
        state, reward, terminated, truncated, _ = self.env.step(action.item())
        return state, reward, terminated or truncated, action

    
    def evaluation(self, action, advantage, state):
        # Get the weights from the policy 
        weight = self.policy_net(state)
        # Calculate the log probability with the weights of the 
        # current state and action and then use the adv to get the loss 
        prob = torch.distributions.Categorical(weight).log_prob(action)  
        loss = -1 * prob * advantage.detach()
        # back prop
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
    
    
    def change_render(self, render):
        if render:
            self.env = gym.make(self.env_type, render_mode="human")
        else: 
            self.env = gym.make(self.env_type)

In [4]:
# Critic thread
class CriticNet(nn.Module):  
    def __init__(self, obs):
        super(CriticNet, self).__init__()
        self.layer1 = nn.Linear(obs, 32)
        self.layer2 = nn.Linear(32, 1)

    def forward(self, x):
        x = F.relu(self.layer1(x))
        return  self.layer2(x)


class Critic:
    def __init__(self, obs):
        
        self.policy_net = CriticNet(obs)
        self.optimizer = optim.AdamW(self.policy_net.parameters(),amsgrad=True )
    
    
    def evaluate(self, state, next_state, reward, gamma):
        # Get Qvalue and next Qvalue from policy         
        Qvalue = self.policy_net(state)
        if next_state is not None:
            next_Qvalue = self.policy_net(next_state)
        else: 
            next_Qvalue = 0
        
        # Calculate the TD and advantage for the next action
        TD = reward + (gamma * next_Qvalue)
        adv = Qvalue - TD
        TD = torch.tensor([TD])
        loss_function = nn.MSELoss()
        # print("Q:", Qvalue)
        # print("TD:", TD)
        # print("adv:", adv)
        loss = loss_function(Qvalue, TD)
        # print(loss)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return adv

In [5]:
# AC2 Agent for Cart Pole
environment = 'BipedalWalker-v3'
agent = AC2(environment)

episodes = 10
gamma = 1.003

agent.actor.change_render(True)

# Main training session
total_rewards = agent.train(episodes, gamma)
print("Best reward: ", max(total_rewards))
agent.save("drpreisl_CartPole")
reward_print(total_rewards, episodes, "grid world")

# Greedy run 
agent.actor.change_render(True)
total_greedy_rewards = agent.train(11, gamma, greedy=True)
reward_print(total_greedy_rewards, 10, "greedy")

AttributeError: 'Box' object has no attribute 'n'

In [82]:
environment = gym.make('CartPole-v1', render_mode="human")

state, info = environment.reset()

done = False
for i in range(100):
    if not done:
        state, reward, terminated, truncated, _  = environment.step(environment.action_space.sample())
        done = terminated or truncated
        # print(reward)
    else:
        break
environment.close()


In [84]:
TD = 1234
TD = torch.tensor([4, 3, 6, 1])
print(torch.argmax(TD).item())


2
