In [1]:
import preamble
import torch.nn as nn

class ConvAutoencoder(nn.Module):
    """1-D convolutional autoencoder for multi-channel sequences.

    The encoder is a stack of stride-2 ``Conv1d`` layers (kernel 5, padding 2),
    each of which halves an even sequence length. The decoder mirrors it with
    stride-2 ``ConvTranspose1d`` layers (kernel 5, padding 2, output_padding 1),
    each of which doubles the length. ReLU follows every layer except the last
    layer of the encoder and the last layer of the decoder.

    With the default arguments and an input of shape ``(batch, 4, 256)`` the
    per-sample shapes are::

        encoder: (4, 128) -> (8, 64) -> (16, 32) -> (32, 16) -> (64, 8) -> (128, 4)
        decoder: (64, 8) -> (32, 16) -> (16, 32) -> (8, 64) -> (4, 128) -> (4, 256)

    The output length equals the input length only when the input length is
    divisible by ``2 ** len(hidden_channels)`` (64 by default).

    Args:
        in_channels: Number of channels of the input (and of the reconstruction).
        hidden_channels: Output channel count of each encoder layer, in order.
            The last entry is the channel count of the latent code.

    Raises:
        ValueError: If ``hidden_channels`` is empty.
    """

    def __init__(self, in_channels=4, hidden_channels=(4, 8, 16, 32, 64, 128)):
        super().__init__()

        widths = [in_channels, *hidden_channels]
        if len(widths) < 2:
            raise ValueError("hidden_channels must contain at least one entry")

        # Encoder: one stride-2 convolution per entry of hidden_channels.
        encoder_layers = []
        for c_in, c_out in zip(widths[:-1], widths[1:]):
            encoder_layers.append(nn.Conv1d(c_in, c_out, kernel_size=5, stride=2, padding=2))
            encoder_layers.append(nn.ReLU())
        encoder_layers.pop()  # the latent code is left un-activated
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder: the encoder's channel plan walked backwards.
        decoder_widths = widths[::-1]
        decoder_layers = []
        for c_in, c_out in zip(decoder_widths[:-1], decoder_widths[1:]):
            decoder_layers.append(
                nn.ConvTranspose1d(c_in, c_out, kernel_size=5, stride=2, padding=2, output_padding=1)
            )
            decoder_layers.append(nn.ReLU())
        decoder_layers.pop()  # the reconstruction is left un-activated (can be negative)
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` and return its reconstruction."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

In [None]:
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
from multimodal_pairs import MultimodalPairsDataset, MultimodalPairsSampler

# Prepare the dataset and data loader.
# NOTE(review): MultimodalPairsDataset / MultimodalPairsSampler are project-local
# (imported from multimodal_pairs); their behaviour is not visible in this notebook.
data_dir = '../data/global_standardized_and_absolute_time_channel_multimodal_pairs'
dataset = MultimodalPairsDataset(data_dir)
sampler = MultimodalPairsSampler(
    dataset=dataset,
    no_action_prob=None, # alternative value kept for reference: 0.33 — what None selects is up to the sampler, TODO confirm
    num_samples_per_epoch=None # alternative value kept for reference: 620 (samples per epoch) — meaning of None, TODO confirm
)

# Create DataLoader using the custom sampler; the sampler decides which
# indices are drawn, the loader only groups them into batches of up to 620.
data_loader = DataLoader(
    dataset,
    batch_size=620,
    sampler=sampler
)

# Initialize the model, loss, and optimizer.
# Apple-silicon GPU (MPS) is used when available, otherwise the CPU.
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
model = ConvAutoencoder().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop: plain reconstruction training, the second element of each
# batch (the label) is ignored.
num_epochs = 500
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0

    for forces, _ in data_loader:
        forces = forces.to(device)

        # Forward pass
        outputs = model(forces)
        loss = criterion(outputs, forces)  # Reconstruction loss (input is its own target)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Mean of the per-batch losses (not weighted by batch size).
    avg_loss = total_loss / len(data_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

print("Training complete!")

In [None]:
import matplotlib.pyplot as plt
import random
import torch

def plot_reconstructed_force(model, dataset, no_action_prob=0.5):
    """
    Plots the original and reconstructed force tensor for a randomly sampled multimodal pair,
    with control over the probability of sampling a no_action pair.

    A sample counts as "no_action" when the substring "no_action" occurs in its
    entry of ``dataset.file_paths``. The sample is moved to the device the
    model's parameters live on, so the function does not depend on a
    notebook-level ``device`` variable. The model is left in eval mode.

    Args:
        model: Trained autoencoder model. Its forward pass may return either the
            reconstruction alone or a tuple/list whose first element is the
            reconstruction (e.g. ``(reconstruction, latent)``).
        dataset: The dataset containing multimodal pairs. Must expose
            ``file_paths`` and return ``(force_tensor, label)`` on indexing.
        no_action_prob (float or None): Probability of selecting a no_action pair
            (default=0.5). ``None`` means "no preference": the sample is drawn
            uniformly from the whole dataset.

    Raises:
        IndexError: If the pool selected for sampling contains no samples.
    """
    # Separate indices into action and no_action
    action_indices = []
    no_action_indices = []
    for i, path in enumerate(dataset.file_paths):
        if "no_action" in path:
            no_action_indices.append(i)
        else:
            action_indices.append(i)

    # Sample based on probability
    if no_action_prob is None:
        # `random.random() < None` would raise TypeError; treat None as a uniform draw.
        pool, pool_name = list(range(len(dataset.file_paths))), "any"
    elif random.random() < no_action_prob:
        pool, pool_name = no_action_indices, "no_action"
    else:
        pool, pool_name = action_indices, "action"

    if not pool:
        raise IndexError(f"Cannot sample a '{pool_name}' pair: the dataset has none")
    idx = random.choice(pool)

    # Load the selected sample
    force_tensor, _ = dataset[idx]  # Original data
    force_tensor = force_tensor.unsqueeze(0)  # Add batch dim for model input

    # Follow the model's own device instead of a global variable.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        force_tensor = force_tensor.to(first_param.device)

    # Pass through the model
    model.eval()
    with torch.no_grad():
        output = model(force_tensor)
    if isinstance(output, (tuple, list)):
        output = output[0]  # (reconstruction, latent, ...) -> reconstruction
    reconstructed = output.squeeze(0).cpu().numpy()  # Remove batch dim
    original = force_tensor[0].cpu().numpy()

    # Plot original and reconstructed forces
    time_steps = range(force_tensor.shape[2])

    # Create 2 subplots: one for force components (1-3) and one for timestamps (0)
    fig, axs = plt.subplots(2, 1, figsize=(15, 10), gridspec_kw={'height_ratios': [3, 1]})

    # ----- Plot Force Components (1, 2, 3) -----
    colors = ['r', 'g', 'b']  # Colors for components 1, 2, 3
    for i in range(1, 4):  # Channels 1, 2, 3
        axs[0].plot(time_steps, original[i], label=f'Original Component {i}', color=colors[i-1])
        axs[0].plot(time_steps, reconstructed[i], label=f'Reconstructed Component {i}', color=colors[i-1], linestyle='--')

    axs[0].set_ylabel('Force Components')
    axs[0].set_title('Original vs Reconstructed Force Components')
    axs[0].legend()
    axs[0].grid(True)

    # ----- Plot Timestamps (0th Channel) -----
    axs[1].plot(time_steps, original[0], label='Original Timestamp', color='purple')
    axs[1].plot(time_steps, reconstructed[0], label='Reconstructed Timestamp', color='purple', linestyle='--')

    axs[1].set_xlabel('Time Steps')
    axs[1].set_ylabel('Time (Channel 0)')
    axs[1].set_title('Original vs Reconstructed Timestamps')
    axs[1].legend()
    axs[1].grid(True)

    # Adjust layout and display
    plt.tight_layout()
    plt.show()

# Draw one sample and compare it with its reconstruction, passing the sampler's
# no_action_prob attribute as the draw probability.
# NOTE(review): the sampler was constructed with no_action_prob=None — confirm
# what value this attribute actually holds before relying on it here.
plot_reconstructed_force(model, dataset, no_action_prob=sampler.no_action_prob)

In [None]:
import torch
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import numpy as np

def extract_latent_embeddings(model, dataset, device):
    """
    Encode every sample of a dataset and collect the latent codes with their labels.

    Samples are read one at a time by index, given a batch dimension, moved to
    ``device`` and passed through ``model.encoder`` with gradients disabled.
    The model is switched to eval mode and left that way.

    Args:
        model: Trained model exposing an ``encoder`` sub-module.
        dataset: Indexable, sized dataset yielding ``(force_tensor, phrase_binary)``.
        device: Device the inputs are moved to (e.g., 'cuda' or 'cpu').

    Returns:
        embeddings: Numpy array with one flattened latent vector per sample.
        labels: Numpy array with the matching label of each sample.
    """
    model.eval()

    def _encode(sample_idx):
        # One sample -> (flattened latent vector, label array).
        force, phrase = dataset[sample_idx]
        batched = force.unsqueeze(0).to(device)  # encoder expects a batch dim
        code = model.encoder(batched)
        return code.cpu().numpy().flatten(), phrase.cpu().numpy()

    with torch.no_grad():
        pairs = [_encode(k) for k in range(len(dataset))]

    latent_rows = [vec for vec, _ in pairs]
    label_rows = [lab for _, lab in pairs]
    return np.array(latent_rows), np.array(label_rows)

def plot_latent_space_cartesian_direction(embeddings, labels, perplexity=30,
                                          first_cartesian_start=13, first_cartesian_end=20):
    """
    Visualizes latent embeddings using t-SNE with colors based on the first Cartesian direction.

    The columns ``first_cartesian_start:first_cartesian_end`` of ``labels`` are
    treated as a one-hot section; the position of the largest entry in that
    section is the class id used for coloring. A row whose section is all zeros
    therefore gets class id 0.

    Args:
        embeddings: Numpy array of latent embeddings, one row per sample.
        labels: Numpy array of binary multi-labels, one row per sample.
        perplexity: Perplexity parameter for t-SNE (default=30).
        first_cartesian_start: First column of the label section (default=13).
        first_cartesian_end: One past the last column of the label section (default=20).
    """
    # Extract the section and get class IDs
    labels = np.asarray(labels)
    first_cartesian_labels = labels[:, first_cartesian_start:first_cartesian_end]
    class_ids = np.argmax(first_cartesian_labels, axis=1)  # Treat as one-hot encoded
    num_classes = first_cartesian_labels.shape[1]  # 7 with the default section

    # Reduce dimensionality to 2D using t-SNE (fixed seed for a repeatable layout)
    tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42)
    reduced_embeddings = tsne.fit_transform(np.asarray(embeddings))

    # Plot the embeddings with colors based on the first Cartesian direction.
    # `plt.get_cmap` replaces `plt.cm.get_cmap`, which was removed in Matplotlib 3.9.
    # The fixed vmin/vmax pin class k to colour bin k, so colours and colourbar
    # ticks stay aligned even when some classes do not occur in `class_ids`.
    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(reduced_embeddings[:, 0], reduced_embeddings[:, 1],
                          c=class_ids, cmap=plt.get_cmap('tab10', num_classes),
                          vmin=-0.5, vmax=num_classes - 0.5, alpha=0.7)
    plt.colorbar(scatter, ticks=range(num_classes), label="First Cartesian Direction")
    plt.title("t-SNE Visualization of Latent Space (First Cartesian Direction)")
    plt.xlabel("Dimension 1")
    plt.ylabel("Dimension 2")
    plt.grid(True)
    plt.show()

# Encode the whole dataset with the trained model, then project the latent
# codes to 2-D with t-SNE and colour them by the first-Cartesian-direction class.
embeddings, labels = extract_latent_embeddings(model, dataset, device)
plot_latent_space_cartesian_direction(embeddings, labels, perplexity=30)
# NOTE(review): presumably the class-id -> direction legend for the colourbar
# (ids 0..6), with '' meaning "no direction" — confirm against the label encoding:
# ['', 'backward', 'down', 'forward', 'left', 'right', 'up']

In [49]:
import torch
import torch.nn as nn
import torch.optim as optim

class Autoencoder(nn.Module):
    """Fully connected autoencoder over a flattened fixed-size input.

    The encoder flattens each sample and compresses it through
    ``flat_dim -> 512 -> 128 -> latent_dim``; the decoder mirrors that path and
    reshapes the result back to ``input_shape``. With the default
    ``input_shape=(4, 256)`` the flattened size is 1024.

    Args:
        latent_dim: Size of the latent vector (default=16).
        input_shape: Per-sample shape of the input, excluding the batch
            dimension (default=(4, 256)).
    """

    def __init__(self, latent_dim=16, input_shape=(4, 256)):
        super().__init__()

        input_shape = tuple(input_shape)
        flat_dim = 1
        for size in input_shape:
            flat_dim *= size

        # Encoder
        self.encoder = nn.Sequential(
            nn.Flatten(),                # Flatten input_shape -> flat_dim
            nn.Linear(flat_dim, 512),    # Compress to 512
            nn.ReLU(),
            nn.Linear(512, 128),         # Compress to 128
            nn.ReLU(),
            nn.Linear(128, latent_dim)   # Compress to latent_dim; no activation on the code
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 128),   # Expand to 128
            nn.ReLU(),
            nn.Linear(128, 512),          # Expand to 512
            nn.ReLU(),
            nn.Linear(512, flat_dim),     # Expand back to flat_dim; no activation on the output
            nn.Unflatten(1, input_shape)  # Reshape to input_shape
        )

    def forward(self, x):
        """Return ``(reconstructed, latent)`` for a batch ``x``."""
        latent = self.encoder(x)   # Compressed latent representation
        reconstructed = self.decoder(latent)  # Reconstructed input
        return reconstructed, latent  # Return both output and latent

In [50]:
import torch.nn.functional as F
from torch.utils.data import DataLoader

def train_autoencoder(model, dataloader, epochs=20, lr=0.001, device='cuda'):
    """
    Fit an autoencoder by minimising mean-squared reconstruction error.

    The model is moved to ``device`` and optimised in place with Adam. After
    every epoch the mean of the per-batch losses is printed. Nothing is returned.

    Args:
        model: Module whose forward pass returns a pair; the first element is
            taken as the reconstruction, the second is ignored.
        dataloader: Sized iterable of ``(inputs, labels)`` batches; the labels
            are not used.
        epochs: Number of passes over ``dataloader``.
        lr: Learning rate for Adam.
        device: Device the model and each batch are moved to (e.g., 'cuda' or 'cpu').
    """
    # Optimiser is built before the move, as in-place `.to()` keeps the same parameters.
    adam = optim.Adam(model.parameters(), lr=lr)
    mse = nn.MSELoss()

    model.to(device)

    for epoch_no in range(1, epochs + 1):
        model.train()
        loss_sum = 0.0

        for inputs, _ in dataloader:  # second item (labels) is unused
            inputs = inputs.to(device)

            reconstruction, _ = model(inputs)
            batch_loss = mse(reconstruction, inputs)  # the input is its own target

            adam.zero_grad()
            batch_loss.backward()
            adam.step()

            loss_sum += batch_loss.item()

        # Average over batches, not over individual samples.
        mean_loss = loss_sum / len(dataloader)
        print(f"Epoch [{epoch_no}/{epochs}], Loss: {mean_loss:.6f}")

In [None]:
from torch.utils.data import DataLoader, TensorDataset

# Prepare the dataset and data loader.
# NOTE: MultimodalPairsDataset / MultimodalPairsSampler are not imported in this
# cell; it relies on the `multimodal_pairs` import of the earlier training cell.
# This cell also rebinds the notebook-level `dataset`, `sampler`, `data_loader`,
# `device` and `model` names used by the cells that follow.
data_dir = '../data/global_standardized_and_absolute_time_channel_multimodal_pairs'
dataset = MultimodalPairsDataset(data_dir)
sampler = MultimodalPairsSampler(
    dataset=dataset,
    no_action_prob=None, # alternative value kept for reference: 0.33 — what None selects is up to the sampler, TODO confirm
    num_samples_per_epoch=None # alternative value kept for reference: 620 (samples per epoch) — meaning of None, TODO confirm
)

# Create DataLoader using the custom sampler
data_loader = DataLoader(
    dataset,
    batch_size=620,
    sampler=sampler
)

# Model setup.
# Apple-silicon GPU (MPS) is used when available, otherwise the CPU. The model
# is not moved here; train_autoencoder moves it to `device` itself.
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
model = Autoencoder(latent_dim=16)

# Train the model
train_autoencoder(model, data_loader, epochs=500, lr=0.001, device=device)

In [None]:
# Collect latent codes and labels from the trained dense autoencoder.
# NOTE(review): batches come from `data_loader`, i.e. through the custom
# sampler, so which samples appear (and how often) is decided by the sampler
# rather than by a plain pass over `dataset` — confirm this is intended.
# `np` is not imported in this cell; it relies on an earlier cell's import.
model.eval()
latent_embeddings = []
labels = []

with torch.no_grad():
    for batch in data_loader:
        inputs, label = batch
        inputs = inputs.to(device)
        
        # Extract latent embeddings (the model returns (reconstruction, latent))
        _, latent = model(inputs)
        latent_embeddings.append(latent.cpu().numpy())
        labels.append(label.cpu().numpy())

# Combine embeddings and labels: stack the per-batch arrays row-wise
latent_embeddings = np.vstack(latent_embeddings)
labels = np.vstack(labels)

plot_latent_space_cartesian_direction(latent_embeddings, labels, perplexity=30)

In [None]:
import matplotlib.pyplot as plt
import random
import torch

def plot_reconstructed_force_again(model, dataset, no_action_prob=0.5):
    """
    Plots the original and reconstructed force tensor for a randomly sampled multimodal pair,
    with control over the probability of sampling a no_action pair.

    Intended for models whose forward pass returns ``(reconstruction, latent)``;
    a model returning the reconstruction alone is accepted as well. A sample
    counts as "no_action" when the substring "no_action" occurs in its entry of
    ``dataset.file_paths``. The sample is moved to the device the model's
    parameters live on, so the function does not depend on a notebook-level
    ``device`` variable. The model is left in eval mode.

    Args:
        model: Trained autoencoder model.
        dataset: The dataset containing multimodal pairs. Must expose
            ``file_paths`` and return ``(force_tensor, label)`` on indexing.
        no_action_prob (float or None): Probability of selecting a no_action pair
            (default=0.5). ``None`` means "no preference": the sample is drawn
            uniformly from the whole dataset.

    Raises:
        IndexError: If the pool selected for sampling contains no samples.
    """
    # Separate indices into action and no_action
    action_indices = []
    no_action_indices = []
    for i, path in enumerate(dataset.file_paths):
        if "no_action" in path:
            no_action_indices.append(i)
        else:
            action_indices.append(i)

    # Sample based on probability
    if no_action_prob is None:
        # `random.random() < None` would raise TypeError; treat None as a uniform draw.
        pool, pool_name = list(range(len(dataset.file_paths))), "any"
    elif random.random() < no_action_prob:
        pool, pool_name = no_action_indices, "no_action"
    else:
        pool, pool_name = action_indices, "action"

    if not pool:
        raise IndexError(f"Cannot sample a '{pool_name}' pair: the dataset has none")
    idx = random.choice(pool)

    # Load the selected sample
    force_tensor, _ = dataset[idx]  # Original data
    force_tensor = force_tensor.unsqueeze(0)  # Add batch dim for model input

    # Follow the model's own device instead of a global variable.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        force_tensor = force_tensor.to(first_param.device)

    # Pass through the model
    model.eval()
    with torch.no_grad():
        output = model(force_tensor)
    if isinstance(output, (tuple, list)):
        output = output[0]  # (reconstruction, latent) -> reconstruction
    reconstructed = output.squeeze(0).cpu().numpy()  # Remove batch dim
    original = force_tensor[0].cpu().numpy()

    # Plot original and reconstructed forces
    time_steps = range(force_tensor.shape[2])

    # Create 2 subplots: one for force components (1-3) and one for timestamps (0)
    fig, axs = plt.subplots(2, 1, figsize=(15, 10), gridspec_kw={'height_ratios': [3, 1]})

    # ----- Plot Force Components (1, 2, 3) -----
    colors = ['r', 'g', 'b']  # Colors for components 1, 2, 3
    for i in range(1, 4):  # Channels 1, 2, 3
        axs[0].plot(time_steps, original[i], label=f'Original Component {i}', color=colors[i-1])
        axs[0].plot(time_steps, reconstructed[i], label=f'Reconstructed Component {i}', color=colors[i-1], linestyle='--')

    axs[0].set_ylabel('Force Components')
    axs[0].set_title('Original vs Reconstructed Force Components')
    axs[0].legend()
    axs[0].grid(True)

    # ----- Plot Timestamps (0th Channel) -----
    axs[1].plot(time_steps, original[0], label='Original Timestamp', color='purple')
    axs[1].plot(time_steps, reconstructed[0], label='Reconstructed Timestamp', color='purple', linestyle='--')

    axs[1].set_xlabel('Time Steps')
    axs[1].set_ylabel('Time (Channel 0)')
    axs[1].set_title('Original vs Reconstructed Timestamps')
    axs[1].legend()
    axs[1].grid(True)

    # Adjust layout and display
    plt.tight_layout()
    plt.show()

# Draw one sample and compare it with the dense autoencoder's reconstruction,
# passing the sampler's no_action_prob attribute as the draw probability.
# NOTE(review): the sampler was constructed with no_action_prob=None — confirm
# what value this attribute actually holds before relying on it here.
plot_reconstructed_force_again(model, dataset, no_action_prob=sampler.no_action_prob)