In [1]:
import os
import datetime
import numpy as np
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # Avoid TF Debug Warnings

In [2]:
import gym

In [3]:
from gym import Env

In [4]:
# Shared FrozenLake environment used by the agent section; is_slippery=False
# requests the non-slippery variant of the map (see the gym FrozenLake docs).
ENV = gym.make("FrozenLake-v1", is_slippery=False)

In [67]:
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras import callbacks
from tensorflow import one_hot

In [6]:
from typing import Union, List
import random

In [68]:
def build_model(input_size: int, hidden_size: int, output_size: int, optimizer: Union[Adam, SGD]) -> Model:
    """Build and compile a fully connected network with two hidden layers.

    Args:
        input_size: Length of the 1-D feature vector fed to the network.
        hidden_size: Number of units in each of the two ReLU hidden layers.
        output_size: Number of linear output units.
        optimizer: Optimizer instance handed to ``model.compile``.

    Returns:
        The compiled ``Sequential`` model, trained with a mean-squared-error loss.
    """
    model = Sequential()
    # Keras documents ``shape`` as a tuple of dimensions; a bare int is not
    # accepted by every release, so wrap the size explicitly.
    model.add(Input(shape=(input_size,), name="input"))
    model.add(Dense(hidden_size, activation='relu', name="hidden_1"))
    model.add(Dense(hidden_size, activation='relu', name="hidden_2"))
    model.add(Dense(output_size, activation="linear", name="output"))
    # NOTE(review): 'acc' is kept so existing training logs keep the same keys,
    # but accuracy carries little meaning for a regression (MSE) objective.
    model.compile(loss=MeanSquaredError(), optimizer=optimizer, metrics=['acc'])
    return model

In [69]:
from dataclasses import dataclass

@dataclass
class Memory:
    """One environment transition, as stored in the agent's replay memory."""
    prev_obs: int  # observation the action was chosen from
    action: int  # action that was executed
    actual_obs: int  # observation returned by the environment step
    reward: float  # reward returned by the environment step
    done: bool  # True when the step ended the episode

In [70]:
from collections import deque

class Agent:
    """Epsilon-greedy agent that learns Q-values with an online and a target network.

    Observations are discrete state indices; they are one-hot encoded before
    being passed to either network.
    """

    def __init__(self, environment: "Env"):
        """Create both networks, sync their weights and start with an empty memory.

        Args:
            environment: Environment whose ``action_space`` and
                ``observation_space`` both expose their size through ``.n``.
        """
        self.env = environment
        action_space = self.env.action_space.n
        self.observation_space = self.env.observation_space.n
        self.epsilon = 0.9
        self.epsilon_min = 0.1
        self.gamma = 0.9
        # Each model gets its own optimizer instance: optimizers keep
        # per-variable state, so one instance should not serve two models.
        self.q_learning_nn = build_model(
            input_size=self.observation_space,
            hidden_size=64,
            output_size=action_space,
            optimizer=Adam(),
        )
        self.target_nn = build_model(
            input_size=self.observation_space,
            hidden_size=64,
            output_size=action_space,
            optimizer=Adam(),
        )
        self.transfer_learning()
        self.memory = deque(maxlen=2000)

    def transfer_learning(self) -> None:
        """Copy the online network's weights into the target network."""
        self.target_nn.set_weights(self.q_learning_nn.get_weights())

    def _get_random_action(self) -> int:
        """Return an action sampled from the environment's action space."""
        return self.env.action_space.sample()

    def _get_best_action(self, observation: int) -> int:
        """Return the action with the highest Q-value predicted by the online network.

        Args:
            observation: State index to evaluate.
        """
        encoded = self.one_hot_obs_encoding(observation)
        q_values = self.q_learning_nn.predict(encoded, verbose=0)
        # argmax yields a numpy integer; convert so the declared ``int`` holds.
        return int(np.argmax(q_values))

    def get_action(self, obs: int) -> int:
        """Pick a random action with probability ``epsilon``, else the greedy one."""
        if self.epsilon > random.random():
            return self._get_random_action()
        return self._get_best_action(obs)

    def step(self, action):
        """Forward ``action`` to the environment and return its step result unchanged."""
        return self.env.step(action=action)

    def memorize(self, memory: "Memory") -> None:
        """Append a transition to the bounded replay memory."""
        self.memory.append(memory)

    def one_hot_obs_encoding(self, obs: int):
        """One-hot encode a state index as a batch containing a single row."""
        return one_hot([obs], self.observation_space)

    def get_batch_sample(self, batch_size: int) -> List["Memory"]:
        """Return up to ``batch_size`` distinct transitions drawn from memory.

        Sampling is without replacement and the result is a plain list. When
        fewer than ``batch_size`` transitions are stored, all of them are
        returned (in random order) instead of raising.
        """
        sample_size = min(batch_size, len(self.memory))
        return random.sample(list(self.memory), sample_size)

    def learn(self, batch: List["Memory"]) -> None:
        """Fit the online network on one Q-learning target per transition.

        For a terminal transition the target of the taken action is the
        reward; otherwise it is the reward plus ``gamma`` times the highest
        Q-value the target network predicts for the next observation.

        Args:
            batch: Transitions to learn from; each is fitted individually.
        """
        for mem in batch:
            prev_obs = self.one_hot_obs_encoding(mem.prev_obs)
            target = self.q_learning_nn.predict(prev_obs, verbose=0)

            if mem.done:
                action_target = mem.reward
            else:
                actual_obs = self.one_hot_obs_encoding(mem.actual_obs)
                # One Q-value per action for the next observation.
                next_q_values = self.target_nn.predict(actual_obs, verbose=0)
                action_target = mem.reward + self.gamma * np.max(next_q_values)

            # Only the action stored in this transition is corrected; the
            # other outputs keep the network's own predictions.
            target[0][mem.action] = action_target
            self.q_learning_nn.fit(prev_obs, target, epochs=1, verbose=0)
    


In [71]:
EPOCHS = 10  # outer training iterations (one learning step each)
EPISODES = 30  # episodes played per epoch to collect experience
BATCH_SIZE = 128  # transitions sampled from memory per learning step
# Exploration settings (epsilon / epsilon_min) are attributes of the Agent,
# so no module-level EPSILON is declared in this cell.

In [72]:
# The observation space reports an empty shape (see the output below), which is
# why the Agent sizes its networks from `observation_space.n` instead.
ENV.observation_space.shape

()

In [73]:
# Agent bound to the shared ENV; construction builds both networks and an
# empty replay memory.
a = Agent(ENV)

In [74]:

# NOTE(review): no decay rate is defined anywhere in the notebook; 0.1 is a
# placeholder value and should be tuned.
EPSILON_DECAY = 0.1  # fraction of the current epsilon removed each epoch
TEST_EPISODES = 20  # greedy evaluation episodes per test round

for epoch in range(EPOCHS):
    print(f"Epoch: {epoch}")

    # Reduce epsilon after each epoch. The decay is applied to the agent's own
    # attribute (the value get_action reads) and never goes below its floor.
    a.epsilon = max(a.epsilon_min, a.epsilon * (1 - EPSILON_DECAY))

    # Collect experience with the current epsilon-greedy policy.
    for episode in range(EPISODES):
        obs = a.env.reset()
        done = False
        while not done:
            action = a.get_action(obs)
            next_obs, reward, done, _ = a.step(action)
            memory = Memory(prev_obs=obs, action=action, reward=reward, actual_obs=next_obs, done=done)
            a.memorize(memory)
            obs = next_obs

    # Learn only once the memory holds more than one batch of transitions.
    if len(a.memory) > BATCH_SIZE:
        memories_batch = a.get_batch_sample(BATCH_SIZE)
        a.learn(memories_batch)

    # Sync the target network with the online network at the end of the epoch.
    a.transfer_learning()

    # Every fifth epoch, evaluate the greedy policy (no exploration).
    if epoch % 5 == 0:
        print("testing...")
        winning_eps = 0
        for i in range(TEST_EPISODES):
            obs = a.env.reset()
            done = False

            while not done:
                action = a._get_best_action(obs)
                obs, reward, done, _ = a.step(action)

            # A positive reward on the final step counts as a win.
            if reward > 0:
                winning_eps += 1
        print(f"Episodios ejecutados con éxito: {winning_eps}/{TEST_EPISODES}")


Epoch: 0
testing...
Episodios ejecutados con éxito: 0/20
Epoch: 1
Epoch: 2
Epoch: 3
Epoch: 4
Epoch: 5
testing...
Episodios ejecutados con éxito: 0/20
Epoch: 6
Epoch: 7
Epoch: 8
Epoch: 9


In [13]:
# (stray cell removed: it contained only an undefined name and raised NameError,
# which stops a Restart & Run All at this point)

In [None]:
# Greedy action for state 14. The method one-hot encodes a scalar state index
# itself, so pass the plain int rather than an array.
a._get_best_action(14)

In [60]:
# Print the online network's Q-value estimates for state indices 0..15.
for state in range(16):
    print(a.q_learning_nn.predict(a.one_hot_obs_encoding(state)))

[[-0.00028756  0.0711957   0.09674872  0.01330076]]
[[-0.00308418  0.17789477  0.02960289  0.0144994 ]]
[[ 0.05342163  0.06541891  0.09196578 -0.04542448]]
[[-0.0414832   0.08320257  0.07246859  0.05209388]]
[[0.06490675 0.09544932 0.08541021 0.02386591]]
[[-0.06740799 -0.02699154 -0.00328358  0.03000134]]
[[-0.0626333   0.0495147   0.03806045  0.05013835]]
[[-0.02179249  0.1723306   0.20717674  0.09099585]]
[[-0.01864198 -0.05439937  0.14734642  0.01183955]]
[[0.02369849 0.03977751 0.05222321 0.15109932]]
[[-0.00705086 -0.0755577  -0.02279143 -0.02170666]]
[[ 0.07660863 -0.04861972  0.06283103  0.07374804]]
[[-0.02295548  0.10677442  0.2486935   0.13104504]]
[[0.02356146 0.0693612  0.07705854 0.16463512]]
[[ 0.02332428  0.1136844   0.22718623 -0.01120121]]
[[-0.01612626  0.11861486  0.18312223  0.11506802]]


### Gather the data

In [None]:
from dataclasses import dataclass

In [None]:
# NOTE(review): this re-declares the Memory dataclass already defined in the
# agent section of this notebook with the same fields; the later definition
# shadows the earlier one.
@dataclass
class Memory:
    """One environment transition collected during the random rollout."""
    prev_obs: int  # observation the action was chosen from
    action: int  # action that was executed
    actual_obs: int  # observation returned by the environment step
    reward: float  # reward returned by the environment step
    done: bool  # True when the step ended the episode

In [None]:
import gym

In [None]:
# NOTE(review): this rebinds EPISODES (set to 30 in the agent section), so
# re-running the agent training loop after this cell would use 1000.
EPISODES = 1000
memory_list = []  # transitions appended by the random-policy rollout below

In [None]:
# Separate environment instance for data gathering, created with the same
# arguments as ENV in the agent section.
env = gym.make("FrozenLake-v1", is_slippery=False)

In [None]:
# Play EPISODES episodes with a purely random policy and record every step.
for ep in range(EPISODES):
    obs, done = env.reset(), False
    while not done:
        # Exploration only: the action is always sampled at random.
        action = env.action_space.sample()
        next_obs, reward, done, _ = env.step(action)

        # Fields in declaration order: prev_obs, action, actual_obs, reward, done.
        mem = Memory(obs, action, next_obs, reward, done)
        memory_list.append(mem)

        # The observation just received is the starting point of the next step.
        obs = next_obs

In [None]:
# Total number of transitions gathered by the rollout.
len(memory_list)

### Define the model

In [None]:
# Network that receives the raw state index as a single scalar feature and
# emits four outputs (read as one value per action by `learn`).
model = Sequential()
# Keras documents ``shape`` as a tuple of dimensions, so wrap the size.
model.add(Input(shape=(1,), name="input"))
model.add(Dense(16, activation='relu', name="hidden0"))
model.add(Dense(16, activation='relu', name="hidden1"))
model.add(Dense(16, activation='relu', name="hidden2"))
model.add(Dense(16, activation='relu', name="hidden3"))
# NOTE(review): sigmoid bounds every output to (0, 1); confirm that range is
# intended for the values being learned.
model.add(Dense(4, activation="sigmoid", name="output"))

In [None]:
# `learning_rate` is the supported argument name; the `lr` alias is deprecated.
model.compile(loss='mse', optimizer=Adam(learning_rate=0.1))

### Train the model

In [None]:
import numpy as np
import random

In [None]:
batch_size = 64  # number of transitions drawn into memory_sample
gamma = 0.9  # discount applied to the best next-state value inside learn

In [None]:
# Each run logs into its own timestamped sub-directory of ./logs.
run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = os.path.join("logs", run_stamp)
tensorboard_callback = callbacks.TensorBoard(log_dir=logdir, histogram_freq=1)

In [None]:
# Split the stored transitions into one array per field of interest.
prev_obs_arr, next_obs_arr, done_arr, reward_arr = (
    np.array([getattr(mem, field) for mem in memory_list])
    for field in ("prev_obs", "actual_obs", "done", "reward")
)

In [None]:
# Model outputs for the start and end observation of every stored transition.
prev_obs_Q = model.predict(prev_obs_arr)
next_obs_Q = model.predict(next_obs_arr)

In [None]:
# NOTE: this binds a second name to the same object (no copy is made), so any
# in-place edit of `target` is also visible through `prev_obs_Q`.
target = prev_obs_Q

In [None]:
# Display the current target values.
target

In [None]:
# Draw batch_size distinct transitions (random.sample picks without replacement).
memory_sample = random.sample(memory_list, batch_size)


In [None]:
from typing import List

In [None]:
def learn(memory_sample: List["Memory"], model: "Model", discount: "float | None" = None):
    """Fit ``model`` on one Q-learning target per transition and return it.

    For a terminal transition the target of the taken action is the reward;
    otherwise it is the reward plus the discount times the highest value the
    model predicts for the next observation.

    Args:
        memory_sample: Transitions to learn from; each is fitted individually.
        model: Network mapping a state index to one value per action.
        discount: Discount factor. When omitted, the module-level ``gamma``
            is used, as before.

    Returns:
        The same ``model`` instance, after fitting.
    """
    if discount is None:
        discount = gamma

    for mem in memory_sample:
        prev_obs = np.array([mem.prev_obs])
        target = model.predict(prev_obs, verbose=0)

        if mem.done:
            action_target = mem.reward
        else:
            # Feed the next state as an array, exactly like the previous one.
            next_obs = np.array([mem.actual_obs])
            best_Q = np.max(model.predict(next_obs, verbose=0))  # highest action value
            action_target = mem.reward + discount * best_Q

        # Correct only the action stored in this transition; the remaining
        # outputs keep the model's own predictions.
        target[0][mem.action] = action_target
        model.fit(prev_obs, target, epochs=1, verbose=0)

    return model

In [None]:
# Display the model object.
model

In [None]:
# Model outputs for state index 14.
model.predict(np.array([14]))

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

In [None]:
# Open TensorBoard inline, pointed at ./logs (the parent directory of `logdir`).
import tensorboard
%tensorboard --logdir logs

In [None]:
# NOTE(review): `Q_actions` is only ever assigned as a local variable inside the
# `learn` functions in this notebook, so on a fresh kernel this cell raises
# NameError. Remove it or compute the value from an explicit model.predict call.
np.max(Q_actions)