In [1]:
!pip install gymnasium

Collecting gymnasium
  Using cached gymnasium-0.29.1-py3-none-any.whl.metadata (10 kB)
Collecting cloudpickle>=1.2.0 (from gymnasium)
  Using cached cloudpickle-3.0.0-py3-none-any.whl.metadata (7.0 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Using cached Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Using cached gymnasium-0.29.1-py3-none-any.whl (953 kB)
Using cached cloudpickle-3.0.0-py3-none-any.whl (20 kB)
Using cached Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, cloudpickle, gymnasium
Successfully installed cloudpickle-3.0.0 farama-notifications-0.0.4 gymnasium-0.29.1




In [2]:
# Mount Google Drive at /content/drive so files stored there can be read and written
# through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from IPython.display import display, clear_output
import matplotlib.pyplot as plt

# Seed PyTorch so weight initialisation and action sampling are repeatable between runs
# (some GPU kernels may still be nondeterministic).
SEED = 42
torch.manual_seed(SEED)

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Create the MountainCarContinuous environment
env = gym.make('MountainCarContinuous-v0')
# Seed the environment's random generator once; later env.reset() calls without a seed
# keep drawing from this generator.
env.reset(seed=SEED)

# Set parameters
num_episodes = 100    # number of training episodes
learning_rate = 0.01  # step size passed to the Adam optimizer
gamma = 0.99          # discount factor applied to future rewards

# Define the policy network for continuous action space using Gaussian distribution
class PolicyNetwork(nn.Module):
    """Gaussian policy: maps a state to the mean and standard deviation of an action.

    Two ReLU layers (``fc1``, ``fc2``) form a shared trunk. The mean and the
    log-standard-deviation then each pass through their own pair of linear layers
    (``fc3_*`` followed by ``fc_*``). No activation is applied between those two
    layers, so each head is a composition of two affine maps.

    Args:
        input_dim: Size of the state vector.
        output_dim: Size of the action vector.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        # Shared trunk
        self.fc1 = nn.Linear(input_dim, 24)
        self.fc2 = nn.Linear(24, 24)
        # Separate intermediate layer for the mean and for the log-std
        self.fc3_mean = nn.Linear(24, 12)
        self.fc3_log_std = nn.Linear(24, 12)
        # Output layers
        self.fc_mean = nn.Linear(12, output_dim)
        self.fc_log_std = nn.Linear(12, output_dim)

    def forward(self, x):
        """Return ``(mean, std)`` for the batch of states ``x``."""
        features = self.fc2(self.fc1(x).relu()).relu()
        mean = self.fc_mean(self.fc3_mean(features))
        # The head predicts log(std); exponentiating keeps std strictly positive.
        std = self.fc_log_std(self.fc3_log_std(features)).exp()
        return mean, std


# Build the policy for this environment's state/action sizes, move it to the selected
# device, and create the Adam optimizer over its parameters.
policy_net = PolicyNetwork(
    input_dim=env.observation_space.shape[0],
    output_dim=env.action_space.shape[0],
).to(device)
optimizer = optim.Adam(params=policy_net.parameters(), lr=learning_rate)


ModuleNotFoundError: No module named 'torch'

In [4]:
# Restore previously saved weights. map_location remaps the stored tensors onto the
# current device, so a checkpoint written on a GPU runtime also loads on a CPU-only one.
# NOTE: torch.load unpickles the file; only load checkpoints from a trusted source.
policy_net.load_state_dict(torch.load("./model1.pth", map_location=device))

<All keys matched successfully>

In [5]:
# Function to choose action based on policy
def choose_action(state):
    """Sample an action for ``state`` from the current Gaussian policy.

    Uses the module-level ``policy_net`` and ``device``.

    Args:
        state: A single environment observation (array-like of floats).

    Returns:
        A flat NumPy array holding the sampled action.
    """
    state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
    # Sampling needs no gradients, so avoid building an autograd graph that would be
    # discarded immediately.
    with torch.no_grad():
        mean, std = policy_net(state)
        action = torch.distributions.Normal(mean, std).sample()
    return action.cpu().numpy().flatten()

# Function to compute the discounted rewards
def discount_rewards(rewards, gamma):
    """Compute the discounted return at every step of one episode.

    Entry ``t`` of the result is ``rewards[t] + gamma * rewards[t+1] + gamma**2 * ...``
    up to the end of the sequence.

    Args:
        rewards: Sequence of per-step rewards, in time order.
        gamma: Discount factor.

    Returns:
        A float32 NumPy array with the same shape as ``rewards``.
    """
    returns = np.zeros_like(rewards, dtype=np.float32)
    running = 0.0
    # Walk backwards so each return reuses the one computed for the following step.
    for step in range(len(rewards) - 1, -1, -1):
        running = rewards[step] + gamma * running
        returns[step] = running
    return returns

# REINFORCE Algorithm: roll out one full episode with the current policy, then take a
# single gradient step that raises the log-probability of actions followed by
# above-average discounted returns.
for episode in range(num_episodes):
    state = env.reset()[0]
    states, actions, rewards, log_probs = [], [], [], []

    # Generate an episode
    while True:
        action = choose_action(state)
        # Gymnasium's step() returns separate `terminated` and `truncated` flags.
        # Both end the episode; ignoring `truncated` lets an episode that ran out of
        # time continue until the task happens to terminate.
        next_state, reward, terminated, truncated, _ = env.step(action)
        states.append(state)
        actions.append(action)
        rewards.append(reward)

        # Compute the log probability of the action (this pass tracks gradients)
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
        mean, std = policy_net(state_tensor)
        normal = torch.distributions.Normal(mean, std)
        log_prob = normal.log_prob(torch.tensor(action, dtype=torch.float32).to(device)).sum()
        log_probs.append(log_prob)

        state = next_state
        done = terminated or truncated
        if done:
            break

    # Convert lists to arrays
    states = np.array(states)
    actions = np.array(actions)
    rewards = np.array(rewards)
    log_probs = torch.stack(log_probs)

    # Compute discounted rewards
    discounted_rewards = discount_rewards(rewards, gamma)

    # Normalize discounted rewards. The standard deviation of a single value is NaN,
    # which would poison the weights, so one-step episodes are left unnormalized.
    discounted_rewards = torch.tensor(discounted_rewards, dtype=torch.float32).to(device)
    if discounted_rewards.numel() > 1:
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-7)

    # Update policy network
    optimizer.zero_grad()
    loss = -torch.sum(log_probs * discounted_rewards)  # Negative for gradient ascent
    loss.backward()
    optimizer.step()

    # Print episode information
    print(f'Episode: {episode+1}, Total Reward: {np.sum(rewards)}')


Episode: 1, Total Reward: -113.91503434582015
Episode: 2, Total Reward: 26.560122814670997


KeyboardInterrupt: 

In [None]:
# Persist the policy's parameters (state_dict only, not the module object) to Drive.
torch.save(
    obj=policy_net.state_dict(),
    f="/content/drive/Othercomputers/My Computer/python/model2.pth",
)

In [None]:
# Function to play the game with the learned policy and render it
def play_game(policy_net, num_episodes=5):
    """Run the policy for a few episodes and show each rendered frame inline.

    Args:
        policy_net: The policy to play. NOTE: actions are produced by
            ``choose_action``, which reads the module-level ``policy_net``, so this
            argument only takes effect when it is that same object.
        num_episodes: Number of episodes to play.
    """
    # Gymnasium fixes the render mode when the environment is created, and
    # env.render() accepts no `mode` argument. Use a dedicated environment that
    # returns RGB frames and leave the training environment untouched.
    render_env = gym.make(env.spec.id, render_mode='rgb_array')
    fig, ax = plt.subplots()
    image = None
    try:
        for episode in range(num_episodes):
            state = render_env.reset()[0]
            total_reward = 0
            while True:
                action = choose_action(state)
                state, reward, terminated, truncated, _ = render_env.step(action)
                total_reward += reward

                # Render the environment. One image artist is reused so frames do
                # not pile up on the axes.
                frame = render_env.render()
                if image is None:
                    image = ax.imshow(frame)
                    ax.axis('off')
                else:
                    image.set_data(frame)
                display(fig)
                clear_output(wait=True)

                # Stop on task termination and on hitting the time limit.
                if terminated or truncated:
                    break
            print(f'Episode {episode+1}: Total Reward: {total_reward}')
    finally:
        # Release the renderer and the figure even if playback is interrupted.
        render_env.close()
        plt.close(fig)

# Play the game with the learned policy
play_game(policy_net)


In [None]:
# Function to visualize the policy
def visualize_policy(policy_net):
    """Plot the policy's mean action over a grid of (position, velocity) states.

    The grid is 100 x 100 and spans the low/high bounds of the first two dimensions
    of ``env.observation_space``. Only the first action dimension is plotted.

    Args:
        policy_net: Callable returning ``(mean, std)`` for a batch of states.
    """
    low, high = env.observation_space.low, env.observation_space.high
    x = np.linspace(low[0], high[0], 100)
    v = np.linspace(low[1], high[1], 100)
    X, V = np.meshgrid(x, v)

    # Evaluate every grid point in one batched, gradient-free forward pass instead of
    # 10,000 single-state calls.
    grid_states = np.stack([X.ravel(), V.ravel()], axis=1)
    state_tensor = torch.tensor(grid_states, dtype=torch.float32).to(device)
    with torch.no_grad():
        mean, _ = policy_net(state_tensor)
    # Use the mean action for visualization; reshape back to the grid layout.
    actions = mean[:, 0].cpu().numpy().reshape(X.shape)

    plt.figure(figsize=(10, 8))
    plt.contourf(X, V, actions, levels=100, cmap='coolwarm')
    plt.colorbar()
    plt.xlabel('Position (x)')
    plt.ylabel('Velocity (v)')
    plt.title('Learned Policy Visualization')
    plt.show()

# Visualize the policy
visualize_policy(policy_net)
