In [1]:
# -*- coding: utf-8 -*-
"""
Step 4: Build an encoding-ready lagged design matrix from TR-level GPT2 features.
- Input: Step 3 TR features X_tr with shape (N_TR, L, D) where D=50 (PCA dims)
- Output:
  1) X_lagged_4d: (N_eff, n_lags, L, D)
  2) X_lagged_flat: (N_eff, n_lags*L*D)
  3) metadata JSON storing lags, TR, and the TR indices kept
- Plots:
  - Coverage and kept TR indices
  - Lag energy by layer (for quick sanity checks)

All comments are in English as requested.
"""

import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# -----------------------------
# User config (EDIT THESE)
# -----------------------------
STEP3_DIR = r"E:\Nastase\encoding_features\pieman_step3"
OUT_DIR   = r"E:\Nastase\encoding_features\pieman_step4"

TR_SEC = 1.5

# Recommended lags for fMRI HRF delay with TR=1.5s:
# 1..8 TR corresponds to 1.5s..12s delay.
LAGS = list(range(1, 9))  # [1,2,3,4,5,6,7,8]

# TR filtering rule:
# We will keep only TRs with coverage>0 and with all required lagged TRs available.
REQUIRE_COVERAGE = True

FIG_DPI = 220


def main():
    os.makedirs(OUT_DIR, exist_ok=True)

    # -----------------------------
    # 1) Load TR-level features and TR summary
    # -----------------------------
    X_path = os.path.join(STEP3_DIR, "pieman_tr_features_pca50.npy")
    S_path = os.path.join(STEP3_DIR, "pieman_tr_summary.csv")

    X_tr = np.load(X_path)  # (N_TR, L, D)
    tr_sum = pd.read_csv(S_path)

    N_TR, L, D = X_tr.shape
    assert len(tr_sum) == N_TR, "TR summary length mismatch."

    coverage = tr_sum["coverage_sec"].to_numpy(dtype=float)
    has_cov = coverage > 0

    # -----------------------------
    # 2) Determine which TRs are valid targets given lag requirements
    # -----------------------------
    max_lag = max(LAGS)
    target_candidates = np.arange(N_TR, dtype=int)

    # A target TR t is valid if t-max_lag >= 0 and (optionally) has coverage>0
    valid_target = target_candidates >= max_lag
    if REQUIRE_COVERAGE:
        valid_target = valid_target & has_cov

    # Additionally require that ALL lagged source TRs have coverage>0 (optional but recommended)
    # This avoids building features from empty TRs.
    if REQUIRE_COVERAGE:
        for lag in LAGS:
            valid_target = valid_target & has_cov[target_candidates - lag]

    kept_t = target_candidates[valid_target]
    N_eff = int(kept_t.size)

    if N_eff == 0:
        raise RuntimeError("No TRs left after applying lag and coverage filters. Check your inputs/settings.")

    # -----------------------------
    # 3) Build lagged design tensor: (N_eff, n_lags, L, D)
    # -----------------------------
    n_lags = len(LAGS)
    X_lagged_4d = np.zeros((N_eff, n_lags, L, D), dtype=np.float32)

    for i, t in enumerate(kept_t):
        for j, lag in enumerate(LAGS):
            X_lagged_4d[i, j, :, :] = X_tr[t - lag, :, :]

    # Flatten to (N_eff, n_lags*L*D) for standard ridge regression
    X_lagged_flat = X_lagged_4d.reshape(N_eff, n_lags * L * D)

    # -----------------------------
    # 4) Save outputs
    # -----------------------------
    np.save(os.path.join(OUT_DIR, "pieman_Xlagged_4d.npy"), X_lagged_4d)
    np.save(os.path.join(OUT_DIR, "pieman_Xlagged_flat.npy"), X_lagged_flat)

    meta = {
        "tr_sec": TR_SEC,
        "lags_tr": LAGS,
        "n_tr_original": int(N_TR),
        "n_tr_effective": int(N_eff),
        "max_lag": int(max_lag),
        "require_coverage": bool(REQUIRE_COVERAGE),
        "kept_target_tr_indices": kept_t.tolist(),
        "note": "X_lagged[t] uses language features from previous TRs according to lags. "
                "Use the same kept_target_tr_indices to slice fMRI Y later."
    }
    meta_path = os.path.join(OUT_DIR, "pieman_lagged_meta.json")
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2)

    # -----------------------------
    # 5) Plot A: coverage and kept TR indices
    # -----------------------------
    plt.figure(figsize=(12, 4))
    plt.plot(np.arange(N_TR), coverage, label="coverage_sec")
    plt.scatter(kept_t, coverage[kept_t], s=10, label="kept target TRs")
    plt.xlabel("TR index")
    plt.ylabel("Coverage (sec)")
    plt.title("TR coverage and kept target TR indices (after lag+coverage filters)")
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(OUT_DIR, "fig_coverage_and_kept_tr.png"), dpi=FIG_DPI)
    plt.close()

    # -----------------------------
    # 6) Plot B: lag energy by layer (unified y-range)
    # -----------------------------
    # Compute L2 norm across PCA dims for each (sample, lag, layer)
    norms = np.linalg.norm(X_lagged_4d, axis=3)  # (N_eff, n_lags, L)

    # Average over samples to get (n_lags, L)
    mean_norm = norms.mean(axis=0)

    # Unified y-range based on robust max
    y_max = float(np.quantile(mean_norm, 0.99))

    plt.figure(figsize=(12, 5))
    for j, lag in enumerate(LAGS):
        plt.plot(np.arange(L), mean_norm[j, :], label=f"lag={lag}TR")
    plt.ylim(0.0, y_max)
    plt.xlabel("Layer (0=embeddings, 48=top)")
    plt.ylabel("Mean L2 norm (PCA50)")
    plt.title("Lagged feature energy by layer (mean over kept TRs)")
    plt.legend(ncol=4, fontsize=8)
    plt.tight_layout()
    plt.savefig(os.path.join(OUT_DIR, "fig_lag_energy_by_layer.png"), dpi=FIG_DPI)
    plt.close()

    # -----------------------------
    # Print a compact summary
    # -----------------------------
    print("\n[STEP4 DONE]")
    print(f"Input X_tr: {X_tr.shape} (N_TR, L, D)")
    print(f"LAGS (TR): {LAGS} | max_lag={max_lag}")
    print(f"Kept target TRs: {N_eff}/{N_TR}")
    print(f"Saved X_lagged_4d:   pieman_Xlagged_4d.npy  shape={X_lagged_4d.shape}")
    print(f"Saved X_lagged_flat: pieman_Xlagged_flat.npy shape={X_lagged_flat.shape}")
    print(f"Saved metadata:      {meta_path}")
    print("Saved figures: fig_coverage_and_kept_tr.png, fig_lag_energy_by_layer.png")


if __name__ == "__main__":
    main()


[STEP4 DONE]
Input X_tr: (300, 49, 50) (N_TR, L, D)
LAGS (TR): [1, 2, 3, 4, 5, 6, 7, 8] | max_lag=8
Kept target TRs: 144/300
Saved X_lagged_4d:   pieman_Xlagged_4d.npy  shape=(144, 8, 49, 50)
Saved X_lagged_flat: pieman_Xlagged_flat.npy shape=(144, 19600)
Saved metadata:      E:\Nastase\encoding_features\pieman_step4\pieman_lagged_meta.json
Saved figures: fig_coverage_and_kept_tr.png, fig_lag_energy_by_layer.png


In [2]:
# -*- coding: utf-8 -*-
"""
Step 4b (RELAXED VERSION)
Build lagged encoding design matrix with relaxed coverage filtering.

Key difference from Step4:
- Only require target TR coverage > 0
- Lagged TRs may have zero coverage (kept as zero features)

This preserves temporal continuity and statistical power.
"""

import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


# -----------------------------
# USER SETTINGS
# -----------------------------
STEP3_DIR = r"E:\Nastase\encoding_features\pieman_step3"
OUT_DIR   = r"E:\Nastase\encoding_features\pieman_step4b"

TR_SEC = 1.5
LAGS = list(range(1, 9))  # 1â€“8 TR
FIG_DPI = 220


def main():

    os.makedirs(OUT_DIR, exist_ok=True)

    # -----------------------------
    # Load inputs
    # -----------------------------
    X_tr = np.load(os.path.join(STEP3_DIR,
                                "pieman_tr_features_pca50.npy"))

    tr_sum = pd.read_csv(os.path.join(
        STEP3_DIR, "pieman_tr_summary.csv"))

    coverage = tr_sum["coverage_sec"].to_numpy()

    N_TR, L, D = X_tr.shape
    max_lag = max(LAGS)

    # -----------------------------
    # RELAXED FILTERING
    # -----------------------------
    target_tr = np.arange(N_TR)

    # must have enough history
    valid = target_tr >= max_lag

    # optional: require target has words
    valid = valid & (coverage > 0)

    kept_t = target_tr[valid]
    N_eff = len(kept_t)

    # -----------------------------
    # Build lagged tensor
    # -----------------------------
    n_lags = len(LAGS)
    X_lagged_4d = np.zeros(
        (N_eff, n_lags, L, D), dtype=np.float32)

    for i, t in enumerate(kept_t):
        for j, lag in enumerate(LAGS):
            X_lagged_4d[i, j] = X_tr[t - lag]

    X_lagged_flat = X_lagged_4d.reshape(
        N_eff, n_lags * L * D)

    # -----------------------------
    # Save outputs
    # -----------------------------
    np.save(os.path.join(
        OUT_DIR, "pieman_Xlagged_4d.npy"), X_lagged_4d)

    np.save(os.path.join(
        OUT_DIR, "pieman_Xlagged_flat.npy"), X_lagged_flat)

    meta = {
        "tr_sec": TR_SEC,
        "lags_tr": LAGS,
        "n_tr_original": int(N_TR),
        "n_tr_effective": int(N_eff),
        "filtering": "relaxed (target coverage only)",
        "kept_target_tr_indices": kept_t.tolist()
    }

    with open(os.path.join(
        OUT_DIR, "pieman_lagged_meta.json"),
        "w", encoding="utf-8") as f:
        json.dump(meta, f, indent=2)

    # -----------------------------
    # FIGURE 1: kept TR overview
    # -----------------------------
    plt.figure(figsize=(12,4))
    plt.plot(coverage, label="coverage_sec")
    plt.scatter(kept_t, coverage[kept_t],
                s=10, label="kept TR")
    plt.xlabel("TR index")
    plt.ylabel("Coverage (sec)")
    plt.title("Relaxed filtering: kept TRs")
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(
        OUT_DIR,
        "fig_kept_tr_relaxed.png"), dpi=FIG_DPI)
    plt.close()

    # -----------------------------
    # FIGURE 2: lag energy check
    # -----------------------------
    norms = np.linalg.norm(X_lagged_4d, axis=3)
    mean_norm = norms.mean(axis=0)

    ymax = np.quantile(mean_norm, 0.99)

    plt.figure(figsize=(12,5))
    for j, lag in enumerate(LAGS):
        plt.plot(mean_norm[j], label=f"lag={lag}")
    plt.ylim(0, ymax)
    plt.xlabel("Layer")
    plt.ylabel("Mean L2 norm")
    plt.title("Lag energy by layer (relaxed)")
    plt.legend(ncol=4, fontsize=8)
    plt.tight_layout()
    plt.savefig(os.path.join(
        OUT_DIR,
        "fig_lag_energy_relaxed.png"), dpi=FIG_DPI)
    plt.close()

    # -----------------------------
    # Summary
    # -----------------------------
    print("\n[STEP4b DONE]")
    print(f"Input shape: {X_tr.shape}")
    print(f"Kept TRs: {N_eff}/{N_TR}")
    print(f"Output flat shape: {X_lagged_flat.shape}")


if __name__ == "__main__":
    main()


[STEP4b DONE]
Input shape: (300, 49, 50)
Kept TRs: 259/300
Output flat shape: (259, 19600)
