In [12]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
from tqdm import tqdm
import numpy as np
import os
from collections import namedtuple
from time import time

# Check for CUDA
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Environment Wrapper to Handle Different Environments
class EnvironmentWrapper:
    def __init__(self, env_name, state_pad, action_pad):
        self.env = gym.make(env_name)
        self.state_pad = state_pad
        self.action_pad = action_pad
        
    def reset(self):
        state, _ = self.env.reset()
        return np.append(state, np.zeros(np.max((self.state_pad - len(state), 0))))
    
    def step(self, action):
        state, reward, done, _, info = self.env.step(action)
        state = np.append(state, np.zeros(np.max((self.state_pad - len(state), 0))))
        return state, reward, done, info

    def render(self):
        self.env.render()

# Define the Policy (Actor) and Value (Critic) Networks
class PolicyNetwork(nn.Module):
    def __init__(self, input_size, output_size):
        super(PolicyNetwork, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, output_size),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.network(x)

class ValueNetwork(nn.Module):
    def __init__(self, input_size, hidden_size=32):
        super(ValueNetwork, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )

    def forward(self, x):
        return self.network(x)

Transition = namedtuple("Transition", ["state", "action", "reward", "next_state", "done"])

class ActorCriticAgent:
    def __init__(self, state_size, action_size, env_name, lr_actor=0.001, lr_critic=0.0005, verbosity=10):
        self.policy_network = PolicyNetwork(state_size, action_size).to(device)
        self.value_network = ValueNetwork(state_size).to(device)
        self.optimizer_actor = optim.Adam(self.policy_network.parameters(), lr=lr_actor)
        self.optimizer_critic = optim.Adam(self.value_network.parameters(), lr=lr_critic)
        self.verbosity = verbosity
        self.env_name =  env_name

    def select_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        probs = self.policy_network(state)
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

    def update_policy(self, transitions, gamma=0.99):
        loss_policy = 0
        loss_value = 0

        for transition in transitions:
            state, action, reward, next_state, done = transition

            state = torch.FloatTensor(state).unsqueeze(0).to(device)
            next_state = torch.FloatTensor(next_state).unsqueeze(0).to(device)
            action = torch.tensor(action).view(1, -1).to(device)
            reward = torch.tensor(reward).float().to(device)
            done = torch.tensor(done).float().to(device)

            # Compute value loss
            predicted_value = self.value_network(state)
            next_predicted_value = self.value_network(next_state)
            expected_value = reward + gamma * next_predicted_value * (1 - done)
            loss_value += nn.MSELoss()(predicted_value, expected_value.detach())

            # Compute policy loss
            _, log_prob = self.select_action(state)
            advantage = expected_value - predicted_value.detach()
            loss_policy += -log_prob * advantage

        # Backpropagate losses
        self.optimizer_actor.zero_grad()
        loss_policy.backward()
        self.optimizer_actor.step()

        self.optimizer_critic.zero_grad()
        loss_value.backward()
        self.optimizer_critic.step()

        return loss_policy.item(), loss_value.item()

    def save_models(self, path='models'):
        if not os.path.exists(path):
            os.makedirs(path)
        torch.save(self.policy_network.state_dict(), os.path.join(path, f'{self.env_name}_policy_network.pth'))
        torch.save(self.value_network.state_dict(), os.path.join(path, f'{self.env_name}_value_network.pth'))

    def load_models(self, path='models'):
        self.policy_network.load_state_dict(torch.load(os.path.join(path, f'{self.env_name}_policy_network.pth'), map_location=device))
        self.value_network.load_state_dict(torch.load(os.path.join(path, f'{self.env_name}_value_network.pth'), map_location=device))


    def train(self, env_wrapper, max_episodes=1000, max_steps=500, reward_threshold=475.0):
        self.results = {'Episode': [], 'Reward': [], "Average_100": [], 'Solved': -1, 'Duration': 0, 'Loss': [], 'LossV': []}
        results = self.results
        start_time = time()
        episode_rewards = []

        transitions = []
        for episode in range(max_episodes):
            state = env_wrapper.reset()
            episode_reward = 0
            

            for step in range(max_steps):
                action, log_prob = self.select_action(state)
                next_state, reward, done, _ = env_wrapper.step(action)
                transitions.append(Transition(state, action, reward, next_state, done))

                episode_reward += reward
                state = next_state

                if done:
                    break
                
                if len(transitions) == 100: # update weights every 100 steps
                    loss_policy, loss_value = self.update_policy(transitions)
                    results['Loss'].append(loss_policy)
                    results['LossV'].append(loss_value)
                    transitions = transitions [50:]

            loss_policy, loss_value = self.update_policy(transitions)
            results['Loss'].append(loss_policy)
            results['LossV'].append(loss_value)            
            episode_rewards.append(episode_reward)

            results['Episode'].append(episode)
            results['Reward'].append(episode_reward)

            if len(episode_rewards) >= 100:
                avg_reward = sum(episode_rewards[-100:]) / 100
                results['Average_100'].append(avg_reward)
                if avg_reward > reward_threshold and results['Solved'] == -1:
                    results['Solved'] = episode
                    print(f"Solved at episode {episode} with average reward {avg_reward}.")
                    break
            else:
                results['Average_100'].append(sum(episode_rewards) / len(episode_rewards))

            if episode % self.verbosity == 0:
                print(f"Episode {episode}, Avg Reward: {results['Average_100'][-1]}, PLoss: {loss_policy}, VLoss: {loss_value}")

        results['Duration'] = time() - start_time

        return results



In [13]:
# Example environment setup
env_name = 'CartPole-v1'  # This can be replaced with 'Acrobot-v1' or 'MountainCarContinuous-v0'
state_pad = 0  # Adjust based on the environment
action_pad = 0  # Adjust based on the environment
verbosity = 50

env_wrapper = EnvironmentWrapper(env_name, state_pad, action_pad)
state_size = env_wrapper.env.observation_space.shape[0] + state_pad
action_size = env_wrapper.env.action_space.n + action_pad

agent = ActorCriticAgent(state_size, action_size, env_name, verbosity=verbosity)
results = agent.train(env_wrapper, max_episodes=1000)

# Save the trained models
agent.save_models()


Episode 0, Avg Reward: 24.0, PLoss: 16.750720977783203, VLoss: 24.097936630249023
Episode 50, Avg Reward: 25.294117647058822, PLoss: 60.443931579589844, VLoss: 86.26277923583984
Episode 100, Avg Reward: 24.88, PLoss: 809.0989990234375, VLoss: 1381.7835693359375
Episode 150, Avg Reward: 23.24, PLoss: 1378.4110107421875, VLoss: 4726.27001953125


KeyboardInterrupt: 

In [6]:
episode_rewards

NameError: name 'episode_rewards' is not defined