In [None]:
# Display the installed gym version so the reader can compare it with the one noted below.
import gym
gym.__version__
# Output recorded by the author: 0.26.2
# NOTE(review): the later cells unpack five values from env.step() and index env.reset()[0],
# so they presumably require the gym >= 0.26 API — confirm before running on another version.

In [None]:
# TEST gym environment
import gym
import numpy as np
import time

#env = gym.make('LunarLander')
env = gym.make('CartPole-v1', render_mode="human")
#env = gym.make('FrozenLake-v1')
#env = gym.make('FrozenLake-v1', render_mode="human")

action_size = env.action_space.n
print('Number of actions:', action_size)

n_episodes = 1
try:
    for episode in range(n_episodes):
        # reset() returns (observation, info). Seed only the first reset so that
        # later episodes continue the same RNG stream instead of repeating it.
        state, info = env.reset(seed=42 if episode == 0 else None)
        done = False
        while not done:
            # Uniformly random action: this cell only checks that the environment runs.
            action = np.random.randint(0, action_size)
            # step() returns (obs, reward, terminated, truncated, info); the episode
            # is over when it either terminates or hits the time limit.
            state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            print('Action:', action)
            print('State:', state)
            print('Reward:', reward)
            # With render_mode="human" the window is refreshed by step() itself,
            # so no explicit env.render() call is needed here.
            time.sleep(0.1)
finally:
    # Always release the render window, even if the loop is interrupted.
    env.close()

In [None]:
# Imports
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn

In [None]:
# Replay Buffer Class
class ReplayBuffer:
    """Fixed-capacity ring buffer of (state, action, reward, next_state, done) tuples.

    Once `buffer_size` experiences have been stored, each new experience
    overwrites the oldest one.
    """

    def __init__(self, buffer_size):
        """Create an empty buffer that holds at most `buffer_size` experiences.

        Raises:
            ValueError: if `buffer_size` is not positive (the write position is
                wrapped with a modulo, so a zero capacity can never work).
        """
        if buffer_size <= 0:
            raise ValueError("buffer_size must be a positive integer")
        self.buffer_size = buffer_size
        # Stored experiences; grows until it reaches buffer_size entries.
        self.buffer = []
        # Index at which the next experience will be written.
        self.position = 0

    def add(self, state, action, reward, state_, done):
        """Store one transition, overwriting the oldest entry once the buffer is full."""
        experience = (state, action, reward, state_, done)
        if len(self.buffer) < self.buffer_size:
            # Still filling up: the write position equals the current length.
            self.buffer.append(experience)
        else:
            self.buffer[self.position] = experience
        # Advance the write position and wrap around at the capacity.
        self.position = (self.position + 1) % self.buffer_size

    def sample(self, batch_size):
        """Return a random batch of stored experiences as PyTorch tensors.

        At most `len(self)` experiences are returned, sampled without replacement.

        Returns:
            Tuple `(states, actions, rewards, next_states, dones)` where states,
            rewards and next_states are float32, actions is int64 with shape
            (batch, 1) and dones is int64.

        Raises:
            ValueError: if the buffer is empty.
        """
        if not self.buffer:
            raise ValueError("cannot sample from an empty replay buffer")
        batch = random.sample(self.buffer, min(batch_size, len(self.buffer)))
        states, actions, rewards, next_states, dones = zip(*batch)
        # Stack each field into one ndarray first: building a tensor directly
        # from a sequence of ndarrays goes through a slow element-wise path.
        return (
            torch.tensor(np.asarray(states), dtype=torch.float32),
            torch.tensor(np.asarray(actions), dtype=torch.int64).reshape(-1, 1),
            torch.tensor(np.asarray(rewards), dtype=torch.float32),
            torch.tensor(np.asarray(next_states), dtype=torch.float32),
            torch.tensor(np.asarray(dones), dtype=torch.int64),
        )

    def __len__(self):
        """Number of experiences currently stored."""
        return len(self.buffer)


In [None]:
# Neural Network and Agent Classes

# Define a neural network class that inherits from the PyTorch nn.Module class.
class LinearDeepQNetwork(nn.Module):
    """Three-layer fully connected network producing one output per action.

    The module owns its Adam optimizer and MSE loss, and moves itself to
    'cuda:0' when CUDA is available, otherwise to the CPU.
    """

    def __init__(self, lr, n_actions, input_dims):
        """Build the layers, optimizer and loss.

        Args:
            lr: learning rate passed to the Adam optimizer.
            n_actions: width of the output layer.
            input_dims: sequence whose first element is the input width.
        """
        super().__init__()

        n_inputs = input_dims[0]

        # Layer stack: n_inputs -> 256 -> 128 -> n_actions.
        self.fc1 = nn.Linear(n_inputs, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, n_actions)

        # Training utilities kept on the module itself.
        self.optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()

        # Select the device and move all parameters onto it.
        if torch.cuda.is_available():
            device_name = 'cuda:0'
        else:
            device_name = 'cpu'
        self.device = torch.device(device_name)
        self.to(self.device)

    def forward(self, state):
        """Apply ReLU after each of the two hidden layers, then the linear output layer."""
        hidden = state
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        return self.fc3(hidden)

# Define an agent class for training the neural network.
class Agent():
    """Epsilon-greedy DQN agent with an online network and a target network.

    `Q_network` is trained by `learn`; `Target_network` supplies the bootstrap
    values and only changes when `update_target_network` copies the online
    weights into it.
    """

    def __init__(self, input_dims, n_actions, lr=2e-4, gamma=0.95,
                epsilon=1.0, eps_dec=1e-4, eps_min=0.05):
        """Create both networks and store the hyperparameters.

        Args:
            input_dims: sequence whose first element is the state width.
            n_actions: number of discrete actions.
            lr: learning rate for both networks' optimizers.
            gamma: discount factor applied to the next-state value.
            epsilon: initial probability of choosing a random action.
            eps_dec: amount subtracted from epsilon after every `learn` call.
            eps_min: lower bound for epsilon.
        """
        self.lr = lr
        self.input_dims = input_dims
        self.n_actions = n_actions
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_dec = eps_dec
        self.eps_min = eps_min
        self.action_space = list(range(self.n_actions))

        # Create instances of the neural networks for the agent.
        self.Q_network = LinearDeepQNetwork(self.lr, self.n_actions, self.input_dims)
        self.Target_network = LinearDeepQNetwork(self.lr, self.n_actions, self.input_dims)
        # The two networks are initialised independently; start them in sync so
        # early learning steps do not bootstrap from unrelated random weights.
        self.update_target_network()

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.Target_network.load_state_dict(self.Q_network.state_dict())

    def choose_action(self, state):
        """Return an action (a Python int) for `state` using an epsilon-greedy policy.

        With probability `epsilon` a uniformly random action is returned;
        otherwise the action with the highest online-network output.
        """
        if np.random.random() > self.epsilon:
            state = torch.as_tensor(state, dtype=torch.float32,
                                    device=self.Q_network.device)
            # Pure inference: no autograd graph is needed to pick an action.
            with torch.no_grad():
                q_values = self.Q_network.forward(state)
            return torch.argmax(q_values).item()
        # Cast so both branches return the same type.
        return int(np.random.choice(self.action_space))

    def decrement_epsilon(self):
        """Lower epsilon by `eps_dec`, never going below `eps_min`."""
        self.epsilon = max(self.epsilon - self.eps_dec, self.eps_min)

    def learn(self, states, actions, rewards, states_, dones):
        """Run one gradient step on a batch of transitions, then decay epsilon.

        Args:
            states: batch of states, shape (batch, state_width).
            actions: action indices, shape (batch, 1) or (batch,).
            rewards: rewards, shape (batch,).
            states_: batch of next states, same shape as `states`.
            dones: 1/True where the transition ended the episode, shape (batch,).

        Inputs may be tensors or array-likes; they are converted (without a
        redundant copy when already tensors) and moved to the network device.
        """
        device = self.Q_network.device
        self.Q_network.optimizer.zero_grad()

        states = torch.as_tensor(states, dtype=torch.float32, device=device)
        # gather() needs int64 indices with one column per row.
        actions = torch.as_tensor(actions, dtype=torch.int64, device=device).reshape(-1, 1)
        rewards = torch.as_tensor(rewards, dtype=torch.float32, device=device).reshape(-1)
        states_ = torch.as_tensor(states_, dtype=torch.float32,
                                  device=self.Target_network.device)
        dones = torch.as_tensor(dones, dtype=torch.float32, device=device).reshape(-1)

        # Online-network value of the action actually taken. squeeze(1) keeps
        # the batch dimension even when the batch holds a single transition.
        q_pred = self.Q_network.forward(states).gather(1, actions).squeeze(1)

        # The target is a constant for this update, so no graph is built
        # through the target network and its parameters receive no gradients.
        with torch.no_grad():
            q_next, _ = torch.max(self.Target_network.forward(states_), dim=1)
            q_next = q_next.to(device)
            # (1 - dones) removes the bootstrap term for terminal transitions.
            q_target = rewards + self.gamma * (1 - dones) * q_next

        loss = self.Q_network.loss(q_pred, q_target)
        loss.backward()
        self.Q_network.optimizer.step()
        # Exploration decays once per learning step.
        self.decrement_epsilon()

    def save(self, path):
        """Save the online network's weights to `path`, creating parent directories."""
        if isinstance(path, (str, os.PathLike)):
            directory = os.path.dirname(os.fspath(path))
            if directory:
                os.makedirs(directory, exist_ok=True)
        torch.save(self.Q_network.state_dict(), path)

    def load(self, path):
        """Load weights from `path` into the online network and sync the target network.

        NOTE(review): torch.load unpickles the file; only load checkpoints from
        a trusted source.
        """
        # map_location lets a checkpoint saved on one device load on another.
        state_dict = torch.load(path, map_location=self.Q_network.device)
        self.Q_network.load_state_dict(state_dict)
        self.update_target_network()



In [None]:
# plot score and epsilon curves
def plot_score_epsilon(x_axis, scores, epsilons):
    """Draw scores and epsilon values against `x_axis` on one figure with two y-axes.

    Scores use the left axis (blue line), epsilon uses a twin right axis
    (red line); a combined legend is placed at the lower left and the figure
    is shown.
    """
    _, score_ax = plt.subplots()

    # Left axis: score curve.
    score_ax.set_xlabel('x')
    score_ax.set_ylabel('score', color='b')
    score_ax.plot(x_axis, scores, 'b-', label='score')

    # Right axis sharing the same x-axis: epsilon curve.
    eps_ax = score_ax.twinx()
    eps_ax.set_ylabel('epsilon', color='r')
    eps_ax.plot(x_axis, epsilons, 'r-', label='epsilon')

    # Collect the legend entries of both axes into a single legend.
    handles, names = [], []
    for axis in (score_ax, eps_ax):
        axis_handles, axis_names = axis.get_legend_handles_labels()
        handles = handles + axis_handles
        names = names + axis_names
    eps_ax.legend(handles, names, loc='lower left')

    plt.show()

In [None]:
# Create the environment
env_name = 'CartPole-v1'
env = gym.make(env_name)

# Get the dimension of the state and action spaces
state_dim = env.reset()[0].shape
action_dim = env.action_space.n
print(f'state dim {state_dim}')
print(f'action dim {action_dim}')

# Define the path to save the trained model
save_path = 'models/'+env_name

# Define the number of episodes to run
n_episodes = 300
# Report and record the score every this many episodes
save_score_freq = 5

# Number of episodes after which the target network is updated
target_update_freq = 5

# Initialize the replay buffer and define batch size
buffer_size = 1000
batch_size = 64
replay_buffer = ReplayBuffer(buffer_size)

# Create an instance of the agent class
agent = Agent(input_dims=state_dim, n_actions=action_dim)
# Start with identical online/target weights so the first learning steps do not
# bootstrap from an unrelated, randomly initialised target network.
agent.update_target_network()

# Initialize tracking variables
x_axis, scores, epsilons = [], [], []
# Score of every episode, used for the running average in the progress report
episode_scores = []

# Run the main training loop for the specified number of episodes
for i in range(n_episodes):
    # Reset the environment and initialize the score and done flag
    score = 0
    done = False
    state = env.reset()[0]

    # Run the episode until it terminates or is truncated by the time limit
    while not done:
        # Choose an action based on the current observation and agent policy
        action = agent.choose_action(state)

        # step() returns (obs, reward, terminated, truncated, info)
        state_, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        # Store only `terminated` as the done flag: a time-limit truncation is
        # not a terminal state, so the target should still bootstrap from it.
        replay_buffer.add(state, action, reward, state_, terminated)

        # Perform learning if enough experiences are stored in the replay buffer
        if len(replay_buffer) > batch_size:
            states, actions, rewards, states_, dones = replay_buffer.sample(batch_size)
            agent.learn(states, actions, rewards, states_, dones)

        state = state_
        score += reward

    episode_scores.append(score)

    if i % target_update_freq == 0:
        agent.update_target_network()  # Update the target network weights periodically

    # Print the episode statistics and track the score and epsilon value over time
    if (i + 1) % save_score_freq == 0:
        # Average over the most recent episodes, including the one just finished
        avg_score = np.mean(episode_scores[-save_score_freq:])
        print(f'episode {i+1}, last score {score:.1f}, avg score {avg_score:.1f} epsilon {agent.epsilon:.2f}')
        x_axis.append(i+1)
        scores.append(score)
        epsilons.append(agent.epsilon)

env.close()

# Save the trained model (make sure the target directory exists first)
os.makedirs(os.path.dirname(save_path), exist_ok=True)
agent.save(save_path)

# Plot the scores and epsilon values over time
plot_score_epsilon(x_axis, scores, epsilons)


In [None]:
env_name = 'CartPole-v1'
load_path = 'models/' + env_name

# Create an instance of the environment with rendering enabled.
env = gym.make(env_name, render_mode="human")

# Initialize and load the agent.
agent = Agent(input_dims=env.reset()[0].shape, n_actions=env.action_space.n)
agent.load(load_path)

# Set the exploration rate to zero to force the agent to choose actions based on its learned policy.
agent.epsilon = 0.0

# Run the environment using the agent's learned policy.
n_episodes = 1
try:
    for episode in range(n_episodes):
        # Start every episode from a fresh reset and keep the returned state.
        # Seed only the first reset so later episodes are not exact repeats.
        state = env.reset(seed=42 if episode == 0 else None)[0]
        done = False
        while not done:
            # Print the current observation and chosen action.
            print('---')
            print('State:', state)
            action = agent.choose_action(state)
            print('Action:', action)

            # step() returns (obs, reward, terminated, truncated, info); stop on
            # either flag. With render_mode="human" the window is refreshed by
            # step() itself, so no explicit env.render() call is needed.
            state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            #time.sleep(0.1)  # Optional delay for visualization purposes.
finally:
    # Close the environment viewer even if the run is interrupted.
    env.close()
