In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
import pennylane as qml
import numpy as np
import gymnasium as gym
from collections import namedtuple
import os
from datetime import datetime
from tqdm import tqdm
from torch.nn.parameter import Parameter
#from agent import Agent  # Import the Agent class from agent.py

In [2]:
# A single environment transition: the state, the action taken, the reward
# received, whether the episode ended (done), and the resulting next state.
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'done', 'next_state'))

# Creates the replay memory used to store transitions.
def init_replay_memory(buffer_size):
    """Build an empty replay memory.

    Args:
        buffer_size: Maximum number of transitions the buffer may hold.

    Returns:
        A dict with the keys ``"buffer_size"`` (the given capacity),
        ``"memory"`` (an empty list of stored transitions) and ``"position"``
        (index of the next slot to write, starting at 0).
    """
    replay_memory = dict(buffer_size=buffer_size, memory=[], position=0)
    return replay_memory

# Stores one transition in the replay memory (circular buffer).
def push_replay_memory(replay_memory, *args):
    """Insert a transition, overwriting the oldest slot once the buffer is full.

    Args:
        replay_memory: Dict created by ``init_replay_memory``.
        *args: The fields of a ``Transition`` in order
            (state, action, reward, done, next_state).
    """
    slots = replay_memory["memory"]
    capacity = replay_memory["buffer_size"]
    cursor = replay_memory["position"]

    # Grow the list by one placeholder while the buffer is not yet full.
    if len(slots) < capacity:
        slots.append(None)

    # Write at the cursor, then advance it with wrap-around.
    slots[cursor] = Transition(*args)
    replay_memory["position"] = (cursor + 1) % capacity

# Draws a random mini-batch of transitions from the replay memory.
def sample_replay_memory(replay_memory, batch_size, device):
    """Sample ``batch_size`` distinct transitions and return them as tensors.

    Args:
        replay_memory: Dict created by ``init_replay_memory``.
        batch_size: Number of transitions to draw (without replacement).
        device: Torch device on which the returned tensors are created.

    Returns:
        Tuple ``(states, actions, rewards, dones, next_states)`` with dtypes
        float32, long, float32, bool and float32 respectively. The first
        dimension of every tensor is ``batch_size``.

    Raises:
        ValueError: Raised by ``np.random.choice`` when ``batch_size`` exceeds
            the number of stored transitions.
    """
    memory = replay_memory["memory"]

    # Indices are drawn without repetition.
    indices = np.random.choice(len(memory), batch_size, replace=False)

    # Transpose the list of transitions into per-field tuples.
    states, actions, rewards, dones, next_states = zip(
        *(memory[idx] for idx in indices)
    )

    def _to_tensor(values, dtype):
        # Stack through NumPy first: building a tensor directly from a
        # sequence of ndarrays is PyTorch's slow path and emits a warning.
        return torch.tensor(np.asarray(values), dtype=dtype, device=device)

    return (
        _to_tensor(states, torch.float32),
        _to_tensor(actions, torch.long),       # actions are assumed discrete
        _to_tensor(rewards, torch.float32),
        _to_tensor(dones, torch.bool),
        _to_tensor(next_states, torch.float32),
    )

# Reports how many transitions are currently stored.
def replay_memory_length(replay_memory):
    """Return the number of transitions held in ``replay_memory["memory"]``."""
    stored = replay_memory["memory"]
    return len(stored)

In [3]:
# Builds the dict that represents the epsilon-greedy agent.
def init_agent(net, action_space, exploration_initial_eps, exploration_decay, exploration_final_eps):
    """Create the agent state.

    Args:
        net: Network used to compute Q-values.
        action_space: Action space to sample random actions from.
        exploration_initial_eps: Starting value of epsilon.
        exploration_decay: Multiplicative decay base applied to epsilon.
        exploration_final_eps: Lower bound of epsilon.

    Returns:
        Dict holding the arguments above plus ``"epsilon"``, the current
        exploration rate, initialised to 0 until ``update_epsilon`` sets it.
    """
    agent = {}
    agent["net"] = net
    agent["action_space"] = action_space
    agent["exploration_initial_eps"] = exploration_initial_eps
    agent["exploration_decay"] = exploration_decay
    agent["exploration_final_eps"] = exploration_final_eps
    agent["epsilon"] = 0.
    return agent

# Picks an action for the given state following the epsilon-greedy policy.
def agent_call(agent, state, device=torch.device('cpu')):
    """Return a random action with probability epsilon, else the greedy one.

    Args:
        agent: Dict created by ``init_agent``; its ``"epsilon"`` is read.
        state: Current environment observation.
        device: Torch device forwarded to ``get_action``.

    Returns:
        The selected action.
    """
    explore = np.random.random() < agent["epsilon"]
    return get_random_action(agent) if explore else get_action(agent, state, device)

# Draws a random action from the agent's action space.
def get_random_action(agent):
    """Return ``agent["action_space"].sample()``."""
    return agent["action_space"].sample()

# Computes the greedy action from the network's Q-values.
def get_action(agent, state, device=torch.device('cpu')):
    """Return the index of the action with the highest Q-value.

    Args:
        agent: Dict created by ``init_agent``; ``agent["net"]`` is evaluated.
        state: A single observation (array-like or tensor). A 1-D input is
            treated as one sample and gets a leading batch dimension; a tensor
            that already has a batch dimension is used as is.
        device: Torch device on which the network is evaluated.

    Returns:
        The greedy action as a Python ``int``.
    """
    if not isinstance(state, torch.Tensor):
        # Go through NumPy: wrapping an ndarray in a Python list and handing
        # it to torch.tensor is PyTorch's slow path and emits a warning.
        state = torch.tensor(np.asarray(state), dtype=torch.float32)

    # The network expects a batch; promote a single observation to (1, n).
    if state.dim() == 1:
        state = state.unsqueeze(0)

    state = state.to(device)

    # Action selection never needs gradients, so skip building the graph.
    with torch.no_grad():
        q_values = agent["net"].eval()(state)
    _, action = torch.max(q_values, dim=1)
    return int(action.item())

# Recomputes epsilon for the given step with exponential decay.
def update_epsilon(agent, step):
    """Set and return the agent's exploration rate.

    The value is ``final + (initial - final) * decay ** step``, never smaller
    than ``final``.

    Args:
        agent: Dict created by ``init_agent``; its ``"epsilon"`` is overwritten.
        step: Exponent applied to the decay base.

    Returns:
        The new epsilon.
    """
    floor = agent["exploration_final_eps"]
    span = agent["exploration_initial_eps"] - floor
    decayed = floor + span * agent["exploration_decay"]**step
    agent["epsilon"] = max(floor, decayed)
    return agent["epsilon"]

In [4]:
# Builds the dict that holds the complete DQN trainer state.
def init_trainer(env, net, target_net, gamma, learning_rate, batch_size,
                 exploration_initial_eps, exploration_decay, exploration_final_eps,
                 train_freq, target_update_interval, buffer_size, learning_rate_input=None,
                 learning_rate_output=None, loss_func='MSE', optim_class='RMSprop',
                 device='cpu', logging=False):
    """Create the trainer: networks, optimizer, loss, replay memory, agent, logging.

    Args:
        env: Environment; its ``action_space`` is handed to the agent.
        net: Online Q-network (an ``nn.Module``). If it exposes ``q_layers``,
            ``w_input`` or ``w_output`` those get their own parameter groups;
            otherwise all of its parameters are optimised with ``learning_rate``.
        target_net: Target Q-network.
        gamma: Discount factor.
        learning_rate: Learning rate of the main parameter group.
        batch_size: Mini-batch size used by ``update_net``.
        exploration_initial_eps, exploration_decay, exploration_final_eps:
            Epsilon schedule passed to ``init_agent``.
        train_freq: Environment steps between gradient updates.
        target_update_interval: Environment steps between target-network syncs.
        buffer_size: Capacity of the replay memory.
        learning_rate_input: Learning rate for ``net.w_input``; ``None`` falls
            back to ``learning_rate``.
        learning_rate_output: Learning rate for ``net.w_output``; ``None`` falls
            back to ``learning_rate``.
        loss_func: One of ``'MSE'``, ``'L1'``, ``'SmoothL1'``.
        optim_class: One of ``'SGD'``, ``'RMSprop'``, ``'Adam'``, ``'Adagrad'``,
            ``'Adadelta'``.
        device: ``'auto'`` (CUDA when available, else CPU), ``'cpu'`` or ``'cuda:0'``.
        logging: When true, a timestamped directory under ``./logs/`` and a
            ``SummaryWriter`` are created.

    Returns:
        Dict with the trainer state (see the keys in the return statement).
    """
    assert loss_func in ['MSE', 'L1', 'SmoothL1'], "Supported losses : ['MSE', 'L1', 'SmoothL1']"
    assert optim_class in ['SGD', 'RMSprop', 'Adam', 'Adagrad', 'Adadelta'], \
        "Supported optimizers : ['SGD', 'RMSprop', 'Adam', 'Adagrad', 'Adadelta']"
    assert device in ['auto', 'cpu', 'cuda:0'], "Supported devices : ['auto', 'cpu', 'cuda:0']"

    # Resolve 'auto' explicitly: torch.device('auto') is not a valid device,
    # so 'auto' must fall back to the CPU when CUDA is unavailable.
    if device == 'auto':
        device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    device = torch.device(device)
    net = net.to(device)
    target_net = target_net.to(device)

    # Loss function, e.g. 'SmoothL1' -> nn.SmoothL1Loss().
    loss_func = getattr(nn, loss_func + 'Loss')()

    # Optimizer with one parameter group per learning rate.
    optim_class = getattr(optim, optim_class)
    w_input = getattr(net, 'w_input', None)
    w_output = getattr(net, 'w_output', None)
    if hasattr(net, 'q_layers'):
        base_params = list(net.q_layers.parameters())
    else:
        # `net` is a bare layer/module: optimise all of its parameters, leaving
        # out the scaling weights that receive their own groups below.
        scaled_ids = {id(p) for p in (w_input, w_output) if p is not None}
        base_params = [p for p in net.parameters() if id(p) not in scaled_ids]
    params = [{'params': base_params}]
    if w_input is not None:
        # `is None` (not `or`) so that an explicit learning rate of 0.0 is kept.
        lr_in = learning_rate if learning_rate_input is None else learning_rate_input
        params.append({'params': w_input, 'lr': lr_in})
    if w_output is not None:
        lr_out = learning_rate if learning_rate_output is None else learning_rate_output
        params.append({'params': w_output, 'lr': lr_out})
    opt = optim_class(params, lr=learning_rate)

    # Replay memory.
    memory = init_replay_memory(buffer_size)

    # Epsilon-greedy agent acting with the online network.
    agent = init_agent(net, env.action_space, exploration_initial_eps, exploration_decay, exploration_final_eps)

    # TensorBoard logging (optional).
    log_dir = None
    writer = None
    if logging:
        exp_name = datetime.now().strftime("DQN-%d_%m_%Y-%H_%M_%S")
        log_dir = os.path.join('./logs/', exp_name)
        os.makedirs(log_dir, exist_ok=True)
        writer = SummaryWriter(log_dir=log_dir)

    return {
        "env": env,
        "net": net,
        "target_net": target_net,
        "gamma": gamma,
        "learning_rate": learning_rate,
        "batch_size": batch_size,
        "train_freq": train_freq,
        "target_update_interval": target_update_interval,
        "memory": memory,
        "agent": agent,
        "opt": opt,
        "loss_func": loss_func,
        "device": device,
        "logging": logging,
        "log_dir": log_dir,
        "writer": writer,
        "global_step": 0,
        "episode_count": 0
    }

# Resets the trainer counters and pre-fills the replay memory.
def reset_trainer(trainer):
    """Reset step/episode counters and fill the replay memory with random play.

    Random actions are executed until the replay memory holds
    ``buffer_size`` transitions.

    Args:
        trainer: Dict created by ``init_trainer``; modified in place
            (``"global_step"``, ``"episode_count"``, ``"n_actions"`` and the
            replay memory).
    """
    trainer["global_step"] = 0
    trainer["episode_count"] = 0
    trainer["n_actions"] = trainer["env"].action_space.n

    env = trainer["env"]

    # Seed only the first reset. Passing the same seed to every reset re-seeds
    # the environment each time, so every warm-up episode would start from the
    # very same initial state.
    state, _ = env.reset(seed=123)
    while replay_memory_length(trainer["memory"]) < trainer["memory"]["buffer_size"]:
        action = get_random_action(trainer["agent"])
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        push_replay_memory(trainer["memory"], state, action, reward, done, next_state)
        state = next_state if not done else env.reset()[0]

# Performs one gradient update of the online network.
def update_net(trainer):
    """Sample a mini-batch and take one optimizer step on the TD loss.

    The target is ``reward + gamma * (1 - done) * max_a Q_target(next_state, a)``.

    Args:
        trainer: Dict created by ``init_trainer``.

    Returns:
        The loss of this update as a Python float.
    """
    policy_net = trainer["net"]
    optimizer = trainer["opt"]
    policy_net.train()
    optimizer.zero_grad()

    # Mini-batch from the replay memory.
    batch = sample_replay_memory(
        trainer["memory"], trainer["batch_size"], trainer["device"]
    )
    states, actions, rewards, dones, next_states = batch

    # Q(s, a) of the actions that were actually taken.
    q_all = policy_net(states)
    state_action_values = q_all.gather(1, actions.unsqueeze(-1)).squeeze(-1)

    # Bootstrapped targets from the target network (no gradients needed).
    with torch.no_grad():
        next_state_values = trainer["target_net"](next_states).max(1)[0].detach()
    not_done = 1 - dones.float()
    expected_state_action_values = rewards + trainer["gamma"] * not_done * next_state_values

    # Loss, backward pass and parameter update.
    loss = trainer["loss_func"](state_action_values, expected_state_action_values)
    loss.backward()
    optimizer.step()

    return loss.item()

# Synchronises the target network with the online network.
def update_target_net(trainer):
    """Load the online network's ``state_dict`` into the target network."""
    weights = trainer["net"].state_dict()
    trainer["target_net"].load_state_dict(weights)

# Runs one training episode.
def train_step(trainer):
    """Play one episode with the epsilon-greedy agent, training along the way.

    Every transition is stored in the replay memory. The online network is
    updated every ``train_freq`` global steps and the target network is synced
    every ``target_update_interval`` global steps.

    Args:
        trainer: Dict created by ``init_trainer``; ``"global_step"`` and
            ``"episode_count"`` are advanced in place.

    Returns:
        Dict with ``'steps'``, ``'loss'`` (mean loss of this episode's updates,
        0 if none ran), ``'reward'`` (undiscounted episode return) and
        ``'epsilon'`` (exploration rate used for the episode).
    """
    episode_epsilon = update_epsilon(trainer["agent"], trainer["episode_count"])
    episode_steps, episode_reward, episode_loss = 0, 0, []

    # Seed only the first episode. Re-seeding every reset with the same value
    # makes every training episode start from the identical initial state.
    seed = 123 if trainer["episode_count"] == 0 else None
    state, _ = trainer["env"].reset(seed=seed)
    done = False

    while not done:
        action = agent_call(trainer["agent"], state, trainer["device"])
        next_state, reward, terminated, truncated, _ = trainer["env"].step(action)
        done = terminated or truncated

        # NOTE(review): `done` also covers time-limit truncation, so truncated
        # transitions are stored as terminal and do not bootstrap in
        # `update_net` -- confirm this is intended.
        push_replay_memory(trainer["memory"], state, action, reward, done, next_state)
        state = next_state

        # Gradient update of the online network.
        if trainer["global_step"] % trainer["train_freq"] == 0:
            loss = update_net(trainer)
            episode_loss.append(loss)

        # Target-network sync.
        if trainer["global_step"] % trainer["target_update_interval"] == 0:
            update_target_net(trainer)

        trainer["global_step"] += 1
        episode_reward += reward
        episode_steps += 1

    trainer["episode_count"] += 1
    return {
        'steps': episode_steps,
        'loss': np.mean(episode_loss) if episode_loss else 0.,
        'reward': episode_reward,
        'epsilon': episode_epsilon
    }

# Realiza un paso de evaluación
def test_step(trainer, n_eval_episodes):
    episode_steps = []
    episode_rewards = []

    for _ in range(n_eval_episodes):
        state, _ = trainer["env"].reset(seed=123)  # Reinicia el entorno
        done = False
        episode_steps.append(0)
        episode_rewards.append(0)

        while not done:
            # Selecciona la mejor acción basada en la red (sin exploración)
            action = get_action(trainer["agent"], state, trainer["device"])
            next_state, reward, terminated, truncated, _ = trainer["env"].step(action)
            done = terminated or truncated

            # Actualiza los valores acumulados
            state = next_state
            episode_steps[-1] += 1
            episode_rewards[-1] += reward

    # Calcula promedios
    avg_steps = np.mean(episode_steps)
    avg_reward = np.mean(episode_rewards)

    return {'steps': avg_steps, 'reward': avg_reward}

# Writes training/evaluation statistics and checkpoints.
def log_training(trainer, train_stats, test_stats, episode, log_train_freq, log_ckp_freq):
    """Log scalars to TensorBoard and save periodic checkpoints.

    Nothing happens unless ``trainer["logging"]`` is true.

    Args:
        trainer: Dict created by ``init_trainer`` (uses ``"writer"``,
            ``"log_dir"`` and ``"net"``).
        train_stats: Dict of training scalars, written as ``train/<key>``.
        test_stats: Dict of evaluation scalars written as ``test/<key>``, or
            ``None``/empty when no evaluation ran this episode.
        episode: Current episode index, used as the scalar step.
        log_train_freq: Training stats are written when ``episode`` is a
            multiple of this value; 0 disables them.
        log_ckp_freq: A checkpoint is saved when positive and ``episode`` is a
            multiple of it.
    """
    if not trainer["logging"]:
        return

    writer = trainer["writer"]

    # Training statistics. The `!= 0` guard avoids a ZeroDivisionError.
    if log_train_freq != 0 and episode % log_train_freq == 0:
        for key, value in train_stats.items():
            writer.add_scalar(f"train/{key}", value, episode)

    # Evaluation statistics are written whenever they are provided. They only
    # exist on evaluation episodes, so gating them by the *training* log
    # frequency would silently drop evaluations when the two frequencies differ.
    if test_stats:
        for key, value in test_stats.items():
            writer.add_scalar(f"test/{key}", value, episode)

    # Model checkpoint.
    if log_ckp_freq > 0 and episode % log_ckp_freq == 0:
        checkpoint_path = os.path.join(trainer["log_dir"], f"episode_{episode}.pt")
        torch.save(trainer["net"].state_dict(), checkpoint_path)

# Trains the model for a total number of episodes.
def train(trainer, total_episodes, n_eval_episodes=5, log_train_freq=-1, log_eval_freq=-1, log_ckp_freq=-1):
    """Run the training loop with periodic evaluation, logging and checkpoints.

    Args:
        trainer: Dict created by ``init_trainer``.
        total_episodes: Number of training episodes to run.
        n_eval_episodes: Episodes per evaluation.
        log_train_freq: Forwarded to ``log_training``.
        log_eval_freq: An evaluation runs when the episode index is a multiple
            of this value; 0 disables evaluation.
        log_ckp_freq: Forwarded to ``log_training``; when positive (and logging
            is enabled) the final model is also saved as ``final_model.pt``.
    """
    # Statistics shown on the progress bar.
    postfix_stats = {}
    with tqdm(range(total_episodes), desc="DQN Training", unit="episode") as episodes:

        for episode in episodes:
            # Training episode.
            train_stats = train_step(trainer)
            postfix_stats['train/reward'] = train_stats['reward']
            postfix_stats['train/steps'] = train_stats['steps']

            # Periodic evaluation. The `!= 0` guard avoids a ZeroDivisionError.
            test_stats = None
            if log_eval_freq != 0 and episode % log_eval_freq == 0:
                test_stats = test_step(trainer, n_eval_episodes)
                postfix_stats['test/reward'] = test_stats['reward']
                postfix_stats['test/steps'] = test_stats['steps']

            # Metric logging and checkpoints.
            log_training(trainer, train_stats, test_stats, episode, log_train_freq, log_ckp_freq)

            # Progress bar update.
            episodes.set_postfix(postfix_stats)

        # Final model.
        if trainer["logging"] and log_ckp_freq > 0:
            final_model_path = os.path.join(trainer["log_dir"], "final_model.pt")
            torch.save(trainer["net"].state_dict(), final_model_path)

    # Make sure buffered TensorBoard events reach disk. Flush rather than
    # close so the trainer can be used for further training afterwards.
    writer = trainer.get("writer")
    if writer is not None:
        writer.flush()




In [5]:
# Encodes the inputs into the qubits with RX rotations.
def encode(n_qubits, inputs):
    """Apply ``RX(inputs[w])`` on wire ``w`` for the first ``n_qubits`` wires."""
    for wire in range(n_qubits):
        angle = inputs[wire]
        qml.RX(angle, wires=wire)

# One variational layer: trainable RY and RZ rotations followed by CZ gates.
def layer(n_qubits, y_weights, z_weights):
    """Apply RY then RZ rotations per wire, then a ring of CZ gates.

    Args:
        n_qubits: Number of wires entangled by the CZ ring.
        y_weights: RY angles, one per wire (indexed by position).
        z_weights: RZ angles, one per wire (indexed by position).
    """
    # All RY rotations first, then all RZ rotations.
    for gate, angles in ((qml.RY, y_weights), (qml.RZ, z_weights)):
        for wire, angle in enumerate(angles):
            gate(angle, wires=wire)

    # CZ between each wire and its successor; the last wire wraps to wire 0.
    for wire in range(n_qubits):
        neighbour = (wire + 1) % n_qubits
        qml.CZ(wires=[wire, neighbour])

# Measurements returned by the circuit.
def measure(n_qubits):
    """Return the expectation values of Z0*Z1 and Z2*Z3.

    ``n_qubits`` is accepted but not used: the measured wires 0-3 are fixed.
    """
    first_pair = qml.expval(qml.PauliZ(0) @ qml.PauliZ(1))
    second_pair = qml.expval(qml.PauliZ(2) @ qml.PauliZ(3))
    return [first_pair, second_pair]

# Builds the PennyLane circuit and wraps it in a TorchLayer.
def get_model(n_qubits, n_layers, data_reupload):
    """Create the variational circuit as a ``qml.qnn.TorchLayer``.

    Args:
        n_qubits: Number of wires of the ``default.qubit`` device.
        n_layers: Number of variational layers.
        data_reupload: If true the inputs are encoded before every layer,
            otherwise only before the first one.

    Returns:
        A ``TorchLayer`` with trainable ``y_weights`` and ``z_weights``, each of
        shape ``(n_layers, n_qubits)``.
    """
    dev = qml.device("default.qubit", wires=n_qubits)

    # Shapes of the trainable weights, keyed by the circuit's argument names.
    weight_shape = (n_layers, n_qubits)
    shapes = {"y_weights": weight_shape, "z_weights": weight_shape}

    # Quantum circuit evaluated through the torch interface.
    @qml.qnode(dev, interface='torch')
    def circuit(inputs, y_weights, z_weights):
        for layer_idx in range(n_layers):
            reupload = data_reupload or layer_idx == 0
            if reupload:
                encode(n_qubits, inputs)
            layer(n_qubits, y_weights[layer_idx], z_weights[layer_idx])
        return measure(n_qubits)

    return qml.qnn.TorchLayer(circuit, shapes)

# Creates the components of the quantum Q-network.
def init_quantum_net(n_layers, w_input, w_output, data_reupload):
    """Build the circuit and the optional trainable input/output scalings.

    The network is fixed to 4 qubits and 2 actions.

    Args:
        n_layers: Number of variational layers of the circuit.
        w_input: If truthy, create trainable per-qubit input weights.
        w_output: If truthy, create trainable per-action output weights.
        data_reupload: Forwarded to ``get_model``.

    Returns:
        Dict with ``"n_qubits"``, ``"n_actions"``, ``"data_reupload"``,
        ``"q_layers"`` (the circuit layer), ``"w_input"`` and ``"w_output"``
        (``nn.Parameter`` or ``None``).
    """
    n_qubits = 4   # number of qubits
    n_actions = 2  # number of actions

    # Variational quantum circuit.
    q_layers = get_model(n_qubits=n_qubits, n_layers=n_layers, data_reupload=data_reupload)

    # The scaling weights must be nn.Parameter: a plain tensor has
    # requires_grad=False, so an optimizer would never update it.
    w_input_param = None
    if w_input:
        w_input_param = nn.Parameter(nn.init.normal_(torch.empty(n_qubits), mean=0.))

    w_output_param = None
    if w_output:
        w_output_param = nn.Parameter(nn.init.normal_(torch.empty(n_actions), mean=90.))

    return {
        "n_qubits": n_qubits,
        "n_actions": n_actions,
        "data_reupload": data_reupload,
        "q_layers": q_layers,
        "w_input": w_input_param,
        "w_output": w_output_param
    }

# Forward pass of the quantum Q-network.
def quantum_net_forward(quantum_net, inputs):
    """Map a batch of inputs to scaled circuit outputs.

    Steps: optional input scaling, ``atan`` squashing, one circuit evaluation
    per batch element, shift of the outputs from [-1, 1] to [0, 1], and output
    scaling (``w_output`` if defined, otherwise a factor of 90).

    Args:
        quantum_net: Dict with ``"q_layers"``, ``"w_input"`` and ``"w_output"``.
        inputs: Batch of inputs; iterated along its first dimension.

    Returns:
        Tensor with the stacked, scaled outputs.
    """
    in_scale = quantum_net["w_input"]
    if in_scale is not None:
        inputs = inputs * in_scale

    # Non-linear squashing of the (scaled) inputs into rotation angles.
    angles = torch.atan(inputs)

    # The circuit is evaluated sample by sample.
    circuit = quantum_net["q_layers"]
    expvals = torch.stack([circuit(sample) for sample in angles])

    # Expectation values lie in [-1, 1]; move them to [0, 1].
    unit_range = (1 + expvals) / 2

    out_scale = quantum_net["w_output"]
    if out_scale is None:
        return 90 * unit_range
    return unit_range * out_scale

In [6]:
# Hyperparameters and run configuration consumed by the training cell below.

# Quantum network
n_layers = 5            # number of variational layers in the circuit
gamma = 0.99            # discount factor
w_input = True          # create trainable input scaling weights
w_output = True         # create trainable output scaling weights

# Optimisation
lr = 0.001              # learning rate of the main parameter group
lr_input = 0.01         # learning rate for the input scaling weights
lr_output = 0.01        # learning rate for the output scaling weights
batch_size = 16         # transitions per gradient update

# Exploration: epsilon = eps_min + (eps_init - eps_min) * eps_decay ** episode
eps_init = 1.
eps_decay = 0.99
eps_min = 0.01

# Training schedule
train_freq = 10         # environment steps between gradient updates
target_freq = 30        # environment steps between target-network syncs
memory = 10000          # replay memory capacity (number of transitions)
data_reupload = True    # re-encode the inputs before every circuit layer
loss = 'SmoothL1'       # one of 'MSE', 'L1', 'SmoothL1'
optimizer = 'RMSprop'   # one of 'SGD', 'RMSprop', 'Adam', 'Adagrad', 'Adadelta'
total_episodes = 5000   # number of training episodes
n_eval_episodes = 5     # episodes per evaluation

# Logging
logging = True          # enable TensorBoard logging and checkpoints
log_train_freq = 1      # episodes between training-stat log entries
log_eval_freq = 20      # episodes between evaluations
log_ckp_freq = 20       # episodes between checkpoints
device = 'cpu'  # one of 'auto', 'cpu', 'cuda:0'



In [None]:
# Importa librerías necesarias
import gym
from torch.utils.tensorboard import SummaryWriter

# Define el entorno
env_name = 'CartPole-v1'  # Cambia a CartPole-v1
env = gym.make(env_name)

# Inicializa las redes neuronales cuánticas
net = init_quantum_net(n_layers, w_input, w_output, data_reupload)
target_net = init_quantum_net(n_layers, w_input, w_output, data_reupload)

# Inicializa el entrenador
trainer = init_trainer(
    env=env,
    net=net["q_layers"],         # Modelo cuántico principal
    target_net=target_net["q_layers"],  # Modelo cuántico objetivo
    gamma=gamma,
    learning_rate=lr,
    batch_size=batch_size,
    exploration_initial_eps=eps_init,
    exploration_decay=eps_decay,
    exploration_final_eps=eps_min,
    train_freq=train_freq,
    target_update_interval=target_freq,
    buffer_size=memory,
    learning_rate_input=lr_input,
    learning_rate_output=lr_output,
    loss_func=loss,
    optim_class=optimizer,
    device=device,
    logging=logging
)

# Reinicia el entrenador para inicializar la memoria
reset_trainer(trainer)

# Configuración de TensorBoard
from datetime import datetime
log_dir = f"logs/{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}/"
writer = SummaryWriter(log_dir=log_dir)
print(f"TensorBoard logs stored at: {log_dir}")

# Entrena el modelo
train(
    trainer=trainer,
    total_episodes=total_episodes,
    n_eval_episodes=n_eval_episodes,
    log_train_freq=log_train_freq,
    log_eval_freq=log_eval_freq,
    log_ckp_freq=log_ckp_freq
)

# Lanza TensorBoard
%load_ext tensorboard
%tensorboard --logdir=logs/