In [None]:
import gymnasium as gym
import numpy as np 
import matplotlib.pyplot as plt
import torch
import numpy as np
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from copy import deepcopy, copy

In [None]:
#env = gym.make("Blackjack-v1", natural = True , sab = False )

In [None]:
print("ACTION SPACE:\n",env.action_space, ": HIT or STICK\n")
print("OBSERVATION SPACE:\n",env.observation_space, ": user_sum, dealer_card, if_usable_ace_of_user")

In [None]:
print("Action space is {} ".format(env.action_space))
print("Observation space is {} ".format(env.observation_space))
print("Reward range is {} ".format(env.reward_range))

In [None]:
# If this cell fails you need to change the runtime of your colab notebook to GPU
# Go to Runtime -> Change Runtime Type and select GPU
assert torch.cuda.is_available(), "GPU is not enabled"

#  Use gpu if available
device = torch.device("cuda" if torch.cuda.is_available() else "tpu")

In [None]:
class DQN(torch.nn.Module):
    
    def __init__(self, env, learning_rate=1e-3, device='cuda'):
        super(DQN, self).__init__()
        self.device = device
        self.n_inputs = env.observation_space.shape[0]
        self.n_outputs = env.action_space.n
        self.actions = np.arange(env.action_space.n)
        self.learning_rate = learning_rate
        
        ### Construction of the neural network
        self.model = torch.nn.Sequential(
            torch.nn.Linear(self.n_inputs, 16, bias=True),
            torch.nn.ReLU(),
            torch.nn.Linear(16, 16, bias=True),
            torch.nn.ReLU(),
            torch.nn.Linear(16, self.n_outputs, bias=True))
        
        self.optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        
        ### Work with CUDA is allowed
        if self.device == 'cuda':
            self.model.cuda()
            
    
    ### e-greedy method
    def get_action(self, state, epsilon=0.05):
        if np.random.random() < epsilon:
            # random action
            action = np.random.choice(self.actions)  
        else:
            # Q-value based action
            qvals = self.get_qvals(state)  
            action= torch.max(qvals, dim=-1)[1].item()
        
        return action
    
    
    def get_qvals(self, state):
        if type(state) is tuple:
            state = np.array([np.ravel(s) for s in state])
        
        state_t = torch.FloatTensor(state).to(device=self.device)
        
        return self.model(state_t)

In [None]:
class experienceReplayBuffer:

    def __init__(self, memory_size=50000, burn_in=10000):
        self.memory_size = memory_size
        self.burn_in = burn_in
        self.buffer = namedtuple('Buffer', field_names=['state', 'action', 'reward', 'done', 'next_state'])
        self.replay_memory = deque(maxlen=memory_size)

        
    ## We create a list of random indexes and package the experiences into Numpy arrays
    # (makes it easier to calculate the loss later)
    def sample_batch(self, batch_size=32):
        samples = np.random.choice(len(self.replay_memory), batch_size, replace=False)
        # Use asterisk operator to unpack deque 
        batch = zip(*[self.replay_memory[i] for i in samples])
        
        return batch

    
    ## New experiences are added 
    def append(self, state, action, reward, done, next_state):
        self.replay_memory.append(self.buffer(state, action, reward, done, next_state))

        
    ## The buffer is filled with random experiences at the beginning of training
    def burn_in_capacity(self):
        return len(self.replay_memory) / self.burn_in

In [None]:
class DQNAgent:
    
    def __init__(self, env, dnnetwork, buffer, epsilon=0.1, eps_decay=0.99, batch_size=32):
        self.env = env
        self.dnnetwork = dnnetwork
        self.target_network = deepcopy(dnnetwork) # red objetivo (copia de la principal)
        self.buffer = buffer
        self.epsilon = epsilon
        self.eps_decay = eps_decay
        self.batch_size = batch_size
        # block of the last X episodes to calculate the average reward 
        self.nblock = 100 
        # average reward used to determine if the agent has learned to play
        self.reward_threshold = self.env.spec.reward_threshold 
        
        self.initialize()
    
    
    def initialize(self):
        self.update_loss = []
        self.training_rewards = []
        self.mean_training_rewards = []
        self.sync_eps = []
        self.total_reward = 0
        self.step_count = 0
        self.state0 = self.env.reset()[0]
        
    
    ## Take new action
    def take_step(self, eps, mode='train'):
        if mode == 'explore': 
            # random action in burn-in and in the exploration phase (epsilon)
            action = self.env.action_space.sample() 
        else:
            # Action based on the Q-value (max Q-value)
            action = self.dnnetwork.get_action(self.state0, eps)
            self.step_count += 1
            
        # Execute action and get reward and new state
        new_state, reward, terminated, truncated, _ = self.env.step(action)
        done = terminated or truncated
        self.total_reward += reward
        # save experience in the buffer
        self.buffer.append(self.state0, action, reward, done, new_state) 
        self.state0 = new_state.copy()
        
        if done:
            self.state0 = self.env.reset()[0]
        
        return done

    
        
    ## Training
    def train(self, gamma=0.99, max_episodes=50000, 
              batch_size=32,
              dnn_update_frequency=4,
              dnn_sync_frequency=2000):
        
        self.gamma = gamma

        # Fill the buffer with N random experiences
        print("Filling replay buffer...")
        while self.buffer.burn_in_capacity() < 1:
            self.take_step(self.epsilon, mode='explore')

              
        episode = 0
        training = True
        print("Training...")
        while training:
            self.state0 = self.env.reset()[0]
            self.total_reward = 0
            gamedone = False
            while gamedone == False:
                # The agent takes an action
                gamedone = self.take_step(self.epsilon, mode='train')
               
                # Upgrade main network
                if self.step_count % dnn_update_frequency == 0:
                    self.update()
                # Synchronize the main network and the target network
                if self.step_count % dnn_sync_frequency == 0:
                    self.target_network.load_state_dict(
                        self.dnnetwork.state_dict())
                    self.sync_eps.append(episode)
                    
                if gamedone:                   
                    episode += 1
                    # Save the rewards
                    self.training_rewards.append(self.total_reward) 
                    self.update_loss = []
                    # Calculate the average reward for the last X episodes
                    mean_rewards = np.mean(self.training_rewards[-self.nblock:])
                    self.mean_training_rewards.append(mean_rewards)

                    print("\rEpisode {:d} Mean Rewards {:.2f} Epsilon {}\t\t".format(episode, mean_rewards, self.epsilon), end="")
                    
                    # Check if there are still episodes left
                    if episode >= max_episodes:
                        training = False
                        print('\nEpisode limit reached.')
                        break
                    
                    # The game ends if the average reward has reached the threshold
                    if mean_rewards >= self.reward_threshold:
                        training = False
                        print('\nEnvironment solved in {} episodes!'.format(episode))
                        break
                    
                    # Update epsilon according to the fixed decay rate
                    self.epsilon = max(self.epsilon * self.eps_decay, 0.01)
                    
                
    ## Loss calculation           
    def calculate_loss(self, batch):
        # Separate the variables of the experience and convert them to tensors
        states, actions, rewards, dones, next_states = [i for i in batch] 
        rewards_vals = torch.FloatTensor(rewards).to(device=self.dnnetwork.device) 
        actions_vals = torch.LongTensor(np.array(actions)).reshape(-1,1).to(device=self.dnnetwork.device)
        dones_t = torch.BoolTensor(dones).to(device=self.dnnetwork.device)
        
        # Obtain the Q values of the main network
        qvals = torch.gather(self.dnnetwork.get_qvals(states), 1, actions_vals)
        
        # Obtain the target Q values.
        # The detach() parameter prevents these values from updating the target network
        qvals_next = torch.max(self.target_network.get_qvals(next_states), dim=-1)[0].detach()
        # 0 in terminal states
        qvals_next[dones_t] = 0 
        
        # Calculate the Bellman equation
        expected_qvals = self.gamma * qvals_next + rewards_vals
        
        # Calculate the loss
        loss = torch.nn.MSELoss()(qvals, expected_qvals.reshape(-1,1))
        return loss
    

    def update(self):
        # Remove any gradient
        self.dnnetwork.optimizer.zero_grad()  
        # Select a subset from the buffer
        batch = self.buffer.sample_batch(batch_size=self.batch_size) 
        # Calculate the loss
        loss = self.calculate_loss(batch) 
        # Difference to get the gradients
        loss.backward() 
        # Apply the gradients to the neural network
        self.dnnetwork.optimizer.step() 
        # Save loss values
        if self.dnnetwork.device == 'cuda':
            self.update_loss.append(loss.detach().cpu().numpy())
        else:
            self.update_loss.append(loss.detach().numpy())