# Time Displacement Model Training

This notebook trains an XGBoost model to predict time displacement (timing humanization)
using cluster-based quantization. The target variable `time_offset` represents how much
earlier or later a note is played relative to its cluster centroid.

Key insight: Notes in multi-note clusters (chords) provide reliable training data because
the cluster centroid represents the "intended" beat position.

In [None]:
import os
import pickle
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import xgboost

from midi_to_df_conversion import midi_files_to_df
from midi_utility import get_midi_filepaths

In [None]:
# Configuration: save switch and on-disk locations of the trained artifacts.
save_model = False  # Flip to True to write the trained artifacts to disk
model_cache_path = Path("model_cache")
# The model file and the scaler file sit side by side in the cache directory.
time_displacement_model_path = model_cache_path.joinpath("time_displacement.json")
time_displacement_scaler_path = model_cache_path.joinpath("time_displacement_scaler.pkl")

## Data Loading

Load real MIDI data from the cache and extract time displacement features.

In [None]:
# Load MIDI files
# NOTE(review): get_midi_filepaths presumably returns the MIDI file paths found
# under the given directory — confirm against midi_utility.
midi_dir = Path("midi_data_repaired_cache")
midi_files = get_midi_filepaths(midi_dir)
print(f"Found {len(midi_files)} MIDI files")

# Convert to dataframe with time displacement features
# Later cells read the `time_offset`, `in_multi_cluster` and `name` columns of
# this frame. NOTE(review): presumably one row per note, with the first two
# columns added by include_time_displacement=True — confirm against
# midi_to_df_conversion.
df = midi_files_to_df(midi_files, skip_suspicious=True, include_time_displacement=True)
print(f"Total notes: {len(df)}")
df.head()  # last expression of the cell, so it is rendered as the cell output

In [None]:
# Summarise the target column, first over every row and then over the rows
# flagged as belonging to a multi-note cluster.
print("Time offset statistics (all notes):")
all_offset_summary = df["time_offset"].describe()
print(all_offset_summary)

print("\nTime offset statistics (multi-note clusters only):")
is_multi_cluster = df["in_multi_cluster"] == 1
multi_cluster_df = df.loc[is_multi_cluster]
multi_offset_summary = multi_cluster_df["time_offset"].describe()
print(multi_offset_summary)

# Share of rows that fall inside multi-note clusters.
print(f"\nNotes in multi-note clusters: {len(multi_cluster_df)} ({100*len(multi_cluster_df)/len(df):.1f}%)")

In [None]:
# Plot time offset distributions
def _plot_offset_hist(ax, values, label, color=None):
    """Draw a histogram of time offsets on ``ax`` with a reference line at zero.

    Args:
        ax: Matplotlib axes to draw on.
        values: 1-D NumPy array of time offsets.
        label: Panel name, used as the first line of the title.
        color: Optional bar colour; ``None`` keeps Matplotlib's default.
    """
    # One bin per integer offset in the padded [min - 1, max + 1] span,
    # capped at 100 bins so wide distributions stay readable.
    span = range(int(values.min()) - 1, int(values.max()) + 2)
    ax.hist(values, bins=min(100, len(span)), edgecolor='black', alpha=0.7, color=color)
    ax.axvline(x=0, color='red', linestyle='--', linewidth=2)
    ax.set_xlabel('Time offset (ticks)')
    ax.set_ylabel('Count')
    ax.set_title(f'{label} (n={len(values)})\nstd={values.std():.1f}')


fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Left panel: every note. Right panel: only notes in multi-note clusters.
offsets = df["time_offset"].values
multi_offsets = multi_cluster_df["time_offset"].values
_plot_offset_hist(axes[0], offsets, 'All notes')
_plot_offset_hist(axes[1], multi_offsets, 'Multi-note clusters', color='green')

plt.tight_layout()
plt.show()

## Train/Test Split

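Split by song rather than by individual note, so that notes from the same song never appear in both the training and test sets (which would leak information between them).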

In [None]:
# Partition at the song level: every row of a given song lands on the same
# side of the split, so no song contributes to both train and test.
song_names = df["name"].unique()
np.random.seed(42)  # fixed seed -> reproducible shuffle for a given song order
np.random.shuffle(song_names)  # shuffles the array in place

# The first 80% of the shuffled songs train the model; the rest are held out.
split_idx = int(len(song_names) * 0.8)
train_songs, test_songs = song_names[:split_idx], song_names[split_idx:]

in_train = df["name"].isin(train_songs)
in_test = df["name"].isin(test_songs)
# Copies, so later in-place column edits do not touch `df`.
train_df = df.loc[in_train].copy()
test_df = df.loc[in_test].copy()

print(f"Train songs: {len(train_songs)}, Test songs: {len(test_songs)}")
print(f"Train notes: {len(train_df)}, Test notes: {len(test_df)}")

In [None]:
# Option: Train only on multi-note clusters for more reliable targets
# Uncomment to enable:
# train_df = train_df[train_df["in_multi_cluster"] == 1].copy()
# test_df = test_df[test_df["in_multi_cluster"] == 1].copy()
# print(f"After filtering: Train={len(train_df)}, Test={len(test_df)}")

## Feature Preparation

In [None]:
# Columns that must never reach the model as inputs.
drop_cols = [
    "midi_track_index", "midi_event_index", "name", "time",
    "velocity",  # excluded as an input; it is predicted separately
    "time_offset",  # regression target
]

# Ignore any listed column that this dataframe does not actually contain.
drop_cols = [c for c in drop_cols if c in train_df.columns]

# Walk the columns once, routing each remaining one by dtype: non-numeric
# columns are treated as categorical, numeric ones as continuous. The original
# column order is preserved within each group.
cat_cols = []
cont_cols = []
for col in train_df.columns:
    if col in drop_cols:
        continue
    if pd.api.types.is_numeric_dtype(train_df[col]):
        # The target is never a continuous input, even if it escaped drop_cols.
        if col != "time_offset":
            cont_cols.append(col)
    else:
        cat_cols.append(col)

print(f"Categorical features: {len(cat_cols)}")
print(cat_cols)
print(f"\nContinuous features: {len(cont_cols)}")

In [None]:
# Standardise the continuous inputs together with the target. The statistics
# are estimated on the training split only and then applied to both splits.
scaled_cols = cont_cols + ["time_offset"]  # the target is the last scaled column
scaler = StandardScaler().fit(train_df[scaled_cols])

# Overwrite the columns in place with their standardised values.
for split_df in (train_df, test_df):
    split_df[scaled_cols] = scaler.transform(split_df[scaled_cols])

print("Scaler fitted. Mean of first 5 features:", scaler.mean_[:5])

In [None]:
# Pickle the fitted scaler into the cache directory, but only when saving is enabled.
if save_model:
    # Create the cache directory (and any missing parents) if needed.
    model_cache_path.mkdir(parents=True, exist_ok=True)
    with time_displacement_scaler_path.open("wb") as f:
        pickle.dump(scaler, f)
    print(f"Saved scaler to {time_displacement_scaler_path}")

In [None]:
# Convert categorical columns
# Both splits must share ONE category vocabulary per column. Casting each split
# to "category" independently lets pandas assign integer codes per split, so
# the same value can receive different codes in train and test. The vocabulary
# is therefore learned from the training split and reused for the test split.
# NOTE(review): XGBoost versions that consume the raw pandas codes would
# otherwise misread test rows — confirm against the installed version.
for col in cat_cols:
    train_categories = train_df[col].astype("category").cat.categories
    shared_dtype = pd.CategoricalDtype(categories=train_categories)
    train_df[col] = train_df[col].astype(shared_dtype)
    # Values never seen in training become NaN (missing) in the test split.
    test_df[col] = test_df[col].astype(shared_dtype)

In [None]:
# Assemble the model inputs and targets. Categorical columns come first,
# followed by the continuous ones.
feature_cols = [*cat_cols, *cont_cols]

X_train, y_train = train_df[feature_cols], train_df["time_offset"]
X_test, y_test = test_df[feature_cols], test_df["time_offset"]

for split_label, split_X in (("X_train", X_train), ("X_test", X_test)):
    print(f"{split_label} shape: {split_X.shape}")
X_train.head()

## Model Training

In [None]:
# Train XGBoost model
# Hyperparameters are gathered in one mapping so they can be read at a glance.
xgb_params = {
    "booster": "gbtree",
    "max_depth": 5,
    "learning_rate": 0.1,
    "n_estimators": 2000,
    "gamma": 0.5,
    "min_child_weight": 10,
    "subsample": 0.7,
    "colsample_bytree": 0.7,
    "reg_alpha": 0.5,
    "reg_lambda": 0.5,
    "n_jobs": 8,
    "enable_categorical": True,
    "early_stopping_rounds": 20,
}
model = xgboost.XGBRegressor(**xgb_params)

# Both splits are evaluated during fitting; their metric histories are read
# back later through model.evals_result().
# NOTE(review): early stopping presumably keys off the last eval_set entry,
# i.e. the test split here, so the reported test metrics are optimistic —
# confirm, and consider a dedicated validation split.
model.fit(
    X_train,
    y_train,
    eval_set=[(X_train, y_train), (X_test, y_test)],
    verbose=True,
)

## Model Evaluation

In [None]:
# Make predictions
train_preds = model.predict(X_train)
test_preds = model.predict(X_test)

# Calculate metrics (in scaled space).
# RMSE is computed as sqrt(MSE) instead of passing `squared=False`: that
# keyword was deprecated in scikit-learn 1.4 and removed in 1.6, where it
# raises a TypeError. sqrt(MSE) gives the same value on every version.
train_rmse = np.sqrt(mean_squared_error(y_train, train_preds))
test_rmse = np.sqrt(mean_squared_error(y_test, test_preds))
train_mae = mean_absolute_error(y_train, train_preds)
test_mae = mean_absolute_error(y_test, test_preds)

print(f"Train RMSE: {train_rmse:.4f}, MAE: {train_mae:.4f}")
print(f"Test RMSE: {test_rmse:.4f}, MAE: {test_mae:.4f}")

In [None]:
# Locate time_offset inside the scaler: it was fitted on
# cont_cols + ["time_offset"], so the target is the column right after cont_cols.
time_offset_idx = len(cont_cols)
# Use scale_, the exact factor StandardScaler divided by, rather than
# sqrt(var_). The two agree except for a zero-variance column, where scale_ is
# 1 and sqrt(var_) would wrongly turn every error into 0 ticks.
time_offset_std = scaler.scale_[time_offset_idx]
time_offset_mean = scaler.mean_[time_offset_idx]

# Convert metrics to original scale (ticks). Errors are differences, so only
# the scale factor applies; the mean offset cancels out.
print(f"\nIn original scale (MIDI ticks):")
print(f"Train RMSE: {train_rmse * time_offset_std:.2f} ticks")
print(f"Test RMSE: {test_rmse * time_offset_std:.2f} ticks")
print(f"Train MAE: {train_mae * time_offset_std:.2f} ticks")
print(f"Test MAE: {test_mae * time_offset_std:.2f} ticks")

In [None]:
# Per-iteration RMSE recorded during fitting, one curve per eval_set entry
# ("validation_0" is the training split, "validation_1" the test split).
results = model.evals_result()
history_fig, history_ax = plt.subplots(figsize=(10, 5))
for eval_key, curve_label in (("validation_0", "train"), ("validation_1", "test")):
    history_ax.plot(results[eval_key]["rmse"], label=curve_label)
history_ax.set_xlabel("Iteration")
history_ax.set_ylabel("RMSE")
history_ax.set_title("Training History")
history_ax.legend()
plt.show()

In [None]:
# Two diagnostics side by side: actual vs predicted scatter, and residual histogram.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
ax1, ax2 = axes

# Scatter at most 1000 randomly chosen test points (drawn without replacement).
n_shown = min(1000, len(y_test))
sample_idx = np.random.choice(len(y_test), n_shown, replace=False)

ax1.scatter(y_test.iloc[sample_idx], test_preds[sample_idx], alpha=0.3)
# Diagonal y = x over [-3, 3] in scaled units marks a perfect prediction.
ax1.plot([-3, 3], [-3, 3], 'r--', label='Perfect prediction')
ax1.set(
    xlabel='Actual time_offset (scaled)',
    ylabel='Predicted time_offset (scaled)',
    title='Actual vs Predicted (Test Set)',
)
ax1.legend()

# Residuals over the full test set, in scaled units.
residuals = test_preds - y_test.values
ax2.hist(residuals, bins=50, edgecolor='black', alpha=0.7)
ax2.axvline(x=0, color='red', linestyle='--')
ax2.set(
    xlabel='Residual (predicted - actual)',
    ylabel='Count',
    title=f'Residual Distribution\nmean={residuals.mean():.3f}, std={residuals.std():.3f}',
)

plt.tight_layout()
plt.show()

In [None]:
# Pair each feature name with the importance score of the fitted model, then
# rank the pairs from highest to lowest score.
importance_map = {
    feat: score for feat, score in zip(feature_cols, model.feature_importances_)
}
sorted_importance = sorted(
    importance_map.items(),
    key=lambda pair: pair[1],
    reverse=True,
)

print("Top 20 most important features:")
top_ranked = sorted_importance[:20]
for name, importance in top_ranked:
    print(f"  {name}: {importance:.4f}")

In [None]:
# Horizontal bar chart of the highest-ranked features.
top_n = 20
top_features = sorted_importance[:top_n]
names = [feat for feat, score in top_features]
values = [score for feat, score in top_features]

positions = range(len(names))
importance_fig, importance_ax = plt.subplots(figsize=(10, 8))
# Reverse both lists so the most important feature is drawn at the top.
importance_ax.barh(positions, values[::-1])
importance_ax.set_yticks(positions)
importance_ax.set_yticklabels(names[::-1])
importance_ax.set_xlabel('Feature Importance')
importance_ax.set_title(f'Top {top_n} Feature Importances')
plt.tight_layout()
plt.show()

## Save Model

In [None]:
# Write the trained model to disk when saving is enabled.
if save_model:
    # Create the cache directory here too, so this cell works even if
    # save_model was switched on after the scaler-saving cell had already run.
    os.makedirs(model_cache_path, exist_ok=True)
    model.save_model(time_displacement_model_path)
    print(f"Saved model to {time_displacement_model_path}")
else:
    print("Model not saved. Set save_model=True to save.")

In [None]:
# Final summary: dataset sizes, headline test metrics (scaled and in ticks),
# and the five highest-ranked features.
banner = "=" * 60
summary_lines = [
    banner,
    "TIME DISPLACEMENT MODEL SUMMARY",
    banner,
    f"Training samples: {len(X_train)}",
    f"Test samples: {len(X_test)}",
    f"Features: {len(feature_cols)}",
    f"Test RMSE: {test_rmse:.4f} (scaled), {test_rmse * time_offset_std:.2f} ticks",
    f"Test MAE: {test_mae:.4f} (scaled), {test_mae * time_offset_std:.2f} ticks",
    f"\nTop 5 features: {[x[0] for x in sorted_importance[:5]]}",
]
for summary_line in summary_lines:
    print(summary_line)