In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import re
from typing import List, Dict
import numpy as np
import math
from torch.utils.data import Dataset, DataLoader

In [None]:
def parse_trajectories(file_path):
    with open(file_path, 'r') as f:
        content = f.read()

    blocks = content.strip().split("\n\n")  # Split by blank lines (trajectories)
    all_trajectories = []

    for block in blocks:
        lines = block.strip().split("\n")
        trajectory = []

        for line in lines:
            actions = line.strip().split("-")
            timestep = []

            for action in actions:
                # Updated regex to support float durations
                match = re.match(r"r(\d+)\((\d+(?:\.\d+)?)\):p(\d+)\(([^)]+)\),p(\d+)\(([^)]+)\)", action.strip())
                if match:
                    r_id, duration_str, p_start, y_start_str, p_end, y_end_str = match.groups()

                    # Convert duration to float
                    duration = float(duration_str)
                    y_start = [int(label[1:]) for label in y_start_str.split(',')]
                    y_end = [int(label[1:]) for label in y_end_str.split(',')]

                    timestep.append({
                        "robot_id": int(r_id),
                        "duration": duration,
                        "p_start": int(p_start),
                        "y_start": y_start,
                        "p_end": int(p_end),
                        "y_end": y_end
                    })
                else:
                    print(f"Warning: Could not parse action: {action}")
            trajectory.append(timestep)

        all_trajectories.append(trajectory)

    return all_trajectories

# Parse the data from the log file
parsed_data = parse_trajectories("your_path.txt")

# Print the parsed data for the first timestep in the first trajectory
print(parsed_data[0][0])  # first timestep in that trajectory

In [None]:
def find_long_trajectories(file_path, max_timesteps=10):
    """
    Reads a .txt file of trajectories and returns a list of indices of those
    trajectories that have more than `max_timesteps` timesteps.

    Parameters:
        file_path (str): Path to the trajectory file.
        max_timesteps (int): Threshold for maximum allowed timesteps.

    Returns:
        List[int]: Indices of long (spurious) trajectories.
    """
    with open(file_path, 'r') as f:
        content = f.read()

    blocks = content.strip().split("\n\n")
    long_traj_ids = []

    for i, block in enumerate(blocks):
        lines = block.strip().split("\n")
        if len(lines) > max_timesteps:
            long_traj_ids.append(i)

    return long_traj_ids

file_path = "your_path.txt"
long_ids_dataset = find_long_trajectories(file_path, max_timesteps)
print("Dataset too long trajectories: ", len(long_ids_dataset))

In [None]:
# Compute max ids from the parsed data
robot_ids = set()
place_ids = set()

for traj in parsed_data:
    for timestep in traj:
        for action in timestep:
            robot_ids.add(action['robot_id'])
            place_ids.add(action['p_start'])
            place_ids.add(action['p_end'])

max_robot_id = max(robot_ids) + 1
max_place_id = max(place_ids) + 1
max_timesteps = max(len(traj) for traj in parsed_data)

# Then initialize your embeddings
embed_dim = 32
robot_embedding_matrix = nn.Embedding(max_robot_id, embed_dim)
place_embedding_matrix = nn.Embedding(max_place_id, embed_dim)
timestep_embedding_matrix = nn.Embedding(max_timesteps, embed_dim)

# If you want to just use tensors here, extract their weight data:
robot_embeddings = robot_embedding_matrix.weight.data  # shape (max_robot_id, embed_dim)
place_embeddings = place_embedding_matrix.weight.data  # shape (max_place_id, embed_dim)

def sinusoidal_embedding(value, dim=32):
    """Sinusoidal embedding for a scalar value."""
    pe = torch.zeros(dim)
    position = torch.tensor(value, dtype=torch.float32)
    div_term = torch.exp(torch.arange(0, dim, 2).float() * -(np.log(10000.0) / dim))
    pe[0::2] = torch.sin(position * div_term)
    pe[1::2] = torch.cos(position * div_term)
    return pe

def time_embedding(duration, max_duration):
    """Sinusoidal encoding for duration using 2D angle."""
    normalized = np.log(duration + 1) / np.log(max_duration + 1)
    angle = 2 * np.pi * normalized
    return torch.tensor([np.sin(angle), np.cos(angle)], dtype=torch.float32)

def label_embedding(y_start, y_end, num_labels=5):
    """Label transition encoding: -1 for origin labels, 1 for destination labels, 0 otherwise."""
    vec = torch.zeros(num_labels)
    for i in y_start:
        vec[i - 1] = -1
    for i in y_end:
        if vec[i - 1] == -1:
            vec[i - 1] = 0  # neutralize if label is both in start and end
        else:
            vec[i - 1] = 1
    return vec

def timestep_embedding(position, dim=32):
    """Sinusoidal positional encoding for timestep index."""
    pe = torch.zeros(dim)
    position = torch.tensor(position, dtype=torch.float32)
    div_term = torch.exp(torch.arange(0, dim, 2).float() * -(np.log(10000.0) / dim))
    pe[0::2] = torch.sin(position * div_term)
    pe[1::2] = torch.cos(position * div_term)
    return pe

#----------------------------------------  MODIFY THE ENCODE ACTIONS FUNCTION AS PLEASED  ---------------------------------------------------------

def encode_actions(parsed_data, robot_embeddings, place_embeddings, timestep_embeddings, num_labels=5, embed_dim=32):
    max_duration = max(
        action['duration']
        for trajectory in parsed_data
        for timestep in trajectory
        for action in timestep
    )
    
    encoded_vectors = []

    for trajectory in parsed_data:
        encoded_trajectory = []

        for t_idx, timestep in enumerate(trajectory):
            timestep_vec = timestep_embeddings[t_idx]

            encoded_timestep = []

            for action in timestep:
                # Instead of sinusoidal, do lookup here:
                robot_vec = robot_embeddings[action['robot_id']]  
                p_start_vec = place_embeddings[action['p_start']]
                p_end_vec = place_embeddings[action['p_end']]

                label_vec = label_embedding(action['y_start'], action['y_end'], num_labels=num_labels)
                time_vec = time_embedding(action['duration'], max_duration)

                full_vec = torch.cat([
                    robot_vec,
                    p_start_vec,
                    p_end_vec,
                    label_vec,
                    time_vec,
                    timestep_vec
                ])

                encoded_timestep.append(full_vec)
            encoded_trajectory.append(encoded_timestep)

        encoded_vectors.append(encoded_trajectory)

    return encoded_vectors

In [None]:
# Parse the data from the log file
data = parse_trajectories("your_path.txt")
test = parse_trajectories("your_path.txt")

# Set the number of labels manually
num_labels = 10  # Change this based on the number of labels you want

# Example: create embedding matrices (trainable parameters should be managed separately during training)
robot_embedding_matrix = nn.Embedding(max_robot_id, embed_dim)
place_embedding_matrix = nn.Embedding(max_place_id, embed_dim)

# Pass the weight tensors:
data_trajectories = encode_actions(
    data,
    robot_embedding_matrix.weight.data,
    place_embedding_matrix.weight.data,
    timestep_embeddings=timestep_embedding_matrix.weight.data,
    num_labels=num_labels,
    embed_dim=embed_dim
)

test_trajectories = encode_actions(
    test,
    robot_embedding_matrix.weight.data,
    place_embedding_matrix.weight.data,
    timestep_embeddings=timestep_embedding_matrix.weight.data,
    num_labels=num_labels,
    embed_dim=embed_dim
)

# Print the encoded vector for the first timestep in the first trajectory
print(data_trajectories[0][0][0])  # First timestep in the first trajectory
print(f"Vector size: {test_trajectories[0][0][0].shape}")  # Expected size

In [None]:
def organize_trajectories_flat(encoded_vectors):
    """
    Convert each trajectory into a flat sequence of action vectors.
    Returns a list of tensors: each tensor is (num_actions, 137)
    """
    flat_trajectories = []

    for traj in encoded_vectors:
        actions = []
        for timestep in traj:
            actions.extend(timestep)  # Flatten timestep
        traj_tensor = torch.stack(actions)  # Shape: (num_actions, 137)
        flat_trajectories.append(traj_tensor)

    return flat_trajectories

data_vector = organize_trajectories_flat(data_trajectories)
test_vector = organize_trajectories_flat(test_trajetories)
# Inspect
print(flat_trajectories[0].shape)  # Should be (num_actions_in_first_traj, feature_dim)