# Haptic Signal VAE — Colab Training

This notebook runs the full training pipeline on Google Colab.

**Data source:** Cloned directly from [HapticGen/hapticgen-dataset](https://github.com/HapticGen/hapticgen-dataset) — no Google Drive needed for data.

**Steps:**
1. Clone code repo + dataset repo from GitHub
2. Install dependencies
3. Run training
4. Evaluate and listen to results
5. Extract latent vectors, fit PCA, and run control-axis sweeps
6. (Optional) Save outputs to Google Drive

In [None]:
# 1. Clone code repo (or pull latest)
import os

REPO_URL = "https://github.com/cindy-77jiayi/thesis_hapticAE.git"
REPO_DIR = "/content/thesis_hapticAE"

if os.path.exists(REPO_DIR):
    !cd {REPO_DIR} && git pull
else:
    !git clone {REPO_URL} {REPO_DIR}

os.chdir(REPO_DIR)
print(f"Working directory: {os.getcwd()}")

In [None]:
# 2. Clone dataset directly from GitHub (no Google Drive needed)
DATASET_URL = "https://github.com/HapticGen/hapticgen-dataset.git"
DATASET_DIR = "/content/hapticgen-dataset"

if os.path.exists(DATASET_DIR):
    !cd {DATASET_DIR} && git pull
else:
    !git clone {DATASET_URL} {DATASET_DIR}

# Data path for training
DATA_DIR = os.path.join(DATASET_DIR, "expertvoted")
print(f"Dataset: {DATA_DIR}")
print(f"Files: {len(os.listdir(DATA_DIR)) if os.path.exists(DATA_DIR) else 'NOT FOUND'}")

In [None]:
# 3. Install dependencies
!pip install -q -r requirements.txt

In [None]:
# 4. Configure paths
CONFIG = "configs/vae_default.yaml"  # relative to the repository root
OUTPUT_DIR = "/content/outputs"  # Colab local storage (fast)

# Create the output directory up front; a no-op if it already exists.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Echo the effective settings so they are visible in the notebook output.
print(
    f"Data:   {DATA_DIR}",
    f"Output: {OUTPUT_DIR}",
    f"Config: {CONFIG}",
    sep="\n",
)

In [None]:
# 5. Run training
!python scripts/train.py --config {CONFIG} --data_dir {DATA_DIR} --output_dir {OUTPUT_DIR}

In [None]:
# 6. Evaluate: load results and visualize
# This cell re-declares its imports and paths, so it does not rely on the
# variables created by the setup cells above.
import os, sys, glob

# Ensure repo is on Python path
REPO_DIR = "/content/thesis_hapticAE"
os.chdir(REPO_DIR)
if REPO_DIR not in sys.path:
    sys.path.insert(0, REPO_DIR)

import numpy as np
import torch
from torch.utils.data import DataLoader

from src.utils.config import load_config
from src.utils.seed import set_seed
from src.data.preprocessing import collect_clean_wavs, estimate_global_rms
from src.data.dataset import HapticWavDataset
from src.models.conv_vae import ConvVAE
from src.eval.evaluate import evaluate_reconstruction, print_metrics
from src.eval.visualize import plot_loss_curves, plot_waveform_comparison
from src.eval.audio import play_ab_comparison

CONFIG = "configs/vae_default.yaml"
OUTPUT_DIR = "/content/outputs"
DATA_DIR = "/content/hapticgen-dataset/expertvoted"

config = load_config(CONFIG)
set_seed(config['seed'])

# Find latest run
# Order the checkpoints by modification time (path as a deterministic
# tie-breaker) so that "latest" really means most recently written. A plain
# lexicographic sort of the paths is only chronological when the run
# directory names happen to sort that way (e.g. "run_10" sorts before "run_9").
# NOTE: despite its name, `run_dirs` holds checkpoint file paths.
run_dirs = sorted(
    glob.glob(f"{OUTPUT_DIR}/*/best_model.pt"),
    key=lambda path: (os.path.getmtime(path), path),
)
assert run_dirs, "No trained models found"
ckpt_path = run_dirs[-1]
run_dir = os.path.dirname(ckpt_path)
print(f"Using checkpoint: {ckpt_path}")

# Load metrics
# metrics.npz is read from the selected run directory; the loss histories are
# converted to plain Python lists before plotting.
metrics = np.load(os.path.join(run_dir, 'metrics.npz'))
plot_loss_curves(metrics['train_losses'].tolist(), metrics['val_losses'].tolist())

In [None]:
# 7. Reconstruction evaluation + audio
# Rebuilds the train/validation split, restores the selected checkpoint and
# reports reconstruction quality on validation clips.
data_cfg = config['data']
wav_files = collect_clean_wavs(DATA_DIR)
N = len(wav_files)

# Re-seed immediately before drawing the permutation. The split below uses
# NumPy's global RNG, so without this a second run of this cell (or any RNG
# use since the previous cell) would produce a different split and could put
# training clips into the validation set.
# NOTE(review): presumably set_seed seeds NumPy's global RNG and
# scripts/train.py builds its split the same way — confirm both.
set_seed(config['seed'])
perm = np.random.permutation(N)
split = int(data_cfg['train_split'] * N)
val_files = [wav_files[i] for i in perm[split:]]
train_files = [wav_files[i] for i in perm[:split]]

# The normalization statistic is estimated from training files only.
global_rms = estimate_global_rms(train_files, n=200, sr_expect=data_cfg['sr'])

val_ds = HapticWavDataset(val_files, T=data_cfg['T'], sr_expect=data_cfg['sr'], global_rms=global_rms, scale=data_cfg['scale'])
val_loader = DataLoader(val_ds, batch_size=32, shuffle=False)

# Rebuild the model from the config, then load the saved weights.
# map_location lets a checkpoint saved on GPU be loaded on a CPU-only runtime.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_cfg = config['model']
model = ConvVAE(
    T=data_cfg['T'], latent_dim=model_cfg['latent_dim'],
    channels=tuple(model_cfg['channels']),
    first_kernel=model_cfg.get('first_kernel', 25),
    kernel_size=model_cfg.get('kernel_size', 9),
).to(device)
model.load_state_dict(torch.load(ckpt_path, map_location=device))
model.eval()  # inference mode for evaluation

result = evaluate_reconstruction(model, val_loader, device, n_samples=10)
print_metrics(result)
plot_waveform_comparison(result['x_np'], result['xhat_np'])
play_ab_comparison(result['x_np'], result['xhat_np'], sr=data_cfg['sr'])

---
## PCA Control Pipeline

Extract latent vectors from the full dataset, fit PCA to get 8 interpretable control dimensions, and run sweep experiments.

In [None]:
# 8. Extract latent vectors + fit PCA
PCA_DIR = "/content/outputs/pca"

!python scripts/extract_and_pca.py \
    --config {CONFIG} \
    --data_dir {DATA_DIR} \
    --checkpoint {ckpt_path} \
    --output_dir {PCA_DIR} \
    --n_components 8

In [None]:
# 9. Single-axis sweep: visualize what each PC controls
import pickle
from src.pipelines.pca_control import single_axis_sweep, plot_sweep, play_sweep

# Load the PCA pipeline file from the PCA output folder.
# NOTE: unpickling can execute arbitrary code — only load a pickle produced
# by your own run of the extraction step.
pca_pipe_path = f"{PCA_DIR}/pca_pipe.pkl"
with open(pca_pipe_path, "rb") as fh:
    pipe = pickle.load(fh)

banner = '=' * 60

# Sweep PC1 through PC4 (most important axes)
for pc_index in range(4):
    print(f"\n{banner}\nSweeping PC{pc_index+1} from -2 to +2\n{banner}")
    # 9 steps over [-2, 2] on one axis at a time.
    result = single_axis_sweep(
        pipe,
        model,
        device,
        axis=pc_index,
        sweep_range=(-2.0, 2.0),
        n_steps=9,
        T=data_cfg['T'],
    )
    plot_sweep(result, sr=data_cfg['sr'])
    play_sweep(result, sr=data_cfg['sr'])

In [None]:
# 10. Inspect PCA: variance explained and control space statistics
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# Number of control dimensions highlighted in the plots; keep in sync with
# the --n_components value passed to scripts/extract_and_pca.py.
N_COMPONENTS = 8

Z = np.load(f"{PCA_DIR}/Z.npy")
Z_pca = np.load(f"{PCA_DIR}/Z_pca.npy")

# Full PCA to see variance dropoff
# Fit on standardized latents with no component limit so the complete
# explained-variance spectrum is available.
Z_scaled = StandardScaler().fit_transform(Z)
full_pca = PCA().fit(Z_scaled)
explained = full_pca.explained_variance_ratio_
cumvar = np.cumsum(explained)
n_total = len(cumvar)

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 4))

# Scree plot
ax1.bar(range(1, n_total + 1), explained)
# Marker sits between bar N_COMPONENTS and the next one.
ax1.axvline(x=N_COMPONENTS + 0.5, color='r', linestyle='--', label=f'{N_COMPONENTS} components')
ax1.set_xlabel('Principal Component')
ax1.set_ylabel('Explained Variance Ratio')
ax1.set_title('Scree Plot')
ax1.legend()

# Cumulative variance
ax2.plot(range(1, n_total + 1), cumvar, 'o-')
ax2.axhline(y=0.9, color='r', linestyle='--', label='90% threshold')
ax2.axvline(x=N_COMPONENTS, color='g', linestyle='--', label=f'{N_COMPONENTS} components')
ax2.set_xlabel('Number of Components')
ax2.set_ylabel('Cumulative Explained Variance')
ax2.set_title('Cumulative Variance Explained')
ax2.legend()

plt.tight_layout()
plt.show()

# Report cumulative variance at the reference component counts. Guard the
# index: the full PCA has fewer entries than 16 (or even 8) when the latent
# space is smaller, and unguarded indexing would raise IndexError.
print()
for k in (N_COMPONENTS, 16):
    if k <= n_total:
        print(f"Variance explained by {k} PCs: {cumvar[k - 1]:.2%}")
    else:
        print(f"Variance explained by {k} PCs: n/a (only {n_total} components available)")
# argmax returns the first index where the threshold is reached; +1 converts
# the 0-based index into a component count.
print(f"Components needed for 90%: {np.argmax(cumvar >= 0.9) + 1}")

---
## (Optional) Save outputs to Google Drive

Colab local storage is wiped when the session ends. Run the cell below to copy your trained model and PCA results to Google Drive.

In [None]:
# Optional: save outputs to Google Drive for persistence
SAVE_TO_DRIVE = False  # Set to True if you want to save

if SAVE_TO_DRIVE:
    from google.colab import drive
    drive.mount('/content/drive')
    DRIVE_OUTPUT = "/content/drive/MyDrive/thesis/outputs"
    os.makedirs(DRIVE_OUTPUT, exist_ok=True)
    !cp -r {OUTPUT_DIR}/* {DRIVE_OUTPUT}/
    print(f"Saved to: {DRIVE_OUTPUT}")
else:
    print("Skipped. Set SAVE_TO_DRIVE = True to save to Google Drive.")