In [None]:
!which python3

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
import torch.nn.functional as F

In [None]:
# Train a simple NN on a single training sample!
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(4, 2)  # 784 input features, 128 output features
        self.fc2 = nn.Linear(2, 2)
        self.fc3 = nn.Linear(2, 2)
        self.fc4 = nn.Linear(2, 1)
        #self.fc3 = nn.Linear(2, 1)    # 10 output features for 10 classes

    def forward(self, x):
        #x = torch.sigmoid(self.fc1(x))
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        return x

In [None]:
# Create an instance of the Net
net = Net()

# Define a loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)  # Optimizer stores the weights / biases

# Random data and target labels for demonstration
inputs = torch.randn(1, 4)  # Example input
targets = torch.tensor([0])   # Example target. know to treat this index of class 0-9 and compare to max output?

# Zero the parameter gradients
optimizer.zero_grad()

# Forward + backward + optimize
outputs = net(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()

print('Finished Training')


## Now lets try training on multiple samples

With multiple samples you can choose how often you want to backpropagate. For example do you want to backprop after every sample or backprop after completing a batch.  Or even after a full pass through the training data (an epoch).

In [None]:
net = Net()
# Define an optimizer
optimizer = optim.SGD(net.parameters(), lr=0.01, momentum=0.9)

In [None]:
# Random data and target labels for demonstration
inputs = torch.randn(10000, 4)  # tensor of 10 vectors of length 4.
targets = torch.tensor([[sum(inputs[i])] for i in range(len(inputs))], dtype=torch.float32)  # target is sum of input vector
#targets = torch.tensor([[1.1]]*10)

In [None]:
outputs = []
losses = []
batch_size = 10


for i in range(len(inputs) // batch_size + 1):
    
    optimizer.zero_grad()
    
    input_batch = inputs[i: i + batch_size]
    target_batch = targets[i: i + batch_size]
    

    
    output_batch = net(input_batch)
    loss = F.mse_loss(output_batch, target_batch)
    losses.append(loss)
    loss.backward()
    optimizer.step()

## Throwing minimal DQN at Cartpole-V1

In [2]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random

# Define the DQN Network
class DQN(nn.Module):
    def __init__(self):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(4, 24)  # CartPole state is 4-dimensional
        self.fc2 = nn.Linear(24, 24)
        #self.fc2_1 = nn.Linear(24, 24) 
        #self.fc2_2 = nn.Linear(24, 24)
        #self.fc2_3 = nn.Linear(24, 24)
        self.fc3 = nn.Linear(24, 2)  # Two actions: push cart left or right

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        #x = torch.relu(self.fc2_1(x))
        #x = torch.relu(self.fc2_2(x))
        #x = torch.relu(self.fc2_3(x))
        x = self.fc3(x)
        return x

Below we try our oversimplified DQN on Cartpole. It doesn't learn anything. I think this is because the penalty (negative reward) of the pole falling is only directly attributed to the single action before it and not the series of actions before it, which may have already guaranteed the fall. In this example, we are backpropping after each step.

I think what we need to do is rather than backprop after each step we should backprop after each episode and make sure the negative reward is visible to a window of several timesteps before the actual fall. I'm guessing this is where a replay buffer comes in

In [1]:
# Initialize environment, model, and optimizer
env = gym.make('CartPole-v1')
model = DQN()
learning_rate = 0.0001
optimizer = optim.Adam(model.parameters(), lr = learning_rate)
loss_fn = nn.SmoothL1Loss()
#loss_fn = nn.MSELoss()

# Training parameters
num_episodes = 10000
gamma = 0.99  # Discount factor
epsilon_start = 1
epsilon_end = 0.05
epsilon_decay = 0.9995

for episode in range(num_episodes):
    state = env.reset()[0]
    epsilon = max(epsilon_end, epsilon_start * (epsilon_decay ** episode))  # Decrease epsilon
    for t in range(1, 10000):  # Limit the number of steps per episode
        state_tensor = torch.from_numpy(state).float().unsqueeze(0)
        
        # Blur the state
        precision = 0.001
        state_tensor = torch.round(state_tensor / precision) * precision
        
        # Select and perform an action using epsilon-greedy policy
        if random.random() <= epsilon:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                action = model(state_tensor).max(1)[1].item()

        # Observe new state and reward
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        
        if done:
            reward = -1  # Penalize if the pole fell over

        # Move to the next state
        next_state_tensor = torch.from_numpy(next_state).float().unsqueeze(0)

        # Compute the expected Q values
        expected_state_action_value = (torch.tensor(reward) + gamma * 
                                       model(next_state_tensor).max(1)[0] * (1-done)).unsqueeze(0)

        # Compute the loss
        state_action_value = model(state_tensor)[0, action].unsqueeze(0)
        loss = loss_fn(state_action_value, expected_state_action_value)

        # Optimize the model
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if done:
            break

        state = next_state  # Move to the next state
    
    if t >= 500:
        print("breaking on 500 training")
        break

    if episode % 100 == 0:
        print(f"Episode {episode} finished after {t} timesteps")

env.close()

NameError: name 'gym' is not defined

## Evaluate Model and Display on Video

In [None]:
import gym
import pygame

def eval_model(yo_model):
    env = gym.make('CartPole-v1', render_mode='human')
    state = env.reset()[0]
    timesteps = 0 
    actions = []
    for _ in range(1000):
        env.render()
        state_tensor = torch.from_numpy(state).float().unsqueeze(0)
        action = yo_model(state_tensor).max(1)[1].item()
        actions.append(action)
        timesteps += 1
        #action = modelenv.action_space.sample()  # replace with your action selection method
        state, reward, term,trunc, info = env.step(action)
        done = term or trunc
        if done:
            #state = env.reset()[0]
            print("time steps:", timesteps)
            break
            
    
    env.close()


## Control Cartpole Yourself

In [None]:
env = gym.make("CartPole-v0",render_mode='human')
env.reset()
while True:
    action = int(input("Action: "))
    if action in (0, 1):
        state,reward,term,trunc,info = env.step(action)
        env.render()
        if term or trunc:
            break
         
#env.close()
        

In [None]:
# Initialize pygame and the environment
pygame.init()
env = gym.make('CartPole-v1', render_mode='human')

# Set up the display
screen_width, screen_height = 600, 400
screen = pygame.display.set_mode((screen_width, screen_height))
pygame.display.set_caption("Click Window to Start")

# Start the environment
observation = env.reset()
done = False
clock = pygame.time.Clock()
steps = 0

waiting_for_click = True
while waiting_for_click:
    for event in pygame.event.get():
        if event.type == pygame.MOUSEBUTTONDOWN:  # Wait for click to start
            waiting_for_click = False
        elif event.type == pygame.QUIT:
            pygame.quit()
            exit()  # Exit the entire script if the window is closed


while not done or done:
    # Render the environment to the pygame window
    env.render()

    # Check for key presses to control the cart
    #action = 1  # Default action (don't move)
    for event in pygame.event.get():
        if event.type == pygame.KEYDOWN:
            if event.key == pygame.K_LEFT:
                action = 0  # Move cart to the left
            elif event.key == pygame.K_RIGHT:
                action = 1  # Move cart to the right
        elif event.type == pygame.QUIT:
            done = True

    # Step the environment with the chosen action
    observation, reward, term,trunc, info = env.step(action)
    steps += 1
    done = term or trunc

    # Update the display and wait a short duration
    pygame.display.flip()
    clock.tick(5)  # Limit to 60 frames per second

print("your timesteps:", steps)

#env.close()
#pygame.quit()

## Implement a Target Network

In [None]:
# Initialize environment, model, and optimizer
env = gym.make('CartPole-v1')
model = DQN()
target_model = DQN()
target_model.load_state_dict(model.state_dict())
target_model.eval()  # Set the target network to evaluation mode

learning_rate = 0.001
optimizer = optim.Adam(model.parameters(), lr = learning_rate)
loss_fn = nn.SmoothL1Loss()
#loss_fn = nn.MSELoss()

# Training parameters
num_episodes = 10000
gamma = 0.99  # Discount factor
epsilon_start = 1
epsilon_end = 0.05
epsilon_decay = 0.9995
TARGET_UPDATE = 10

for episode in range(num_episodes):
    state = env.reset()[0]
    epsilon = max(epsilon_end, epsilon_start * (epsilon_decay ** episode))  # Decrease epsilon
    for t in range(1, 10000):  # Limit the number of steps per episode
        state_tensor = torch.from_numpy(state).float().unsqueeze(0)
        
        # Blur the state
        #precision = 0.001
        #state_tensor = torch.round(state_tensor / precision) * precision
        
        # Select and perform an action using epsilon-greedy policy
        if random.random() <= epsilon:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                action = model(state_tensor).max(1)[1].item()

        # Observe new state and reward
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        
        if done:
            reward = -10  # Penalize if the pole fell over

        # Move to the next state
        next_state_tensor = torch.from_numpy(next_state).float().unsqueeze(0)

        # Compute the expected Q values
        expected_state_action_value = (torch.tensor(reward) + gamma * 
                                       target_model(next_state_tensor).max(1)[0] * (1-done)).unsqueeze(0)

        # Compute the loss
        state_action_value = model(state_tensor)[0, action].unsqueeze(0)
        loss = loss_fn(state_action_value, expected_state_action_value)

        # Optimize the model
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if done:
            break

        state = next_state  # Move to the next state
    
    if episode % TARGET_UPDATE == 0:
        target_model.load_state_dict(model.state_dict())
    
    #if t >= 500:
    #    print("breaking on 500 training")
    #    break

    if episode % 100 == 0:
        print(f"Episode {episode} finished after {t} timesteps")

env.close()

## Now Lets Implement a Replay Buffer

In [3]:
import gymnasium as gym

In [4]:
import random

class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def push(self, state, action, reward, next_state, done):
        """Saves a transition."""
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def __len__(self):
        return len(self.buffer)

In [5]:
# Initialize environment, model, and optimizer
env = gym.make('CartPole-v1')
model = DQN()
learning_rate = 0.001
optimizer = optim.Adam(model.parameters(), lr = learning_rate)
loss_fn = nn.SmoothL1Loss()
#loss_fn = nn.MSELoss()

# Training parameters
num_episodes = 5000
gamma = 0.99  # Discount factor
epsilon_start = 1
epsilon_end = 0.05
epsilon_decay = 0.9995

# Replay Buffer
replay_buffer = ReplayBuffer(1000)
batch_size = 50

# Training Data Analysis
episode_lengths = []

for episode in range(num_episodes):
    state = env.reset()[0]
    epsilon = max(epsilon_end, epsilon_start * (epsilon_decay ** episode))  # Decrease epsilon
    
    for t in range(1, 10000):  # Limit the number of steps per episode
        state_tensor = torch.from_numpy(state).float().unsqueeze(0)
        
        # Select and perform an action using epsilon-greedy policy
        if random.random() <= epsilon:
            action = env.action_space.sample()
        else:
            with torch.no_grad():
                action = model(state_tensor).max(1)[1].item()

        # Observe new state and reward
        next_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        
        if done:
            reward = -1  # Penalize if the pole fell over

        # Move to the next state
        next_state_tensor = torch.from_numpy(next_state).float().unsqueeze(0)

        experience = (state_tensor, action, reward, next_state_tensor, done)
        replay_buffer.push(*experience)

        if done:
            break

        state = next_state  # Move to the next state

        if len(replay_buffer) > batch_size:
            # Random sample from the Replay Buffer
            experience_batch = replay_buffer.sample(batch_size)
            batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(*experience_batch)

            #print(batch_state)
            #print(batch_action)
            #print(batch_reward)
            #print(batch_next_state)
            #print(batch_done)

            # Convert batches to PyTorch tensors
            batch_state = torch.stack(batch_state)
            batch_action = torch.tensor(batch_action, dtype=torch.long)  # Corrected line
            batch_reward = torch.tensor(batch_reward, dtype=torch.float)
            batch_next_state = torch.stack(batch_next_state)
            batch_done = torch.tensor(batch_done, dtype=torch.float)


            # Model prediction of Q values  
            state_q_value = model(batch_state)[:,0]
            state_q_value = torch.stack([state_q_value[idx][action_idx] for idx,action_idx in enumerate(batch_action)])
            state_q_value = state_q_value.unsqueeze(1)
            
            # Compute the expected Q values (Bellman Equation)
            expected_q_value_batch = (batch_reward + 
                                      gamma * model(batch_next_state).max(2)[0].squeeze() * (1-batch_done)).unsqueeze(1)
            

            loss = loss_fn(state_q_value, expected_q_value_batch)
    
            # Optimize the model
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()


    if episode % 100 == 0:
        print(f"Episode {episode} finished after {t} timesteps")

env.close()

Episode 0 finished after 23 timesteps


KeyboardInterrupt: 

In [6]:
state_q_value

tensor([[ 0.8025],
        [ 3.6528],
        [ 0.5515],
        [ 2.9403],
        [ 4.3641],
        [ 4.7606],
        [ 0.3282],
        [ 3.5947],
        [ 3.6855],
        [ 3.8108],
        [-0.5787],
        [ 3.3134],
        [ 1.2739],
        [ 4.4049],
        [ 1.9770],
        [ 1.4646],
        [ 2.0570],
        [ 2.0744],
        [ 4.1432],
        [ 4.5526],
        [ 3.6341],
        [ 3.3051],
        [ 0.1354],
        [ 2.1898],
        [-0.6440],
        [ 3.2352],
        [ 2.5514],
        [ 3.1803],
        [ 1.8424],
        [ 4.2790],
        [ 4.8002],
        [ 4.8734],
        [ 2.6041],
        [ 4.4981],
        [ 0.0271],
        [ 3.6024],
        [-0.7242],
        [-0.6985],
        [ 2.6550],
        [ 1.6143],
        [-0.1972],
        [ 0.4030],
        [ 4.3996],
        [ 3.6417],
        [ 3.0449],
        [ 4.7365],
        [ 3.9903],
        [ 0.2311],
        [ 4.9575],
        [ 3.3663]], grad_fn=<UnsqueezeBackward0>)