In [None]:
import torch
import numpy as np
import pandas as pd
from transformers import BertTokenizer,BertModel, Trainer, TrainingArguments
from torch.utils.data import Dataset
from transformers import LongformerTokenizer, LongformerModel
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
import numpy as np
SMALL_DATASET = False
ROUTE = "/content/drive/MyDrive/Data/cityA_groundtruthdata_small.csv" if SMALL_DATASET else "/content/drive/MyDrive/Data/cityA_groundtruthdata_first_3000_users.csv"

['__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'calc_dtw', 'calc_dtw_orig', 'calc_dtw_single', 'calc_geobleu', 'calc_geobleu_orig', 'calc_geobleu_single', 'seq_eval']


In [None]:
df = pd.read_csv(ROUTE)
df.head()

Unnamed: 0,uid,d,t,x,y
0,0,0,25,75,82
1,0,0,24,76,86
2,0,0,22,81,89
3,0,18,13,86,97
4,0,18,12,85,97


In [None]:
from sklearn.preprocessing import MinMaxScaler
def get_scaler(df: pd.DataFrame, columns: list):
    """
    Ajusta un MinMaxScaler a las columnas especificadas del DataFrame y devuelve el scaler.

    Args:
        df (pd.DataFrame): DataFrame con los datos originales.
        columns (list): Lista de nombres de columnas a normalizar.

    Returns:
        scaler: Instancia ajustada de MinMaxScaler.
    """
    # Crear un scaler
    scaler = MinMaxScaler()

    # Ajustar el scaler a las columnas especificadas
    scaler.fit(df[columns])
    return scaler

def normalize_data(df: pd.DataFrame, scaler: MinMaxScaler, columns: list):
    """
    Normaliza las columnas especificadas de un DataFrame utilizando un MinMaxScaler ya ajustado.

    Args:
        df (pd.DataFrame): DataFrame con los datos originales.
        scaler (MinMaxScaler): Scaler ajustado con los datos de entrenamiento.
        columns (list): Lista de nombres de columnas a normalizar.

    Returns:
        pd.DataFrame: DataFrame normalizado
    """
    # Transformar las columnas especificadas con el scaler ajustado
    df[columns] = scaler.transform(df[columns])
    return df
def inverse_transform_predictions(predictions, scaler: MinMaxScaler, columns_to_desnormalize):
    """
    Desnormaliza las predicciones del modelo usando el scaler ajustado.

    Args:
        predictions (torch.Tensor or np.ndarray): Predicciones normalizadas con forma (batch_size, sequence_length, num_features) o (sequence_length, num_features).
        scaler (MinMaxScaler): Scaler ajustado con los datos de entrada.
        columns_to_desnormalize (list): Índices de las columnas a desnormalizar (por ejemplo, [0, 1] para x, y).

    Returns:
        np.ndarray: Predicciones desnormalizadas con la misma forma que las predicciones originales.
    """
    # Si es un tensor de PyTorch, conviértelo a un numpy array
    if isinstance(predictions, torch.Tensor):
        predictions = predictions.detach().cpu().numpy()

    # Si las predicciones tienen solo 2 dimensiones, agregar una dimensión de batch
    if predictions.ndim == 2:  # (sequence_length, num_features)
        predictions = predictions[np.newaxis, :, :]  # (1, sequence_length, num_features)

    batch_size, sequence_length, num_features = predictions.shape

    # Crear un array desnormalizado con la misma forma que las predicciones
    desnormalized = np.zeros_like(predictions)

    # Desnormalizar cada elemento del batch
    for i in range(batch_size):
        # Extraer la predicción del batch actual (sequence_length, num_features)
        single_prediction = predictions[i]

        # Crear una matriz con el mismo número de columnas que el scaler original
        full_array = np.zeros((sequence_length, scaler.n_features_in_))

        # Insertar las predicciones en las columnas correspondientes
        full_array[:, columns_to_desnormalize] = single_prediction

        # Aplicar la transformación inversa
        full_array = scaler.inverse_transform(full_array)

        # Extraer las columnas desnormalizadas correspondientes
        desnormalized[i] = full_array[:, columns_to_desnormalize]

    return desnormalized


In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler

def fill_missing_data(df: pd.DataFrame, padding_value=-1) -> pd.DataFrame:
    """
    Rellena los datos faltantes asegurando 48 timeslots por día por usuario.
    - Primero calcula `delta_t` en los datos originales antes de rellenar.
    - Luego, completa los timeslots faltantes y asigna `padding_value` a `delta_t` en las filas agregadas.

    Args:
        df (pd.DataFrame): DataFrame con datos originales ['uid', 'd', 't', 'x', 'y'].
        padding_value (int): Valor de padding para coordenadas y `delta_t` en filas agregadas.

    Returns:
        pd.DataFrame: Datos completos con padding en `delta_t`, `x`, `y` y normalización aplicada.
    """

    df["delta_t"] = df.groupby(["uid", "d"])["t"].diff().astype(float)
    df["delta_t"] = df["delta_t"].fillna(0)  # El primer valor del día tendrá `delta_t = 0`

    uids = df["uid"].unique()
    days = np.arange(df["d"].min(), df["d"].max() + 1)
    timeslots = np.arange(48)  # 48 timeslots por día

    complete_index = pd.MultiIndex.from_product([uids, days, timeslots], names=["uid", "d", "t"])
    complete_df = pd.DataFrame(index=complete_index).reset_index()

    df = pd.merge(complete_df, df, on=["uid", "d", "t"], how="left")

    df["day_of_week"] = df["d"] % 7

    df["delta_t"] = df["delta_t"].fillna(padding_value)

    df["x"] = df["x"].fillna(padding_value)
    df["y"] = df["y"].fillna(padding_value)

    return df


In [None]:
import pandas as pd
import numpy as np

import numpy as np

import numpy as np

def create_sequences(data, days=7, timestep_per_day=48, padding_value=0):
    """
    Genera ventanas de entrada (7 días previos) y salida (7 días siguientes),
    asegurando padding en coordenadas faltantes.

    Returns:
        Arrays con secuencias y máscaras de padding con las dimensiones correctas.
    """
    window_size = days * timestep_per_day
    inputs, outputs, mask_inputs, mask_outputs = [], [], [], []

    for uid in data["uid"].unique():
        user_data = data[data["uid"] == uid]

        for start_idx in range(0, len(user_data) - 2 * window_size, timestep_per_day):
            input_seq = user_data.iloc[start_idx:start_idx + window_size]
            output_seq = user_data.iloc[start_idx + window_size:start_idx + 2 * window_size]

            # Máscaras de valores reales
            mask_input_seq = np.where((input_seq["x"] == padding_value) & (input_seq["y"] == padding_value), 0, 1)
            mask_output_seq = np.where((output_seq["x"] == padding_value) & (output_seq["y"] == padding_value), 0, 1)

            # **Corregir `outputs` y `mask_outputs` para que tengan la forma correcta (336, 2)**
            inputs.append(input_seq[["d", "t", "x", "y", "day_of_week", "delta_t"]].values.tolist())
            outputs.append(output_seq[["x", "y"]].values.tolist())  # ✅  forma (336, 2)
            mask_inputs.append(np.tile(mask_input_seq.reshape(-1, 1), (1, 2)).tolist())
            mask_outputs.append(np.tile(mask_output_seq.reshape(-1, 1), (1, 2)).tolist())  # Expande a (336, 2)

    return np.array(inputs), np.array(mask_inputs), np.array(outputs), np.array(mask_outputs)

def split_data(df: pd.DataFrame) -> tuple:
    """
    Divide los datos en entrenamiento, validación y prueba, asegurando 15 días adicionales
    para las secuencias de validación y prueba.
    """
    # Entrenamiento: datos antes del día 44
    train = df[df["d"] < 44]

    # Validación: incluye días desde el 37 (44 - 7) para construir las secuencias
    val = df[(df["d"] >= 37) & (df["d"] < 59)]

    # Prueba: incluye días desde el 52 (59 - 7) para construir las secuencias
    test = df[df["d"] >= 52]

    return train.copy(), val.copy(), test.copy()


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class MobilityDataset(Dataset):
    def __init__(self, X, mask_X, Y, mask_Y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.mask_X = torch.tensor(mask_X, dtype=torch.int)
        self.Y = torch.tensor(Y, dtype=torch.float32)
        self.mask_Y = torch.tensor(mask_Y, dtype=torch.int)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.mask_X[idx], self.Y[idx], self.mask_Y[idx]


In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd

def build_dataloaders(df, batch_size=32, padding_value=-1,columns=['x','y','delta_t'], shuffle=True):
    """
    Preprocesa los datos, normaliza, genera secuencias y devuelve DataLoaders listos para entrenar.

    Args:
        df (pd.DataFrame): DataFrame con datos crudos.
        batch_size (int): Tamaño del batch para entrenamiento.
        padding_value (tuple): Valor usado para rellenar coordenadas faltantes.
        shuffle (bool): Si se deben mezclar los datos en el DataLoader.

    Returns:
        tuple: (train_loader, val_loader, test_loader)
    """

    print("🚀 Llenando datos faltantes y normalizando...")
    df_filled = fill_missing_data(df, padding_value)

    # **Dividir en Train, Validation y Test**
    train_df, val_df, test_df = split_data(df_filled)

    scaler = get_scaler(train_df, columns)

    # Normalizar los datos solamnete con l scaler del train
    train_data = normalize_data(train_df, scaler, columns)
    val_data = normalize_data(val_df, scaler, columns)
    test_data = normalize_data(test_df, scaler, columns)
    # display(train_data.head(100))
    # display(val_data.head(100))
    # display(test_data.head(300))

    print("📊 Generando secuencias...")
    X_train, mask_X_train, Y_train, mask_Y_train = create_sequences(train_data)
    print("📊 Creada secuencia de entrenamineto")
    X_val, mask_X_val, Y_val, mask_Y_val = create_sequences(val_data)
    print("📊 Creada secuencia de validación")
    X_test, mask_X_test, Y_test, mask_Y_test = create_sequences(test_data)
    print("📊 Creada secuencia de testing")

    print("🔄 Creando datasets...")
    train_dataset = MobilityDataset(X_train, mask_X_train, Y_train, mask_Y_train)
    val_dataset = MobilityDataset(X_val, mask_X_val, Y_val, mask_Y_val)
    test_dataset = MobilityDataset(X_test, mask_X_test, Y_test, mask_Y_test)

    ### **4️⃣ Creación de DataLoaders**
    print("📦 Creando DataLoaders...")
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=shuffle)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    print("✅ DataLoaders listos para entrenamiento.")
    return train_loader, val_loader, test_loader, scaler, test_data


In [None]:
# Construir DataLoaders
train_loader, val_loader, test_loader, scaler, test_data = build_dataloaders(df, batch_size=32,shuffle=False)

🚀 Llenando datos faltantes y normalizando...
📊 Generando secuencias...
📊 Creada secuencia de entrenamineto
📊 Creada secuencia de validación
📊 Creada secuencia de testing
🔄 Creando datasets...
📦 Creando DataLoaders...
✅ DataLoaders listos para entrenamiento.


In [None]:
import torch.nn as nn
from transformers import BertModel

class LP_BERT(nn.Module):
    def __init__(self, num_dates, num_timeslots, num_dayofweek, embed_dim_cat=32, hidden_dim=768, output_days=7, timeslots_per_day=48):
        super(LP_BERT, self).__init__()

        # Embeddings para variables categóricas
        self.date_embedding = nn.Embedding(num_dates, embed_dim_cat)
        self.timeslot_embedding = nn.Embedding(num_timeslots, embed_dim_cat)
        self.dayofweek_embedding = nn.Embedding(num_dayofweek, embed_dim_cat)

        # Proyección para variables continuas
        self.delta_t_projection = nn.Linear(1, embed_dim_cat)
        self.location_projection = nn.Linear(2, embed_dim_cat)

        # Normalización antes de la entrada a BERT
        self.layer_norm = nn.LayerNorm(embed_dim_cat)

        # Capa de proyección final antes de la entrada a BERT
        self.token_projection = nn.Linear(embed_dim_cat, hidden_dim)

        # Modelo BERT preentrenado
        self.bert = BertModel.from_pretrained("bert-base-uncased")

        # Capa de salida para predecir 7 días x 48 timeslots x 2 coordenadas
        self.output_size = output_days * timeslots_per_day * 2
        self.regressor = nn.Linear(hidden_dim, self.output_size)

    def forward(self, X, mask_X):
        """
        - X: Entrada en formato (batch_size, seq_length, feature_dim).
        - mask_X: Máscara de atención indicando qué valores son reales y cuáles padding.

        Retorna:
        - pred_coords: Predicción de coordenadas (batch_size, 336, 2).
        """
        batch_size, seq_length, feature_dim = X.shape  #  Ahora manejamos correctamente la forma

        # Extraer los valores desde X
        date = X[:, :, 0].long()
        timeslot = X[:, :, 1].long()
        location = X[:, :, 2:4].float()
        day_of_week = X[:, :, 4].long()
        delta_t = X[:, :, 5].float().unsqueeze(-1)  # (batch_size, seq_length, 1)

        # Obtener embeddings para variables categóricas
        emb_date = self.date_embedding(date)
        emb_timeslot = self.timeslot_embedding(timeslot)
        emb_dayofweek = self.dayofweek_embedding(day_of_week)

        # Proyección de variables continuas
        emb_delta_t = self.delta_t_projection(delta_t)
        emb_location = self.location_projection(location)

        # Add & Norm antes de BERT
        token_emb = emb_date + emb_timeslot + emb_dayofweek + emb_delta_t + emb_location
        token_emb = self.layer_norm(token_emb)

        # Proyectar al espacio hidden
        token_emb = self.token_projection(token_emb)

        # Usar mask_X como atención para BERT
        attention_mask = mask_X[:, :, 0].long()  # Seleccionamos solo una dimensión (batch_size, 336)

        # display(attention_mask.shape)

        # Pasar por BERT
        outputs = self.bert(inputs_embeds=token_emb, attention_mask=attention_mask)
        cls_output = outputs.last_hidden_state[:, 0, :]

        # Predicción de coordenadas futuras
        pred_coords = self.regressor(cls_output)
        return pred_coords.view(-1, 336, 2)  # 336 = 7 días * 48 timeslots * 2 coordenadas


In [None]:
import torch.nn.functional as F

def train_step(model, optimizer, batch, device):
    """
    Realiza un paso de entrenamiento con los datos organizados en X y mask_X.
    """
    model.train()
    optimizer.zero_grad()

    X, mask_X, target, mask_target = batch

    X = X.to(device)
    mask_X = mask_X.to(device)
    target = target.to(device)
    mask_target = mask_target.to(device)

    # Forward pass
    pred_location = model(X, mask_X)
    # display(pred_location.shape)
    # display(target.shape)
    # display(mask_target.shape)

    valid_values = mask_target.sum()  # Cuenta cuántos valores son realmente válidos
    loss = (F.mse_loss(pred_location * mask_target, target * mask_target, reduction='sum') / valid_values)


    # Backpropagation
    loss.backward()
    optimizer.step()

    return loss.item()


In [None]:
import torch
import torch.nn.functional as F
import copy
import pandas as pd
from tqdm import tqdm
from IPython.display import display, clear_output

def evaluate_model(model, val_loader, device):
    """
    Evalúa el modelo en el conjunto de validación con UNA SOLA BARRA de progreso.
    """
    model.eval()
    total_loss = 0

    progress_bar = tqdm(val_loader, desc=f"🔵 Validating...", position=0, leave=True)

    with torch.no_grad():
        for batch in progress_bar:
            X, mask_X, target, mask_target = batch
            X, mask_X, target, mask_target = X.to(device), mask_X.to(device), target.to(device), mask_target.to(device)

            pred_location = model(X, mask_X)
            valid_values = mask_target.sum()  # Cuenta cuántos valores son realmente válidos
            loss = (F.mse_loss(pred_location * mask_target, target * mask_target, reduction='sum') / valid_values)
            total_loss += loss.item()

            avg_loss = total_loss / (progress_bar.n + 1)
            progress_bar.set_postfix(val_loss=f"{avg_loss:.4f}")

    progress_bar.close()
    return total_loss / len(val_loader)


def train_model(model, train_loader, val_loader, optimizer, device, epochs=5):
    """
    Entrena el modelo con una barra de entrenamiento que se oculta al validar.
    Después de cada validación, muestra el historial de entrenamiento y sigue.
    """
    model.to(device)

    best_val_loss = float('inf')
    best_model_weights = copy.deepcopy(model.state_dict())
    history = []

    total_steps = epochs * len(train_loader)
    progress_bar = tqdm(total=total_steps, desc="🟢 Training Progress", position=0, leave=False)

    for epoch in range(1, epochs + 1):
        model.train()
        total_loss = 0

        for batch_idx, batch in enumerate(train_loader):
            loss = train_step(model, optimizer, batch, device)
            total_loss += loss

            current_progress = (epoch - 1) + ((batch_idx + 1) / len(train_loader))
            progress_bar.set_description(f"🟢 Epoch {current_progress:.1f}/{epochs}")
            progress_bar.set_postfix(epoch=f"{epoch}/{epochs}", train_loss=f"{total_loss / (batch_idx + 1):.4f}")
            progress_bar.update(1)

        val_loss = evaluate_model(model, val_loader, device)

        history.append({"Epoch": epoch, "Train Loss": total_loss / len(train_loader), "Val Loss": val_loss})

        clear_output(wait=True)
        history_df = pd.DataFrame(history)
        display(history_df)

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_weights = copy.deepcopy(model.state_dict())

    # Cargar los mejores pesos
    model.load_state_dict(best_model_weights)
    progress_bar.close()

    return model


In [None]:
from transformers import AdamW
# Configurar y entrenar el modelo
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LP_BERT(num_dates=70, num_timeslots=48, num_dayofweek=7)
optimizer = AdamW(model.parameters(), lr=1e-4)
epochs =10

model = train_model(model, train_loader, val_loader, optimizer, device, epochs=epochs)


Unnamed: 0,Epoch,Train Loss,Val Loss
0,1,0.049676,0.04674
1,2,0.046619,0.04661
2,3,0.046044,0.045971
3,4,0.045732,0.045898
4,5,0.045553,0.045608
5,6,0.045425,0.045578
6,7,0.045345,0.045532
7,8,0.045231,0.045479
8,9,0.045196,0.045454
9,10,0.045123,0.0454




In [None]:
val_loss = evaluate_model(model, val_loader, device)
print(f"Validation Loss: {val_loss:.4f}")
val_test = evaluate_model(model, test_loader, device)
print(f"Test Loss: {val_test:.4f}")

🔵 Validating...: 100%|██████████| 750/750 [01:31<00:00,  8.20it/s, val_loss=0.0454]


Validation Loss: 0.0454


🔵 Validating...: 100%|██████████| 844/844 [01:42<00:00,  8.21it/s, val_loss=0.0450]

Test Loss: 0.0450





In [25]:
import torch
import numpy as np

def evaluate_metrics_sequence_bert(
    test_loader, model, scaler, device
):
    """
    Evalúa las métricas basadas en predicciones generadas por `LP_BERT` para un batch de test.

    Args:
        test_loader (DataLoader): DataLoader de prueba que contiene `X, mask_X, Y, mask_Y`.
        model (nn.Module): Modelo LP_BERT entrenado.
        scaler (MinMaxScaler): Scaler para normalización/desnormalización.
        device (torch.device): Dispositivo donde se ejecutará el modelo.

    Returns:
        tuple: (`generated_sequences`, `reference_sequences`) en formato `(d, t, x, y)`.
    """
    model.eval()

    with torch.no_grad():
        for batch in test_loader:
            X, mask_X, target, mask_target = batch
            X, mask_X, target, mask_target = X.to(device), mask_X.to(device), target.to(device), mask_target.to(device)

            pred_location = model(X, mask_X)
            valid_values = mask_target.sum() 
            loss = (F.mse_loss(pred_location * mask_target, target * mask_target, reduction='sum') / valid_values)

            print(f"Test Loss: {loss.item():.4f}")
            break  

    generated_sequences = []
    reference_sequences = []

    times_per_day = 48
    current_day = 52
    current_timeslot = 0
    n_columns = scaler.n_features_in_

    for step_idx, (x_pred, y_pred) in enumerate(pred_location[0].cpu().numpy()):  # Tomamos solo el primer sample del batch
        if mask_target[0][step_idx].sum() > 0:  # Solo incluir valores válidos (sin padding)
            day_offset = current_timeslot // times_per_day
            d = current_day + day_offset
            t = current_timeslot % times_per_day

            x_true, y_true = target[0][step_idx].cpu().numpy()

            features_array = np.zeros(n_columns)
            features_array[0] = x_true  # `x`
            features_array[1] = y_true  # `y`

            # 🔹 Desnormalizar y redondear a enteros
            target_desnormalized = scaler.inverse_transform([features_array])
            x_true_desnormalized, y_true_desnormalized = np.round(target_desnormalized[0][:2]).astype(int)

            features_array_pred = np.zeros(n_columns)
            features_array_pred[0] = x_pred  # `x`
            features_array_pred[1] = y_pred  # `y`

            # 🔹 Desnormalizar y redondear a enteros
            predictions_desnormalized = scaler.inverse_transform([features_array_pred])
            x_pred_desnormalized, y_pred_desnormalized = np.round(predictions_desnormalized[0][:2]).astype(int)

            reference_sequences.append((d, t, x_true_desnormalized, y_true_desnormalized))
            generated_sequences.append((d, t, x_pred_desnormalized, y_pred_desnormalized))

        current_timeslot += 1

    return generated_sequences, reference_sequences


In [26]:
# Evaluar en una muestra del test set
generated_sequences, reference_sequences = evaluate_metrics_sequence_bert(
    val_loader, model, scaler, device
)
# print("generated")
# display(generated_sequences)
# print("reference")
# display(reference_sequences)

Test Loss: 0.0294


In [27]:
from abc import ABC, abstractmethod
import geobleu
import numpy as np


class Metric(ABC):
    """
    Clase base abstracta para diferentes métricas.
    """

    @abstractmethod
    def calculate(self, predictions, validation):
        """
        Calcula el puntaje de la métrica basada en las predicciones y los datos de validación.

        Args:
            predictions: Los datos predichos.
            validation: Los datos de validación.

        Returns:
            El puntaje calculado.
        """
        pass


class LPPMetric(Metric):
    """
    Métrica para calcular la Precisión de Predicción de Ubicación (LPP).
    """

    def calculate(self, predictions, validation):
        return calculate_lpp(validation, predictions)


class MAEMetric(Metric):
    """
    Métrica para calcular el Error Medio Absoluto (MAE).
    """

    def calculate(self, predictions_per_user, validation_per_user):
        return calculate_error_metrics(predictions_per_user, validation_per_user)["MAE"]


class GeoBLEUMetric(Metric):
    """
    Métrica para calcular el puntaje GeoBLEU.
    """

    def calculate(self, predictions_per_user, validation_per_user):
        return calculate_geobleu_for_quadrant(predictions_per_user, validation_per_user)


class DTWMetric(Metric):
    """
    Métrica para calcular el puntaje de Dynamic Time Warping (DTW).
    """

    def calculate(self, predictions_per_user, validation_per_user):
        return calculate_dtw_for_quadrant(predictions_per_user, validation_per_user)


def calculate_lpp(predictions_per_user, validation_per_user, tolerance=4.0):
    """
    Calcula la Precisión de Predicción de Ubicación (LPP), ignorando los valores NaN,
    y permitiendo un margen de error definido por un umbral.

    Args:
        predictions_per_user (list of list of tuples): Lista de trayectorias generadas por cada usuario.
        validation_per_user (list of list of tuples): Lista de trayectorias reales por cada usuario.
        tolerance (float): Margen de error en unidades de distancia para considerar una predicción correcta.

    Returns:
        float: La precisión de ubicación como porcentaje de coincidencias dentro del margen.
    """
    correct_predictions = 0
    total_predictions = 0

    for predicted_traj, actual_traj in zip(predictions_per_user, validation_per_user):
        for predicted_point, actual_point in zip(predicted_traj, actual_traj):
            # Asegurar que las coordenadas reales no sean NaN
            if not any(np.isnan(coord) for coord in actual_point[2:]):
                # Calcular distancia euclidiana entre puntos predichos y reales
                distance = np.sqrt(
                    (predicted_point[2] - actual_point[2]) ** 2
                    + (predicted_point[3] - actual_point[3]) ** 2
                )
                # Contar como correcta si la distancia está dentro del margen de tolerancia
                if distance <= tolerance:
                    correct_predictions += 1
                total_predictions += 1

    # Calcular el porcentaje de precisión
    lpp = (
        (correct_predictions / total_predictions) * 100
        if total_predictions > 0
        else 0.0
    )
    return lpp


def calculate_error_metrics(predictions_per_user, validation_per_user):
    """
    Calcula métricas de error entre trayectorias predichas y reales:
    - Error Medio Absoluto (MAE)
    - Error Máximo
    - Distribución de Errores

    Args:
        predictions_per_user (list of list of tuples): Trayectorias predichas [(d, t, x, y)].
        validation_per_user (list of list of tuples): Trayectorias reales [(d, t, x, y)].

    Returns:
        dict: Métricas calculadas.
    """
    errors = []

    for predicted_traj, actual_traj in zip(predictions_per_user, validation_per_user):
        for predicted_point, actual_point in zip(predicted_traj, actual_traj):
            # Ignorar puntos donde las coordenadas reales sean NaN
            if not any(np.isnan(coord) for coord in actual_point[2:]):
                # Calcular distancia euclidiana
                distance = np.sqrt(
                    (predicted_point[2] - actual_point[2]) ** 2
                    + (predicted_point[3] - actual_point[3]) ** 2
                )
                errors.append(distance)

    # Calcular métricas
    mae = np.mean(errors) if errors else 0.0
    max_error = np.max(errors) if errors else 0.0

    # Retornar resultados
    return {"MAE": mae, "Max Error": max_error, "Errors": errors}


def calculate_geobleu_for_quadrant(predictions_per_user, validation_per_user):
    """
    Calcula el puntaje promedio de GeoBLEU para cada usuario comparando las trayectorias generadas y de referencia.

    Args:
        predictions_per_user (list of list of tuples): Lista de trayectorias generadas por cada usuario.
        validation_per_user (list of list of tuples): Lista de trayectorias de referencia por cada usuario.

    Returns:
        float: El puntaje promedio de GeoBLEU para todos los usuarios comparados.
    """
    total_score = 0
    valid_users = 0
    for predictions, validation in zip(predictions_per_user, validation_per_user):
        if predictions and validation:  # Aseguramos que haya datos comparables
            score = geobleu.calc_geobleu(predictions, validation, processes=3)
            total_score += score
            valid_users += 1
    return total_score / valid_users if valid_users else 0


def calculate_dtw_for_quadrant(predictions_per_user, validation_per_user):
    """
    Calcula el puntaje promedio de DTW para cada usuario comparando las trayectorias generadas y de referencia.

    Args:
        predictions_per_user (list of list of tuples): Lista de trayectorias generadas por cada usuario.
        validation_per_user (list of list of tuples): Lista de trayectorias de referencia por cada usuario.

    Returns:
        float: El puntaje promedio de DTW para todos los usuarios comparados.
    """
    total_score = 0
    valid_users = 0
    for predictions, validation in zip(predictions_per_user, validation_per_user):
        if predictions and validation:  # Aseguramos que haya datos comparables
            score = geobleu.calc_dtw(predictions, validation)
            total_score += score
            valid_users += 1
    return total_score / valid_users if valid_users else 0


In [28]:
metrics_instances = [
    LPPMetric(),
    MAEMetric(),
    DTWMetric(),
    GeoBLEUMetric(),
]
# Calcular métricas
results = {}
for metric in metrics_instances:
    metric_name = type(metric).__name__
    results[metric_name] = metric.calculate([generated_sequences], [reference_sequences])

# Mostrar resultados
print("Resultados de las Métricas:")
for metric_name, score in results.items():
    print(f"{metric_name} score: {score}")

Resultados de las Métricas:
LPPMetric score: 32.35294117647059
MAEMetric score: 31.702480626160508
DTWMetric score: 72.92499549871108
GeoBLEUMetric score: 0.05015959920904115
