In [None]:
!pip install gymnasium

In [None]:
! pip install swig
! pip install "gymnasium[box2d]"

# Examples

In [None]:
# Import here so this cell works on a fresh kernel (gym was previously only
# imported in a later cell).
import gymnasium as gym

# Print every environment id currently registered with Gymnasium.
gym.pprint_registry()

## DQN Example

### Attempt 1

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque

import gymnasium as gym

# Create the CartPole-v1 environment that the agent below is sized from and trained on.
env = gym.make('CartPole-v1')

# Define the DQN model
class DQN(nn.Module):
    """Fully connected network with two ReLU hidden layers and a linear output layer.

    Args:
        input_dim: Number of features in the input vector.
        output_dim: Number of outputs produced by the final layer.
        hidden_dim: Width of both hidden layers. Defaults to 64, the value that
            was previously hard-coded.
    """

    def __init__(self, input_dim, output_dim, hidden_dim=64):
        super().__init__()
        # Layer attribute names are kept so existing state_dicts still load.
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return the raw (unactivated) outputs of the final layer for input ``x``."""
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

# Define the DQN Agent
class DQNAgent:
    """DQN agent with an online network, a target network and a replay buffer.

    Args:
        input_dim: Size of the state vector fed to the networks.
        output_dim: Number of discrete actions (network outputs).
        replay_buffer_size: Maximum number of transitions kept in the buffer.
        epsilon: Initial probability of picking a random action.
        epsilon_min: Floor below which ``epsilon`` is never decayed.
        epsilon_decay: Multiplicative factor applied by ``decay_epsilon``.
        gamma: Discount factor used when computing target Q-values.
        lr: Learning rate of the Adam optimizer.
    """

    def __init__(self, input_dim, output_dim, replay_buffer_size=10000, epsilon=1.0, epsilon_min=0.01, epsilon_decay=0.995, gamma=0.99, lr=0.001):
        self.model = DQN(input_dim, output_dim)
        self.target_model = DQN(input_dim, output_dim)
        self.target_model.load_state_dict(self.model.state_dict())  # Synchronize weights initially
        self.target_model.eval()  # Target model in evaluation mode

        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.loss_fn = nn.MSELoss()

        # Replay buffer: oldest transitions are dropped once maxlen is reached
        self.replay_buffer = deque(maxlen=replay_buffer_size)

        # Exploration parameters
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

        # Discount factor
        self.gamma = gamma

        # Action space
        self.output_dim = output_dim

    def select_action(self, state, greedy=False):
        """Select an action using an epsilon-greedy strategy.

        Args:
            state: Current observation (array-like of floats).
            greedy: When True, skip exploration and always return the action
                with the highest predicted Q-value (useful for evaluation).

        Returns:
            The chosen action index as a Python int.
        """
        if not greedy and random.random() < self.epsilon:
            return random.randint(0, self.output_dim - 1)  # Explore

        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)  # Add batch dimension
        with torch.no_grad():
            q_values = self.model(state)
        return torch.argmax(q_values).item()  # Exploit

    def store_experience(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition to the replay buffer."""
        self.replay_buffer.append((state, action, reward, next_state, done))

    def train(self, batch_size):
        """Sample a batch of experiences and take one optimizer step.

        Args:
            batch_size: Number of transitions to sample from the replay buffer.

        Returns:
            The scalar loss of this step as a float, or ``None`` when the
            buffer holds fewer than ``batch_size`` transitions.
        """
        if len(self.replay_buffer) < batch_size:
            return None  # Not enough experiences to train

        # Sample a batch and transpose it into per-field tuples
        batch = random.sample(self.replay_buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Stack through numpy first; building a tensor from a tuple of arrays is slow
        states = torch.tensor(np.array(states), dtype=torch.float32)
        actions = torch.tensor(actions, dtype=torch.int64).unsqueeze(1)
        rewards = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32)
        dones = torch.tensor(dones, dtype=torch.float32).unsqueeze(1)

        # Q(s, a) for the actions that were actually taken
        q_values = self.model(states).gather(1, actions)

        # Target: r + gamma * max_a' Q_target(s', a'), with the bootstrap term
        # zeroed for transitions flagged as done
        with torch.no_grad():
            max_next_q_values = self.target_model(next_states).max(dim=1, keepdim=True)[0]
            target_q_values = rewards + (1 - dones) * self.gamma * max_next_q_values

        loss = self.loss_fn(q_values, target_q_values)

        # Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def update_target_network(self):
        """Update the target network with the weights of the current model."""
        self.target_model.load_state_dict(self.model.state_dict())

    def decay_epsilon(self):
        """Decay epsilon by ``epsilon_decay``, never dropping below ``epsilon_min``."""
        if self.epsilon > self.epsilon_min:
            # Clamp so the last multiplicative step cannot undershoot the floor
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

# Example usage: size the agent from the environment's observation and action spaces
input_dim   = env.observation_space.shape[0]  # length of the observation vector
output_dim  = env.action_space.n  # number of discrete actions
agent = DQNAgent(input_dim, output_dim)

def train_agent(agent, env, episodes, batch_size=64, update_target_interval=10):
    """Run the training loop for ``episodes`` episodes.

    Args:
        agent: Object providing ``select_action``, ``store_experience``,
            ``train``, ``decay_epsilon``, ``update_target_network`` and an
            ``epsilon`` attribute.
        env: Gymnasium-style environment whose ``reset()`` returns
            ``(observation, info)`` and whose ``step()`` returns
            ``(observation, reward, terminated, truncated, info)``.
        episodes: Number of episodes to run.
        batch_size: Batch size passed to ``agent.train`` after every step.
        update_target_interval: The target network is synchronized whenever
            the zero-based episode index is a multiple of this value.

    Returns:
        A list with the total reward of each episode, in order.
    """
    episode_rewards = []

    for episode in range(episodes):
        state, _ = env.reset()  # Reset the environment at the start of each episode
        done = False
        episode_reward = 0.0

        while not done:
            # Select and perform an action
            action = agent.select_action(state)
            next_state, reward, terminated, truncated, info = env.step(action)

            # The episode loop ends on either signal...
            done = terminated or truncated

            # ...but only a true termination is stored as "done". A truncation
            # (e.g. a time limit) is not a terminal state, so the target must
            # still bootstrap from next_state.
            agent.store_experience(state, action, reward, next_state, terminated)

            # Train the agent
            agent.train(batch_size)

            # Transition to the next state
            state = next_state
            episode_reward += reward

        # Decay exploration rate once per episode
        agent.decay_epsilon()

        # Update target network periodically
        if episode % update_target_interval == 0:
            agent.update_target_network()

        episode_rewards.append(episode_reward)

        # Logging
        print(f"Episode {episode + 1}/{episodes}, Reward: {episode_reward:.2f}, Epsilon: {agent.epsilon:.4f}")

    print("Training completed!")
    return episode_rewards

# Train the agent for 500 episodes, keeping the default batch size and target-update interval
train_agent(agent, env, episodes=500)


In [None]:
def test_agent(agent, env, episodes=10):
    for episode in range(episodes):
        state = env.reset()  # Reset the environment at the start of each episode
        done = False
        episode_reward = 0

        while not done:
            # Select the best action (no exploration, only exploitation)
            action = agent.select_action(state)  
            
            # Take the action and observe the next state and reward
            next_state, reward, terminated, truncated, info = env.step(action)
            
            # Combine terminated and truncated to determine if the episode is done
            done = terminated or truncated
            
            # Accumulate reward for this episode
            episode_reward += reward
            
            # Move to the next state
            state = next_state

        # Logging the result of the test episode
        print(f"Test Episode {episode + 1}/{episodes}, Total Reward: {episode_reward:.2f}")
    
    print("Testing completed!")

# Test the trained agent over 10 episodes
test_agent(agent, env, episodes=10)


### Attempt 2

In [9]:

import gymnasium as gym
# Initialise the environment
env = gym.make("LunarLander-v3", render_mode="rgb_array")
count = 0  # number of episodes finished so far

# try/finally guarantees the environment is closed even if a step raises
try:
    # Reset the environment to generate the first observation
    observation, info = env.reset(seed=42)
    for i in range(1000):
        # this is where you would insert your policy
        action = env.action_space.sample()

        # step (transition) through the environment with the action
        # receiving the next observation, reward and if the episode has terminated or truncated
        observation, reward, terminated, truncated, info = env.step(action)

        # If the episode has ended then we can reset to start a new episode
        if terminated or truncated:
            count += 1
            observation, info = env.reset()
            print("===================")
            # NOTE: i is the cumulative step index across all episodes,
            # not the number of moves in this episode alone.
            print(f"trial: {count} moves = {i}")
            print("-------------------")
finally:
    env.close()

trial: 1 moves = 132
-------------------
trial: 2 moves = 283
-------------------
trial: 3 moves = 395
-------------------
trial: 4 moves = 495
-------------------
trial: 5 moves = 587
-------------------
trial: 6 moves = 688
-------------------
trial: 7 moves = 770
-------------------
trial: 8 moves = 846
-------------------
trial: 9 moves = 918
-------------------


In [77]:
# Number of discrete actions in the environment's action space
env.action_space.n

np.int64(4)

In [82]:
import torch
# Upper and lower bounds of each observation component, converted to plain Python lists
env.observation_space.high.tolist(), env.observation_space.low.tolist()

([2.5, 2.5, 10.0, 10.0, 6.2831854820251465, 10.0, 1.0, 1.0],
 [-2.5, -2.5, -10.0, -10.0, -6.2831854820251465, -10.0, -0.0, -0.0])

## Torch Playground

In [None]:
# Forward a single 2-feature input through a small DQN with 10 outputs
model = DQN(2, 10)
output = model(torch.tensor([1.0, 2.0]))

# Compare against a constant target and backpropagate
criterion = torch.nn.MSELoss()
target = torch.tensor([0.5] * 10)  # Example target tensor
loss = criterion(output, target)
loss.backward()

# Show the model architecture (printing `model.parameters` without calling it
# only displays a bound-method repr)
print(model)

# Check gradients
for param in model.parameters():
    print(param.grad)  # Gradients of the loss w.r.t. each parameter

In [None]:
import torch
import torch.nn as nn

# Define a simple linear model
model = nn.Linear(2, 1)  # 2 inputs, 1 output
# Named `sample_input` rather than `input` so the builtin input() is not
# shadowed for the rest of the kernel session.
sample_input = torch.tensor([1.0, 2.0])
target = torch.tensor([3.0])  # Scalar target

# Forward pass
output = model(sample_input)

# Compute loss
criterion = nn.MSELoss()
loss = criterion(output, target)

# Backward pass: populates .grad on the weight and bias
loss.backward()

# Check gradients
for param in model.parameters():
    print(param.grad)  # Gradients of the loss w.r.t. each parameter


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Example data
x_train = torch.rand((100, 2))  # 100 samples, 2 features each
y_train = torch.rand((100, 1))  # 100 labels

# Create DataLoader
dataset = TensorDataset(x_train, y_train)
dataloader = DataLoader(dataset, batch_size=10, shuffle=True)

# Define the model: the DQN network with input size 2 and output size 1.
# (A previous `nn.Linear(2, 1)` assignment was dead code: it was overwritten
# on the next line before ever being used.)
model = DQN(2, 1)

# Define loss function and optimizer
criterion = nn.MSELoss()  # Mean Squared Error Loss
optimizer = optim.SGD(model.parameters(), lr=0.01)  # Stochastic Gradient Descent

# Training loop
epochs = 1000  # Number of epochs
for epoch in range(epochs):
    epoch_loss = 0.0  # Sum of the per-batch losses in this epoch

    for batch_idx, (inputs, targets) in enumerate(dataloader):
        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Compute loss
        loss = criterion(outputs, targets)

        # Backward pass
        loss.backward()

        # Update weights
        optimizer.step()

        # Track the loss
        epoch_loss += loss.item()

    # Print epoch summary every 50 epochs
    if epoch % 50 == 0:
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss:.4f}")
