In [1]:
import gymnasium
import soulsgym
import numpy as np
import pandas as pd
from collections import deque
import statistics
import os
import random
from typing import Any, Callable, List, NamedTuple, Sequence, SupportsFloat, Union
import copy
import time
# Desactivamos los warnings de tensorflow, que son un poco cargantes
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' 

import matplotlib.pyplot as plt
import tensorflow as tf
import mouse
import pyautogui
import pydirectinput




In [2]:
BATCH_SIZE = 256


class Transition(NamedTuple):
    """One step taken in the environment: where it started, where it ended,
    which action caused it and what came out of it."""

    # State the transition started from.
    prev_state: Any
    # State the transition ended in.
    next_state: Any
    # Action that caused this transition.
    action: Any
    # Reward obtained.
    reward: SupportsFloat
    # Whether a terminal state was reached.
    terminated: bool

In [3]:
class Memory:
    """Bounded, ordered store of the agent's most recent transitions.

    The capacity is fixed when the memory is created; once it is full,
    remembering a new transition discards the oldest one.

    Transitions are kept in insertion order and can be accessed by index:
    position 0 is the oldest memory and position -1 the most recent one.
    """

    # ``Transition`` is written as a forward reference in the annotations so
    # this class does not depend on the order in which cells were executed.

    def __init__(self, size: int):
        """Create an empty memory.

        :param size: Maximum number of transitions to keep. It is converted
            with ``int()``, so a float such as ``1e5`` is accepted.
        """
        self.max_size = int(size)
        self.transitions: deque = deque(maxlen=self.max_size)

    def remember(self, transition: "Transition"):
        """Append a new transition to the memory.

        :param transition: The transition to remember.
        """
        self.transitions.append(transition)

    def batch(self, n: int) -> List["Transition"]:
        """Return ``n`` transitions sampled at random without replacement.

        :param n: Number of transitions to return. If it exceeds the number
            of stored transitions, all of them are returned (shuffled).
        :returns: The list of sampled transitions.
        """
        n = min(len(self.transitions), n)
        return random.sample(self.transitions, n)

    def __len__(self) -> int:
        """Number of transitions currently stored.

        :returns: An integer greater than or equal to 0.
        """
        return len(self.transitions)

    def __getitem__(
            self,
            key: Union[int, slice]
    ) -> Union["Transition", Sequence["Transition"]]:
        """Return the element(s) selected by ``key``.

        :param key: A plain integer index or a slice.
        :returns: A single transition for an integer key, or a list of
            transitions for a slice.
        :raises IndexError: If an integer key is out of range.
        """
        if isinstance(key, slice):
            # deque rejects slice objects, so slice a list snapshot instead.
            return list(self.transitions)[key]
        return self.transitions[key]

In [4]:
class Agent:
    """Agent that acts in a gymnasium environment and trains two Keras networks.

    ``model`` receives a flattened observation and outputs one value per
    action; the greedy action is the arg-max of that output. ``critic``
    receives the flattened observation with the taken action appended.
    Both networks are updated in :meth:`learn`.
    """

    def __init__(
        self, *,
        env: gymnasium.Env,
        model: Union[Callable[[int, int], tf.keras.Model], tf.keras.Model, str],
        critic: Union[Callable[[int, int], tf.keras.Model], tf.keras.Model, str],
        batch_size=BATCH_SIZE,
        memory_size: int = 1e5,
        gamma=0.99,
    ):
        """Initialise the agent.

        :param env: The environment the agent acts in.
        :param model: The action-value network. Either a function
            ``f(num_inputs, num_outputs)`` returning a compiled model, an
            existing ``tf.keras.Model`` (used as is, not cloned) or a path
            to a saved model, which is loaded from disk.
        :param critic: The critic network, given in any of the forms accepted
            by ``model``. A factory is called with ``num_inputs + 1`` inputs
            and a single output.
        :param batch_size: Number of transitions sampled per learning step.
        :param memory_size: Maximum number of transitions kept in memory.
        :param gamma: Discount factor applied to the next state's best value.
        :raises ValueError: If ``model`` or ``critic`` is none of the
            accepted forms.
        """
        # The environment we are going to work in.
        self.env = env
        # Length of the flattened observation vector.
        # NOTE(review): hard-coded; assumes the environment's observation
        # flattens to exactly 26 values — confirm against the env.
        self.num_inputs = 26
        self.num_outputs = env.action_space.n

        # The critic sees the observation plus the chosen action, hence +1.
        self.model = self._resolve_network(model, self.num_inputs, self.num_outputs)
        self.critic = self._resolve_network(critic, self.num_inputs + 1, 1)

        self.batch_size = batch_size
        self.gamma = gamma

        self.memory = Memory(size=memory_size)

        # The environment state the agent is currently in.
        self.current_state = None
        # Step number within the current episode.
        self.current_step = 0
        # Whether the last call to ``env.step`` reported truncation.
        self.last_truncated = False

    @staticmethod
    def _resolve_network(spec, num_inputs, num_outputs):
        """Turn a factory, model instance or path into a Keras model."""
        # Keras models are themselves callable, so the instance check must
        # come before the ``callable`` check.
        if isinstance(spec, tf.keras.models.Model):
            return spec
        if isinstance(spec, str):
            return tf.keras.models.load_model(spec)
        if callable(spec):
            return spec(num_inputs, num_outputs)
        raise ValueError('Valid models are a function, a model or a path')

    @staticmethod
    def _flatten(values):
        """Flatten an iterable of scalars and numpy arrays into one flat list."""
        flat = []
        for value in values:
            if isinstance(value, np.ndarray):
                flat.extend(value)
            else:
                flat.append(value)
        return flat

    def episode(self, epsilon=0, max_iterations=None):
        """Run one episode, learning after every step.

        NOTE(review): the purpose of the ``'q'`` key presses sent through
        ``pydirectinput`` is not visible in this file — presumably a hotkey
        of the game or an external tool; confirm.

        :param epsilon: Probability of taking a random action at each step.
        :param max_iterations: Maximum number of steps; a falsy value means
            no limit.
        :returns: The sum of the rewards obtained during the episode.
        """
        max_iterations = max_iterations or np.inf
        pydirectinput.press('q')
        self.current_state, _ = self.env.reset()
        self.current_step = 0
        self.last_truncated = False
        reward = 0
        running = True
        time.sleep(1)
        pydirectinput.press('q')
        print("Episode startq")
        while running and self.current_step < max_iterations:
            self.current_step += 1
            if self.current_step < 2:
                pydirectinput.press('q')
            perception = self.perceive()
            action = self.decide(list(perception.values()), epsilon)
            transition = self.act(action)
            self.learn()

            reward += transition.reward
            # Gymnasium requires a reset after either termination or
            # truncation, so both end the episode.
            running = not (transition.terminated or self.last_truncated)
        pydirectinput.press('q')
        return reward

    def perceive(self):
        """Return the current environment state as last observed."""
        return self.current_state

    def decide(self, perception, epsilon=0):
        """Choose an action with an epsilon-greedy policy.

        :param perception: The observation values (scalars and/or numpy
            arrays); they are flattened before being fed to the model.
        :param epsilon: Probability of returning a uniformly random action.
        :returns: The index of the chosen action.
        """
        if np.random.rand() < epsilon:
            # Use the agent's own action count, not a notebook-level global.
            return random.randrange(self.num_outputs)

        state = np.asarray([self._flatten(perception)], dtype=np.float32)
        q_values = self.model.predict(state, verbose=0)
        return np.argmax(q_values[0])

    def act(self, action):
        """Execute ``action`` in the environment and remember the transition.

        :param action: The action to execute.
        :returns: The stored :class:`Transition`.
        """
        next_state, reward, terminated, truncated, info = self.env.step(action)
        self.last_truncated = truncated

        # Store the transition together with its information. ``terminated``
        # alone is stored so truncation does not suppress bootstrapping.
        self.memory.remember(Transition(
            prev_state=list(self.current_state.values()),
            next_state=list(next_state.values()),
            action=action,
            reward=reward,
            terminated=terminated,
        ))

        # The agent is now in the new state.
        self.current_state = next_state

        return self.memory[-1]

    def learn(self):
        """Update both networks from a random batch of remembered transitions.

        Does nothing until the memory holds more than ``batch_size``
        transitions. After an update the memory is emptied.
        """
        if len(self.memory) <= self.batch_size:
            return

        transitions = self.memory.batch(self.batch_size)
        actions = [t.action for t in transitions]
        prev_states = [self._flatten(t.prev_state) for t in transitions]
        # Bootstrapping must look at the state *after* the action.
        next_states = [self._flatten(t.next_state) for t in transitions]

        prev_batch = np.asarray(prev_states, dtype=np.float32)
        next_batch = np.asarray(next_states, dtype=np.float32)

        # One batched forward pass per set of states instead of one call per
        # transition.
        qs = self.model.predict(prev_batch, verbose=0).tolist()
        next_qs = self.model.predict(next_batch, verbose=0).tolist()

        for row, next_q, txn in zip(qs, next_qs, transitions):
            baseline = statistics.mean(row)
            if txn.terminated:
                row[txn.action] = txn.reward - baseline
                pydirectinput.press('q')
            else:
                row[txn.action] = txn.reward + self.gamma * np.max(next_q) - baseline

        # NOTE(review): the mean is recomputed after every element update, so
        # later elements are shifted by a mean that already includes earlier
        # shifts. Kept as in the original; confirm this is intended.
        for row in qs:
            for idx in range(len(row)):
                row[idx] = row[idx] - statistics.mean(row)

        # Critic input: flattened state with the taken action appended.
        critic_inputs = np.asarray(
            [state + [action] for state, action in zip(prev_states, actions)],
            dtype=np.float32,
        )
        # NOTE(review): the targets have one column per action; when the
        # critic is built by the factory path it has a single output. Kept
        # as in the original; confirm the intended target shape.
        self.critic.fit(critic_inputs, np.asarray(qs, dtype=np.float32), len(qs))

        # Replace the taken action's target with the critic's estimate.
        critic_values = self.critic.predict(critic_inputs, verbose=0)
        for row, action, value in zip(qs, actions, critic_values):
            row[action] = float(value[0])

        self.model.fit(prev_batch, np.asarray(qs, dtype=np.float32), len(qs))

        # NOTE(review): the memory is wiped after each update, so learning
        # only happens once per ``batch_size + 1`` steps. Kept as in the
        # original, but now preserving the configured capacity.
        self.memory = Memory(size=self.memory.max_size)

In [5]:
def build_model(num_inputs, num_outputs) -> tf.keras.models.Model:
    """Build and compile a small fully connected network.

    The network has two hidden ReLU layers of 32 units (He-normal
    initialisation) followed by a linear output layer, and is compiled with
    mean squared error and Adam (learning rate 0.001).

    :param num_inputs: Size of the input vector.
    :param num_outputs: Number of units in the output layer.
    :returns: The compiled model.
    """
    dense = tf.keras.layers.Dense
    hidden_options = dict(activation='relu', kernel_initializer='he_normal')

    network = tf.keras.models.Sequential()
    network.add(dense(32, input_shape=(num_inputs,), **hidden_options))
    network.add(dense(32, **hidden_options))
    network.add(dense(num_outputs, activation='linear'))

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
    network.compile(loss='mse', optimizer=optimizer)

    return network

In [6]:
# Create the "SoulsGymIudex-v0" environment and the agent that acts in it.
# `build_model` is passed as the factory for both networks; Agent calls it
# with its own input/output sizes for the model and for the critic.
environment = gymnasium.make("SoulsGymIudex-v0")
agent = Agent(env=environment, model=build_model, critic=build_model)




In [7]:
import pandas as pd
# Load the results of a previous training run if they exist; otherwise train
# the agent from scratch and keep the best model found.
try:
    # Reward history of the previous run, one value per line.
    with open("rewards.txt", "r") as file:
        rewards = [float(i) for i in file.read().split()]

    # Running average of the rewards, one value per line.
    with open("rewards_avg.txt", "r") as file:
        rewards_avg = [float(i) for i in file.read().split()]

    # NOTE(review): the episode to load is hard-coded instead of being
    # derived from the rewards (see the commented-out expression).
    BEST_TASK = 599#np.argmax(rewards)
    print(f'Best model: {BEST_TASK} with {rewards[BEST_TASK]}')

    BEST_MODEL = tf.keras.models.load_model(f'models/DS3model-{BEST_TASK}.h5')
    agent.model = BEST_MODEL

except (OSError, ValueError, IndexError) as load_error:
    # Only missing/unreadable/incomplete results trigger a new training run;
    # KeyboardInterrupt and unrelated errors are no longer swallowed.
    print ("Files not found. Training from scratch...")
    print(f"Reason: {load_error!r}")

    # NOTE(review): `env` is not used by the loop below (the agent acts on
    # the environment it was constructed with); kept because code outside
    # this cell may read it.
    env = gymnasium.make("SoulsGymIudex-v0")
    NUMBER_OF_EPISODES = 1000
    MAX_STEPS = 500

    RUNNING_AVG_WINDOW_SIZE = 10

    MAX_EPSILON = 1
    MIN_EPSILON = 0.01
    DEC_EPSILON = 0.995

    rewards = []
    rewards_avg = []
    epsilon = MAX_EPSILON

    # Saving fails if the target directories do not exist yet.
    os.makedirs('models', exist_ok=True)
    os.makedirs('critics', exist_ok=True)

    for episode in range(NUMBER_OF_EPISODES):
        pydirectinput.press('q')
        # Run a full episode (the agent resets the environment itself).
        reward = agent.episode(epsilon, MAX_STEPS)
        # Save both networks for this episode.
        pydirectinput.press('q')
        agent.model.save(f'models/DS3model-{episode}.h5')
        agent.critic.save(f'critics/DS3model-{episode}.h5')
        # Update the reward history.
        rewards.append(reward)
        rewards_avg.append(np.mean(rewards[-RUNNING_AVG_WINDOW_SIZE:]))
        # Persist the history so a later run can pick it up.
        with open('rewards.txt', 'w') as f:
            f.write('\n'.join(map(str, rewards)))
        with open('rewards_avg.txt', 'w') as f:
            f.write('\n'.join(map(str, rewards_avg)))

        # Progress message.
        print(f'Episode: {episode}, reward: {reward:5} (best: {max(rewards):5}, avg. window: {rewards_avg[-1]:5.4})', end='\r')

        # Decay epsilon to reduce the randomness of action selection.
        epsilon *= DEC_EPSILON
        epsilon = max(MIN_EPSILON, epsilon)

        # Clear the session: fit and predict leave graph nodes referenced in
        # memory that are never released and eventually exhaust memory after
        # many loop iterations.
        tf.keras.backend.clear_session()

    BEST_TASK = int(np.argmax(rewards))
    print(f'Best model: {BEST_TASK} with {rewards[BEST_TASK]}')

    # Load the file this loop actually wrote for the best episode.
    BEST_MODEL = tf.keras.models.load_model(f'models/DS3model-{BEST_TASK}.h5')
    agent.model = BEST_MODEL

Files not found. Training from scratch...
Episode startq


  saving_api.save_model(


Episode startqward: -1.0924120055045732 (best: -1.0924120055045732, avg. window: -1.092)



Episode startqward: -1.1181718333466995 (best: -1.0924120055045732, avg. window: -1.105)




Episode startqward: -1.1362245937797306 (best: -1.0924120055045732, avg. window: -1.116)




Episode startqward: -1.1042477086030522 (best: -1.0924120055045732, avg. window: -1.113)


Episode startqward: -1.0670568353766514 (best: -1.0670568353766514, avg. window: -1.104)




Episode startqward: -0.9572767150819941 (best: -0.9572767150819941, avg. window: -1.079)


Episode startqward: -1.113593152018322 (best: -0.9572767150819941, avg. window: -1.084)
Episode startqward: -1.1332516052887547 (best: -0.9572767150819941, avg. window: -1.09)








Episode startqward: -1.1288632233453473 (best: -0.9572767150819941, avg. window: -1.095)


Episode startqward: -1.1180129141188022 (best: -0.9572767150819941, avg. window: -1.097)






KeyboardInterrupt: 

In [None]:
# Play one greedy episode with the agent left by the previous cells.
# The agent is deliberately not re-created here: building a fresh one would
# discard the model assigned to `agent.model` above.
env = gymnasium.make("SoulsGymIudex-v0")
try:
    obs, info = env.reset()
    terminated = False
    truncated = False

    # Gymnasium requires a reset after termination or truncation, so either
    # one ends the episode.
    while not (terminated or truncated):
        # epsilon defaults to 0, so the agent always takes its greedy action.
        action = agent.decide(list(obs.values()))
        obs, reward, terminated, truncated, info = env.step(action)
finally:
    # Release the environment even if the episode is interrupted.
    env.close()