In [1]:
import torch
torch.cuda.is_available()

True

In [2]:
torch.device("cuda")

device(type='cuda')

In [1]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from collections import deque, namedtuple
import copy

In [45]:
class Brain(nn.Module):
    def __init__(self, input_shape, output_size):
        super(Brain, self).__init__()
        self.input_shape = input_shape
        
        # Convolutional layers
        self.conv1 = nn.Conv2d(input_shape[2], 32, kernel_size=4, stride=2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=2)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, stride=1)

        # Calculate the size of the output from the last conv layer
        conv_out_size = self._get_conv_output(input_shape)

        # Fully connected layers
        self.fc1 = nn.Linear(conv_out_size, 512)
        self.fc2 = nn.Linear(512, output_size)

    def _get_conv_output(self, shape):
        input = torch.rand(1, *shape)
        input = input.permute(0, 3, 1, 2)
        output = self.conv3(self.conv2(self.conv1(input)))
        return int(np.prod(output.size()))

    def forward(self, x):
        if x.dim() == 3:
            x = x.unsqueeze(0)
        if x.size(1) != self.input_shape[2]:
            x = x.permute(0, 3, 1, 2)
        
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

In [46]:
import gymnasium as gym
env = gym.make("CarRacing-v2")
state_shape = env.observation_space.shape
state_size = env.observation_space.shape[0]
action_size = env.action_space.shape[0]
print("State shape: ", state_shape)
print("state size: ", state_size)
print("action size: ", action_size)

State shape:  (96, 96, 3)
state size:  96
action size:  3


In [47]:
learning_rate = 1e-3
minibatch_size = 64 
discount_factor = 0.99
replay_buffer_size = int(1e5)
tau = 1e-3 

In [48]:
class ReplayMemory:
    def __init__(self, capacity, device="cuda:0" if torch.cuda.is_available() else "cpu"):
        self.device = torch.device(device)
        self.capacity = capacity
        self.memory = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        # Ensure state and next_state are numpy arrays
        state = np.array(state)
        next_state = np.array(next_state)
        self.memory.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        experiences = random.sample(self.memory, k=batch_size)
        
        states = torch.from_numpy(np.array([e[0] for e in experiences if e is not None])).float().to(self.device)
        actions = torch.from_numpy(np.array([e[1] for e in experiences if e is not None])).float().to(self.device)
        rewards = torch.from_numpy(np.array([e[2] for e in experiences if e is not None])).float().unsqueeze(1).to(self.device)
        next_states = torch.from_numpy(np.array([e[3] for e in experiences if e is not None])).float().to(self.device)
        dones = torch.from_numpy(np.array([e[4] for e in experiences if e is not None]).astype(np.uint8)).float().unsqueeze(1).to(self.device)
        
        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        return len(self.memory)

In [57]:
class OUNoise:
    def __init__(self, size, mu=0., theta=0.15, sigma=0.2):
        self.mu = mu * np.ones(size)
        self.theta = theta
        self.sigma = sigma
        self.reset()

    def reset(self):
        self.state = copy.copy(self.mu)

    def sample(self):
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * np.random.randn(len(x))
        self.state = x + dx
        return self.state.tolist()  # Return as a list

In [71]:
UPDATE_EVERY = 4

class Agent():
    def __init__(self, state_shape, action_size, learning_rate=1e-4, replay_buffer_size=100000, minibatch_size=64, tau=1e-3):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.state_shape = state_shape
        self.action_size = action_size
        
        # Using actor-critic approach for continuous action space
        self.actor = Brain(state_shape, action_size).to(self.device)
        self.critic = Brain(state_shape, 1).to(self.device)  # Critic outputs a single value
        self.target_actor = Brain(state_shape, action_size).to(self.device)
        self.target_critic = Brain(state_shape, 1).to(self.device)
        
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=learning_rate)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=learning_rate)
        
        self.memory = ReplayMemory(replay_buffer_size, self.device)
        self.t_step = 0
        
        self.noise = OUNoise(action_size)
        
        self.minibatch_size = minibatch_size
        self.tau = tau

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        self.actor.eval()
        with torch.no_grad():
            action = self.actor(state).cpu().numpy().squeeze()
        self.actor.train()
        action += self.noise.sample()
        
        # Clip actions between -1 and 1, then convert to list
        action = np.clip(action, -1, 1).tolist()
        
        # Ensure the correct action format for CarRacing-v2
        steering = action[0]
        gas = max(0, action[1])
        brake = max(0, -action[1])
        
        return [steering, gas, brake]

    def step(self, state, action, reward, next_state, done):
        # Save experience in replay memory
        self.memory.push(state, action, reward, next_state, done)
        
        # Learn every UPDATE_EVERY time steps.
        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step == 0:
            # If enough samples are available in memory, get random subset and learn
            if len(self.memory) > self.minibatch_size:
                experiences = self.memory.sample(self.minibatch_size)
                self.learn(experiences, 0.99)  # Using 0.99 as discount factor

    def learn(self, experiences, gamma):
        states, actions, rewards, next_states, dones = experiences

        # Update critic
        actions_next = self.target_actor(next_states)
        Q_targets_next = self.target_critic(next_states)
        Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))
        Q_expected = self.critic(states)
        critic_loss = F.mse_loss(Q_expected, Q_targets)
        
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Update actor
        actions_pred = self.actor(states)
        actor_loss = -self.critic(states).mean()
        
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Update target networks
        self.soft_update(self.critic, self.target_critic, self.tau)
        self.soft_update(self.actor, self.target_actor, self.tau)

    def soft_update(self, local_model, target_model, tau):
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)

In [72]:
agent = Agent(state_shape, action_size)

In [83]:
number_episodes = 2000
max_timesteps_per_episode = 1000
scores_window = deque(maxlen=100)

def normalize_state(state):
    return (state - state.mean()) / (state.std()+ 1e-8)

epsilon = 1.0
epsilon_decay = 0.995
epsilon_min = 0.01

for episode in range(1, number_episodes + 1):
    state, _ = env.reset()
    state = normalize_state(state)
    agent.noise.reset()
    score = 0
    
    
    for t in range(max_timesteps_per_episode):
        if np.random.random() < epsilon:
            action = env.action_space.sample()
        else:
            action = agent.act(state)
        action = agent.act(state)
        next_state, reward, done, _, _ = env.step(action)
        next_state = normalize_state(next_state)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break
    
    scores_window.append(score)
    
    # print(f'\rEpisode {episode}\tAverage Score: {np.mean(scores_window):.2f}')
    if episode % 100 == 0:
        print(f'\rEpisode {episode}\tAverage Score: {np.mean(scores_window):.2f}')
        torch.save(agent.actor.state_dict(), f'checkpoint_actor_{episode}.pth')
        torch.save(agent.critic.state_dict(), f'checkpoint_critic_{episode}.pth')
    
    if np.mean(scores_window) >= 200.0:
        print(f'\nEnvironment solved in {episode-100:d} episodes!\tAverage Score: {np.mean(scores_window):.2f}')
        torch.save(agent.actor.state_dict(), 'checkpoint_actor.pth')
        torch.save(agent.critic.state_dict(), 'checkpoint_critic.pth')
        break
    
if np.mean(scores_window) < 200.0:
    print(f'\nEnvironment not solved. Final Average Score: {np.mean(scores_window):.2f}')
    torch.save(agent.actor.state_dict(), 'checkpoint_actor_final.pth')
    torch.save(agent.critic.state_dict(), 'checkpoint_critic_final.pth')

KeyboardInterrupt: 

In [80]:
def normalize_state(state):
    return (state - state.mean()) / (state.std() + 1e-8).astype(np.float32)

number_episodes = 5000  # Increased from 2000
max_timesteps_per_episode = 1000
scores_window = deque(maxlen=100)

epsilon = 1.0
epsilon_decay = 0.995
epsilon_min = 0.01

for episode in range(1, number_episodes + 1):
    state, _ = env.reset()
    state = normalize_state(state).astype(np.float32)
    agent.noise.reset()
    score = 0
    
    for t in range(max_timesteps_per_episode):
        if np.random.random() < epsilon:
            action = env.action_space
            
        else:
            action = agent.act(state)
        if isinstance(action, list):
            action = np.array(action, dtype=np.float32)
            
        next_state, reward, done, _, _ = env.step(action)
           # Normalize and convert types
        next_state = next_state.astype(np.float32) if isinstance(next_state, np.ndarray) else next_state
        
        reward = np.float32(reward)
        agent.step(state, action, reward, next_state, done)
        state = next_state
        score += reward
        if done:
            break
    
    scores_window.append(score)
    epsilon = max(epsilon_min, epsilon * epsilon_decay)
    
    # print(f'\rEpisode {episode}\tAverage Score: {np.mean(scores_window):.2f}\tEpsilon: {epsilon:.2f}')
    if episode % 100 == 0:
        print(f'\rEpisode {episode}\tAverage Score: {np.mean(scores_window):.2f}\tEpsilon: {epsilon:.2f}')
        # Save checkpoints periodically
        torch.save(agent.actor.state_dict(), f'checkpoint_actor_{episode}.pth')
        torch.save(agent.critic.state_dict(), f'checkpoint_critic_{episode}.pth')
    
    if np.mean(scores_window) >= 200.0:
        print(f'\nEnvironment solved in {episode-100:d} episodes!\tAverage Score: {np.mean(scores_window):.2f}')
        torch.save(agent.actor.state_dict(), 'checkpoint_actor_final.pth')
        torch.save(agent.critic.state_dict(), 'checkpoint_critic_final.pth')
        break

# If not solved, save the final model anyway
if np.mean(scores_window) < 200.0:
    print(f'\nEnvironment not solved. Final Average Score: {np.mean(scores_window):.2f}')
    torch.save(agent.actor.state_dict(), 'checkpoint_actor_final.pth')
    torch.save(agent.critic.state_dict(), 'checkpoint_critic_final.pth')

TypeError: in method 'b2RevoluteJoint___SetMotorSpeed', argument 2 of type 'float32'

In [None]:
import glob
import io
import base64
import imageio
from IPython.display import HTML, display
from gymnasium.wrappers.monitoring.video_recorder import VideoRecorder

def show_video_of_model(agent, env_name):
    env = gym.make(env_name, render_mode='rgb_array')
    state, _ = env.reset()
    done = False
    frames = []
    while not done:
        frame = env.render()
        frames.append(frame)
        action = agent.act(state)
        state, reward, done, _, _ = env.step(action.item())
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, 'LunarLander-v2')

def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()