# Import/Device

In [1]:
import gymnasium as gym

import numpy as np

import random
from collections import deque
import math
from IPython.display import display, clear_output

import torch
import torch.nn as nn
import torch.optim as optim

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Pick the compute device once; the agents below move their networks and batches to it.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Show which accelerator is in use. The CUDA queries (current_device / get_device_name)
# raise on CPU-only installs, so only ask for the name when CUDA is actually available.
torch.cuda.get_device_name(0) if torch.cuda.is_available() else "cpu"

'NVIDIA GeForce GTX 1660 SUPER'

# Gym Tests

In [None]:
# Enumerate every environment id currently registered with gymnasium.
def list_gym_environments():
    """Print how many environments are registered, then each environment id on its own line."""
    registry = gym.envs.registry
    print(f"num envs: {len(registry)}")
    for env_id in registry.keys():
        print(env_id)


list_gym_environments()

In [None]:
# Smoke test: drive LunarLander with random actions until the first episode ends
# (or for at most 1000 steps).
env = gym.make("LunarLander-v2", continuous=False, render_mode="human")
try:
    observation, info = env.reset(seed=42)
    for _ in range(1000):
        action = env.action_space.sample()  # this is where you would insert your policy
        observation, reward, terminated, truncated, info = env.step(action)

        # Stop after the first episode. No reset is needed here because the
        # environment is closed immediately after the loop.
        if terminated or truncated:
            break
finally:
    # Always release the render window, even if a step raises or the cell is interrupted.
    env.close()

# Building a DQN (failed)

In [None]:
class ReplayBuffer():
    """Bounded FIFO store of (state, action, reward, next_state, done) transitions.

    Args:
        batch_size: number of transitions returned by `sample_batch`.
        max_len: maximum number of transitions kept; once full, the oldest
            transition is dropped for every new one. Defaults to 1000, the
            value that used to be hard-coded.
    """
    def __init__(self, batch_size, max_len=1000):
        self.buffer = deque(maxlen=max_len)
        self.batch_size = batch_size

    def add_experience(self, state, action, reward, next_state, done):
        """Append one transition, evicting the oldest one when the buffer is full."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample_batch(self):
        """Return `batch_size` transitions drawn uniformly *with replacement*.

        Because sampling is with replacement this works for any non-empty
        buffer (a batch may contain duplicates). Raises IndexError when the
        buffer is empty.
        """
        return random.choices(self.buffer, k=self.batch_size)


In [None]:
class QNetwork(nn.Module):
    """Fully connected network: in_dim -> h1_dim -> h2_dim -> out_dim, ReLU between layers.

    The output layer is linear, so the forward pass returns one unbounded
    value per output unit.
    """
    def __init__(self, in_dim, h1_dim, h2_dim, out_dim):
        super().__init__()
        # The attribute names become the state_dict keys, so they are kept as-is.
        self.linear1 = nn.Linear(in_features=in_dim, out_features=h1_dim)
        self.linear2 = nn.Linear(in_features=h1_dim, out_features=h2_dim)
        self.linear3 = nn.Linear(in_features=h2_dim, out_features=out_dim)

    def forward(self, x):
        """Run `x` through the two hidden ReLU layers and the linear output layer."""
        hidden = self.linear1(x).relu()
        hidden = self.linear2(hidden).relu()
        return self.linear3(hidden)


In [None]:
class Agent():
    """DQN agent with an 8-input / 4-action Q-network (used with LunarLander in this notebook).

    Holds an online Q-network, a target network that is synced from it every
    1000 training iterations, a replay buffer, and an exponentially decaying
    epsilon for epsilon-greedy exploration.
    """
    def __init__(self):
        # ReplayBuffer's only positional argument is the *batch size*; the previous
        # `ReplayBuffer(50000)` therefore asked for 50000-sample batches. 64 matches
        # the batch size used by the later agent in this notebook; the capacity
        # stays at ReplayBuffer's own default.
        self.replayBuffer = ReplayBuffer(batch_size=64)

        self.qNetwork = QNetwork(8, 16, 16, 4).to(device)
        self.targetNetwork = QNetwork(8, 16, 16, 4).to(device)
        self.targetNetwork.eval()
        self.updateTargetNet()

        self.training = True
        self.epsilon = 1
        self.gamma = 0.95
        self.loss_fn = nn.MSELoss()
        self.optimizer = optim.Adam(self.qNetwork.parameters(), lr=0.00025)

    def updateTargetNet(self):
        """Copy the online network's weights into the target network."""
        self.targetNetwork.load_state_dict(self.qNetwork.state_dict())

    def set_training(self, train):
        """Enable (True) or disable (False) epsilon-greedy exploration in `get_action`."""
        self.training = train

    def get_action(self, state, action_space):
        """Return an action for `state`.

        While training, a random action from `action_space` is returned with
        probability epsilon; otherwise the action with the highest predicted
        Q-value is returned as a Python int.
        """
        if (self.training and np.random.random() < self.epsilon):
            return action_space.sample()
        with torch.no_grad():
            # Explicit float32: the network weights are float32 and a float64
            # observation would otherwise fail with a dtype mismatch.
            state_tensor = torch.tensor(np.array(state), dtype=torch.float32).to(device)
            return self.qNetwork(state_tensor).argmax().item()

    def record_experience(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer."""
        self.replayBuffer.add_experience(state, action, reward, next_state, done)

    def train(self, iteration):
        """Run one gradient step on a sampled batch and update the schedules.

        Args:
            iteration: global step counter; drives the target-network sync
                (every 1000 iterations) and the epsilon decay.
        """
        # get the batch
        batch = self.replayBuffer.sample_batch()
        states, actions, rewards, next_states, done = zip(*batch)
        states = torch.tensor(np.array(states), dtype=torch.float32).to(device)
        actions = torch.tensor(np.array(actions), dtype=torch.int64).unsqueeze(-1).to(device)
        rewards = torch.tensor(np.array(rewards), dtype=torch.float32).unsqueeze(-1).to(device)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32).to(device)
        done = torch.tensor(np.array(done), dtype=torch.float32).unsqueeze(-1).to(device)

        # Targets: r + γ * max_a' Q(s', a'; θ^-). No gradient flows into the target network.
        with torch.no_grad():
            next_q_values = self.targetNetwork(next_states).max(1, keepdim=True)[0]
        targets = rewards + (1 - done) * self.gamma * next_q_values

        # Loss: L(θ) = E[(Q(s, a; θ) - target)^2]. Only the Q-value of the action that
        # was actually taken is compared with the target; comparing the full
        # (batch, 4) output would broadcast the target onto every action.
        current_q_values = self.qNetwork(states).gather(1, actions)
        loss = self.loss_fn(current_q_values, targets)

        # Optimize: without optimizer.step() the gradients are computed but never applied.
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # update target network periodically
        if (iteration % 1000 == 0):
            self.updateTargetNet()

        # ε = ε_min + (ε_max - ε_min) * exp(-decay_rate * step)
        epsilon_min = 0.01
        self.epsilon = epsilon_min + (1 - epsilon_min) * math.exp(-0.001 * iteration)

    def save(self):
        """Write the online network's weights to "2023-04-08 model.pth"."""
        torch.save(self.qNetwork.state_dict(), "2023-04-08 model.pth")

    def load(self):
        """Load the online network's weights from "2023-04-08 model.pth".

        `map_location` lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
        """
        self.qNetwork.load_state_dict(torch.load("2023-04-08 model.pth", map_location=device))


# Training a DQN (failed)

In [None]:
# Build a fresh agent for the training run below (re-running this cell discards any learned weights).
agent = Agent()

In [None]:
# Train the agent on LunarLander for 10000 environment steps, logging every finished episode.
env = gym.make("LunarLander-v2", continuous=False)

cur_reward = 0
rewards = []  # total reward of each finished episode, in order
try:
    state, info = env.reset(seed=42)
    for i in range(10000):
        action = agent.get_action(state, env.action_space)
        next_state, reward, terminated, truncated, info = env.step(action)
        # Store only `terminated` as the done flag. A truncated episode (time limit)
        # did not reach a terminal state, so its target must still bootstrap from
        # next_state instead of being cut off at the reward.
        agent.record_experience(state, action, reward, next_state, terminated)
        cur_reward += reward

        state = next_state

        agent.train(i)

        # Both kinds of episode end require a reset before stepping again.
        if terminated or truncated:
            print(i, cur_reward)
            rewards.append(cur_reward)
            cur_reward = 0
            state, info = env.reset()
finally:
    # Release the environment even if training raises or the cell is interrupted.
    env.close()

In [None]:
import matplotlib.pyplot as plt

# `rewards` holds one total-reward value per finished episode of the training loop.
fig, ax = plt.subplots()
ax.plot(rewards)
ax.set(title="DQN training: total reward per episode", xlabel="Episode", ylabel="Total reward")
plt.show()

# Simple Q-Learning

In [None]:
# Smoke test: take 10 random steps in FrozenLake and print what the environment returns.
env = gym.make("FrozenLake-v1", render_mode="human")
try:
    observation, info = env.reset(seed=42)
    for _ in range(10):
        action = env.action_space.sample()  # int [0, 3]
        observation, reward, terminated, truncated, info = env.step(action)
        print(observation, reward) # observation: int [0, 15] (player's position on the 4x4)

        # Start a new episode whenever the current one ends.
        if terminated or truncated:
            observation, info = env.reset()
finally:
    # Always release the render window, even if a step raises or the cell is interrupted.
    env.close()

In [None]:
# Q Learning
class ActionValueFunction():
    """Tabular Q(s, a): a table with one row per state and one column per action."""
    def __init__(self, s, a):
        # Every state-action value starts at zero.
        # (Alternative that was tried: np.full((s, a), 100).)
        self.sa_pairs = np.zeros(shape=(s, a))

    def get_action_value(self, s, a):
        """Return the stored value of action `a` in state `s`."""
        state_row = self.sa_pairs[s]
        return state_row[a]

    def set_action_value(self, s, a, v):
        """Overwrite the stored value of action `a` in state `s` with `v`."""
        state_row = self.sa_pairs[s]
        state_row[a] = v

    def get_best_action(self, s):
        """Return the index of the highest-valued action in state `s`."""
        state_row = self.sa_pairs[s]
        return np.argmax(state_row)

    def get_best_action_value(self, s):
        """Return the highest action-value available in state `s`."""
        state_row = self.sa_pairs[s]
        return np.max(state_row)

    def load(self, sa):
        """Replace the whole table with `sa` (stored by reference, not copied)."""
        self.sa_pairs = sa

class Agent():
    """Tabular Q-learning agent with a 16-state x 4-action value table and epsilon-greedy actions."""
    def __init__(self):
        self.q = ActionValueFunction(16, 4)
        self.epsilon = 1   # probability of picking a random action
        self.alpha = 1     # learning rate
        self.gamma = 0.9   # discount factor

    def get_action(self, state):
        """Return a random action in 0..3 with probability epsilon, else the greedy action."""
        explore_now = np.random.random() < self.epsilon
        if explore_now:
            return random.randint(0, 3)
        return self.q.get_best_action(state)

    def decay_epsilon(self):
        """Multiply epsilon by 0.99, never letting it drop below 0.1."""
        decayed = self.epsilon * 0.99
        self.epsilon = decayed if decayed > 0.1 else 0.1

    def explore(self):
        """Switch to fully random action selection."""
        self.epsilon = 1

    def exploit(self):
        """Switch to fully greedy action selection."""
        self.epsilon = 0

    def update(self, state, action, reward, next_state):
        """Apply one Q-learning update for the observed transition.

        Q(s,a) <- Q(s,a) + alpha * ((r + gamma * max_a' Q(s',a')) - Q(s,a))
        """
        old_value = self.q.get_action_value(state, action)
        best_next_value = self.q.get_best_action_value(next_state)
        td_target = reward + (self.gamma * best_next_value)
        new_value = old_value + self.alpha * (td_target - old_value)
        self.q.set_action_value(state, action, new_value)

    def print(self):
        """Print the full state-action value table."""
        print(self.q.sa_pairs)


In [None]:
# Create the tabular agent; its action-value table Q(s, a) starts at zero for every state-action pair.
agent = Agent()
# Set epsilon to 1 so that every action taken during training is chosen at random.
agent.explore()

In [None]:
# Run tabular Q-learning on the deterministic (non-slippery) FrozenLake for 10000 steps.
env = gym.make("FrozenLake-v1", is_slippery=False, render_mode=None)
try:
    state, info = env.reset(seed=42)

    # To evaluate a trained table instead of exploring, call agent.exploit() before this loop.
    for i in range(10000):
        # Choose an action (a) for the current state (s) with the agent's epsilon-greedy policy.
        action = agent.get_action(state)

        # Take action (a), observe the next state (s') and reward (r).
        next_state, reward, terminated, truncated, info = env.step(action)

        # Update Q(s,a) from the observed transition using the Bellman optimality equation:
        # Q(s,a) <- Q(s,a) + α * (r + γ * max_a'[Q(s',a')] - Q(s,a)), where α is the learning rate (0 < α ≤ 1).
        agent.update(state, action, reward, next_state)

        # The next state becomes the current state for the following step.
        state = next_state

        # Episode over: start a new one. (Call agent.decay_epsilon() here to anneal exploration.)
        if terminated or truncated:
            state, info = env.reset()
finally:
    # Release the environment even if a step raises or the cell is interrupted.
    env.close()


In [None]:
# Snapshot the learned Q-table. Take a copy: agent.q.sa_pairs is updated in place by
# further training, which would otherwise silently change `saved_sa` as well.
saved_sa = np.copy(agent.q.sa_pairs)
print(saved_sa)
#np.savetxt('./weights/2023-04-09 01 solved_frozenlake_42.txt', saved_sa, delimiter=',')

In [None]:
# Read a previously saved Q-table (comma-separated text) and install it in the current agent.
# NOTE(review): the np.savetxt call that would write this file is commented out in this
# notebook, so the file must already exist on disk — confirm before running.
loaded_array = np.loadtxt('./weights/2023-04-09 01 solved_frozenlake_42.txt', delimiter=',')
print(loaded_array)
# agent.q.load stores the array by reference; it replaces the whole table.
agent.q.load(loaded_array)

In [None]:
# View the Q-table as a 4x4 board in which each cell holds the 4 action-values of one state.
grid = saved_sa.reshape(4, 4, 4)

# Print the board one row at a time, with a separator line after every row.
for board_row in grid:
    for action_values in board_row:
        print(action_values, end=" | ")
    print("\n" + "-"*100)

In [None]:
# gym.make() returns the environment inside wrappers. `desc` (the map) belongs to the
# underlying environment, so read it through `unwrapped` rather than relying on the
# wrappers forwarding unknown attributes.
layout = env.unwrapped.desc

# Print the layout
print(layout)

# Building a DQN Part 2

In [68]:
# Smoke test: take 50 random steps in CartPole, resetting whenever an episode ends.
env = gym.make("CartPole-v1", render_mode="human")
try:
    state, info = env.reset(seed=42)
    for _ in range(50):
        action = env.action_space.sample()  # int [0, 1]
        new_state, reward, terminated, truncated, info = env.step(action)  # new_state: 4 floats

        state = new_state

        if terminated or truncated:
            state, info = env.reset()
finally:
    # Always release the render window, even if a step raises or the cell is interrupted.
    env.close()

In [None]:
class QNetwork(nn.Module):
    """Fully connected network: in_dim -> h_dim -> h_dim -> out_dim, ReLU between layers.

    The defaults give a 4-input, 2-output network with two hidden layers of
    10 units. The output layer is linear.
    """
    def __init__(self, in_dim=4, h_dim=10, out_dim=2):
        super().__init__()
        # The attribute names become the state_dict keys, so they are kept as-is.
        self.fc1 = nn.Linear(in_features=in_dim, out_features=h_dim)
        self.fc2 = nn.Linear(in_features=h_dim, out_features=h_dim)
        self.fc3 = nn.Linear(in_features=h_dim, out_features=out_dim)

    def forward(self, x):
        """Run `x` through the two hidden ReLU layers and the linear output layer."""
        hidden = self.fc1(x).relu()
        hidden = self.fc2(hidden).relu()
        return self.fc3(hidden)

class ReplayBuffer():
    """Bounded FIFO memory of transitions, sampled uniformly without replacement.

    Args:
        max_len: maximum number of transitions kept; the oldest is evicted first.
        batch_size: number of transitions returned by `sample_batch`.
    """
    def __init__(self, max_len, batch_size):
        self.batch_size = batch_size
        self.buffer = deque(maxlen=max_len)

    def add_experience(self, state, action, reward, next_state, done):
        """Append one (state, action, reward, next_state, done) transition."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample_batch(self):
        """Return `batch_size` distinct transitions.

        Raises ValueError when the buffer holds fewer than `batch_size` transitions.
        """
        return random.sample(self.buffer, self.batch_size)

class Agent():
    """DQN agent built on the default QNetwork (4 inputs, 2 actions; used with CartPole here).

    Holds an online Q-network, a target network synced from it every
    `target_network_update_period` training iterations, a 50000-transition
    replay buffer sampled in batches of 64, and an optional exponential
    epsilon decay (off by default, so epsilon stays at 1).
    """
    def __init__(self):
        self.replayBuffer = ReplayBuffer(50000, 64)

        self.qNetwork = QNetwork().to(device)
        self.targetNetwork = QNetwork().to(device)
        self.updateTargetNet()
        self.target_network_update_period = 1000

        self.epsilon = 1
        self.epsilon_decay = False
        self.gamma = 0.95
        self.loss_fn = nn.MSELoss()
        self.optimizer = optim.Adam(self.qNetwork.parameters(), lr=0.00025)

    def updateTargetNet(self):
        """Copy the online network's weights into the target network."""
        self.targetNetwork.load_state_dict(self.qNetwork.state_dict())

    def get_action(self, state, action_space):
        """Return a random action with probability epsilon, else the greedy action (as an int)."""
        if (np.random.random() < self.epsilon):
            return action_space.sample()
        with torch.no_grad():
            # convert the state to a float32 tensor on the model's device
            state_tensor = torch.tensor(np.array(state), dtype=torch.float32).to(device)
            # estimated Q-values for this state
            q_values = self.qNetwork(state_tensor)
            # index of the highest Q-value, i.e. the action (not its value)
            return q_values.argmax().item()

    def record_experience(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer."""
        self.replayBuffer.add_experience(state, action, reward, next_state, done)

    def train(self, iteration):
        """Run one gradient step on a sampled batch and update the schedules.

        Does nothing until the replay buffer holds at least one full batch.

        Args:
            iteration: global step counter; drives the target-network sync
                and, when `epsilon_decay` is set, the epsilon schedule.
        """
        # The buffer samples without replacement, so it must hold at least one
        # full batch; skip training while it is still filling up.
        if len(self.replayBuffer.buffer) < self.replayBuffer.batch_size:
            return

        # get the batch
        batch = self.replayBuffer.sample_batch()
        states, actions, rewards, next_states, done = zip(*batch)
        states = torch.tensor(np.array(states), dtype=torch.float32).to(device)
        actions = torch.tensor(np.array(actions), dtype=torch.int64).unsqueeze(-1).to(device)
        # float32, not int64: an integer cast would truncate fractional rewards.
        rewards = torch.tensor(np.array(rewards), dtype=torch.float32).unsqueeze(-1).to(device)
        next_states = torch.tensor(np.array(next_states), dtype=torch.float32).to(device)
        done = torch.tensor(np.array(done), dtype=torch.float32).unsqueeze(-1).to(device) # False = 0, True = 1

        # Targets: r + γ * max_a' Q(s', a'; θ^-). No gradient flows into the target network.
        with torch.no_grad():
            next_q_values = self.targetNetwork(next_states).max(1, keepdim=True)[0]
        targets = rewards + (1 - done) * self.gamma * next_q_values

        # Loss: L(θ) = E[(Q(s, a; θ) - target)^2]. Q(s, a) is the value of the action
        # that was actually taken, selected with gather(); taking the max over
        # actions here would train the wrong output whenever the stored action
        # was not the greedy one.
        current_q_values = self.qNetwork(states).gather(1, actions)
        loss = self.loss_fn(current_q_values, targets)

        # Optimize: without optimizer.step() the gradients are computed but never applied.
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # update target network periodically
        if (iteration % self.target_network_update_period == 0):
            self.updateTargetNet()

        # ε = ε_min + (ε_max - ε_min) * exp(-decay_rate * step)
        if (self.epsilon_decay):
            epsilon_min = 0.01
            self.epsilon = epsilon_min + (1 - epsilon_min) * math.exp(-0.001 * iteration)

    def save(self):
        """Write the online network's weights to "2023-04-09 model.pth"."""
        torch.save(self.qNetwork.state_dict(), "2023-04-09 model.pth")

    def load(self):
        """Load the online network's weights from "2023-04-09 model.pth".

        `map_location` lets a checkpoint saved on a GPU be loaded on a CPU-only machine.
        """
        self.qNetwork.load_state_dict(torch.load("2023-04-09 model.pth", map_location=device))

In [None]:
#Things to monitor during training: the loss (does it go down?); the estimated Q-values in different areas of the observation space (are they moving as expected?); and the agent's performance, i.e. how long it lasts in the arena when it is not training.
#Idea: take random actions for a while without training (the replay buffer needs to stock up). Then train with an epsilon decay (random at first, then exploring the better areas further). Terminate on a condition (Q-value estimates stabilize? a set number of times? perfect agent performance?). Measure performance in full exploitation mode.

