In [1]:
# %% [markdown]
# # Pirate Pain — Leakage-Safe Baseline (Anti-Overfitting)
# - Aggregate each `sample_index` time series into compact features (no mixing between series).
# - Do **Stratified K-Fold** on the per-series table (avoids leakage across timesteps).
# - Use **StandardScaler + LogisticRegression (multinomial)** with strong L2 regularization.
# - Score with **macro-F1** and export `submission_baseline.csv` (columns: sample_index,label).

# %%
import numpy as np, pandas as pd
from pathlib import Path
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.metrics import f1_score, classification_report, confusion_matrix
import warnings
warnings.filterwarnings("ignore")

# Reproducibility: one seed shared by the NumPy generator and the sklearn estimators below.
SEED = 42
rng = np.random.default_rng(SEED)

# Input locations, all resolved relative to DATA_DIR.
DATA_DIR = Path(".")  # change if needed
TRAIN = DATA_DIR / "pirate_pain_train.csv"
TRAIN_Y = DATA_DIR / "pirate_pain_train_labels.csv"
TEST = DATA_DIR / "pirate_pain_test.csv"

# Load the per-timestep feature tables and the label table.
X_train = pd.read_csv(TRAIN)
y_train = pd.read_csv(TRAIN_Y)
X_test = pd.read_csv(TEST)

for caption, frame in (
    ("Train (per-timestep):", X_train),
    ("Labels:", y_train),
    ("Test (per-timestep):", X_test),
):
    print(caption, frame.shape)

# Schema checks: both feature tables need the series id and the time axis;
# the label table must contain exactly the series id and its label.
for frame in (X_train, X_test):
    assert {'sample_index', 'time'}.issubset(frame.columns)
assert set(y_train.columns) == {'sample_index', 'label'}

print("Label counts (per series):")
print(y_train['label'].value_counts())


Train (per-timestep): (105760, 40)
Labels: (661, 2)
Test (per-timestep): (211840, 40)
Label counts (per series):
label
no_pain      511
low_pain      94
high_pain     56
Name: count, dtype: int64


In [2]:
# %% [markdown]
# ## Feature engineering (per series)
# We compute robust statistics per numeric channel, plus a slope vs time and energy term.
# This keeps the model small and reduces overfitting risk.

# %%
# Feature channels: every numeric column of the training table except the
# series id ('sample_index') and the time axis ('time').
num_cols = [
    c for c in X_train.columns
    if c not in ['sample_index', 'time'] and pd.api.types.is_numeric_dtype(X_train[c])
]
print("n numeric channels:", len(num_cols))

def _safe_quantile(x, q):
    try:
        return np.nanquantile(x, q)
    except Exception:
        return np.nan

def _slope_vs_time(x, t):
    x = np.asarray(x, float); t = np.asarray(t, float)
    m = np.isfinite(x) & np.isfinite(t)
    if m.sum() < 3 or np.all(t[m] == t[m][0]): return 0.0
    tt = t[m] - t[m].mean()
    denom = np.dot(tt, tt)
    if denom == 0: return 0.0
    return float(np.dot(tt, x[m] - x[m].mean()) / denom)

def build_series_table(df: pd.DataFrame) -> pd.DataFrame:
    """Collapse a per-timestep frame into one feature row per ``sample_index``.

    Rows are grouped by ``sample_index`` in order of first appearance
    (``sort=False``), so no statistic ever mixes two series. Each output row
    holds:

    * ``n_steps``  - number of rows in the series,
    * ``duration`` - max(time) - min(time), ignoring NaNs,
    * ``rate_est`` - ``n_steps / duration`` when the duration is positive,
      otherwise just ``n_steps``,
    * for every channel in the module-level ``num_cols``: NaN-aware mean, std,
      min, max, median, 25%/75% quantiles, slope vs. time, and mean squared
      value ("energy").

    Returns a DataFrame indexed by ``sample_index``.
    """
    records = []
    for series_id, group in df.groupby('sample_index', sort=False):
        times = group['time'].values
        n_steps = int(len(group))
        duration = float(np.nanmax(times) - np.nanmin(times)) if len(times) else 0.0

        record = {
            'sample_index': series_id,
            'n_steps': n_steps,
            'duration': duration,
            # Fall back to the raw step count when the time span is not positive.
            'rate_est': n_steps / duration if duration > 0 else n_steps,
        }

        for channel in num_cols:
            values = group[channel].values
            record.update({
                f'{channel}_mean': float(np.nanmean(values)),
                f'{channel}_std': float(np.nanstd(values)),
                f'{channel}_min': float(np.nanmin(values)),
                f'{channel}_max': float(np.nanmax(values)),
                f'{channel}_med': float(np.nanmedian(values)),
                f'{channel}_q25': float(_safe_quantile(values, 0.25)),
                f'{channel}_q75': float(_safe_quantile(values, 0.75)),
                f'{channel}_slope': _slope_vs_time(values, times),
                f'{channel}_energy': float(np.nanmean(np.square(values))),
            })

        records.append(record)

    return pd.DataFrame(records).set_index('sample_index')

# One aggregated feature row per series, for train and test separately.
series_train = build_series_table(X_train)
series_test = build_series_table(X_test)

# Align columns (defensive): any feature present only in train is added to
# test as a constant 0.0, then train is reordered to test's column layout.
test_columns = set(series_test.columns)
missing = [col for col in series_train.columns if col not in test_columns]
for col in missing:
    series_test[col] = 0.0

series_train = (
    series_train
    .reindex(columns=series_test.columns.tolist(), fill_value=0.0)
    .sort_index()
)
series_test = series_test.sort_index()

print("Per-series train:", series_train.shape, "Per-series test:", series_test.shape)


n numeric channels: 35
Per-series train: (661, 318) Per-series test: (1324, 318)


In [3]:
# %% [markdown]
# ## Labels and CV setup (per series)
# We perform **StratifiedKFold** on the aggregated table (one row per series).

# %%
# Pull the labels in exactly the row order of the per-series feature table,
# so that y lines up positionally with series_train.values.
y = y_train.set_index('sample_index')['label'].loc[series_train.index]

# Map the string labels to integer codes; le.classes_ keeps the code -> name mapping.
le = LabelEncoder()
y_enc = le.fit_transform(y)
print("Classes:", list(le.classes_))


Classes: ['high_pain', 'low_pain', 'no_pain']


In [4]:
# %% [markdown]
# ## Model: Scaler + Multinomial Logistic Regression (L2)
# We tune only `C` on a small grid (less variance, safer on small data).

# %%
# Pipeline: standardize every feature, then fit a class-weighted logistic regression.
# `multi_class` is deliberately NOT passed: the argument is deprecated since
# scikit-learn 1.5 and scheduled for removal (the FutureWarning is hidden by the
# global warnings filter above). With solver='lbfgs' and three classes the default
# already fits the multinomial (softmax) model, so the fitted model is unchanged.
pipe = Pipeline([
    ('scaler', StandardScaler(with_mean=True, with_std=True)),
    ('clf', LogisticRegression(
        solver='lbfgs',
        max_iter=2000, class_weight='balanced', random_state=SEED))
])

# Only the inverse regularization strength C is tuned (smaller C = stronger penalty).
param_grid = {'clf__C': [0.05, 0.1, 0.2, 0.5, 1.0]}

# Stratified folds over the per-series rows; the scaler is refit inside each
# fold because it lives in the pipeline.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED)
search = GridSearchCV(pipe, param_grid, scoring='f1_macro', cv=cv, n_jobs=-1, verbose=1)
search.fit(series_train.values, y_enc)

print("Best params:", search.best_params_)
print("CV macro-F1:", search.best_score_)
best_model = search.best_estimator_


Fitting 5 folds for each of 5 candidates, totalling 25 fits
Best params: {'clf__C': 0.1}
CV macro-F1: 0.5850041501717831


In [5]:
# %% [markdown]
# ## Diagnostics on the training data (optional, just to sanity-check)
# (Report is computed on the same data used to refit the final model returned by GridSearchCV)

# %%
# In-sample predictions of the refit model: an optimistic sanity check, not a
# generalization estimate (compare against the CV macro-F1 instead).
pred_train = best_model.predict(series_train.values)

train_macro_f1 = f1_score(y_enc, pred_train, average='macro')
print("Train macro-F1 (refit):", train_macro_f1)

# Per-class precision/recall, then the confusion matrix
# (rows = true class, columns = predicted class, in le.classes_ order).
class_names = list(le.classes_)
print(classification_report(y_enc, pred_train, target_names=class_names))
print(confusion_matrix(y_enc, pred_train))


Train macro-F1 (refit): 0.8829674503316919
              precision    recall  f1-score   support

   high_pain       0.77      1.00      0.87        56
    low_pain       0.77      0.90      0.83        94
     no_pain       0.99      0.92      0.95       511

    accuracy                           0.92       661
   macro avg       0.84      0.94      0.88       661
weighted avg       0.94      0.92      0.93       661

[[ 56   0   0]
 [  2  85   7]
 [ 15  26 470]]


In [6]:
# %% [markdown]
# ## Train on full data & create submission
# The final model is trained on all per-series rows and applied to the per-series test table.

# %%
# Fit the selected pipeline on every training series, then predict the test series.
best_model.fit(series_train.values, y_enc)
test_pred = best_model.predict(series_test.values)

# Convert integer class codes back to the original label strings.
test_labels = le.inverse_transform(test_pred)

# Submission layout: one row per test series with columns sample_index,label.
sub = pd.DataFrame(dict(
    sample_index=series_test.index.astype(str),
    label=test_labels,
))
sub.to_csv("submission_baseline.csv", index=False)
sub.head()


Unnamed: 0,sample_index,label
0,0,high_pain
1,1,high_pain
2,2,high_pain
3,3,high_pain
4,4,high_pain
