In [1]:
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
from scipy.signal import find_peaks, detrend
from scipy.stats import entropy
from sklearn.linear_model import LinearRegression

In [None]:
# Project root on the local machine; everything else is derived from it.
root = r"C:\Users\lixin\Desktop\E1"

# Both locations live under <root>/AI_features:
#   frame_root -> directory holding the per-video .npy arrays
#   save_path  -> CSV that receives the computed temporal features
frame_root, save_path = (
    os.path.join(root, "AI_features", leaf)
    for leaf in ("frame_data", "temporal_features.csv")
)

In [3]:
def safe_load(path):
    """Load a NumPy array from ``path``, or return ``None`` if nothing exists there.

    Args:
        path: Filesystem path handed to ``np.load``.

    Returns:
        Whatever ``np.load(path)`` returns when the path exists, otherwise ``None``.
        Errors raised by ``np.load`` itself are not caught.
    """
    return np.load(path) if os.path.exists(path) else None

In [4]:
def compute_trend_r2(mags):
    """Return the R^2 of a straight-line fit of ``mags`` against its sample index.

    The series is regressed (ordinary least squares, with intercept) on
    ``0, 1, ..., len(mags) - 1`` and the coefficient of determination
    ``1 - SS_res / SS_tot`` is returned. The fit is done in closed form, so no
    estimator object is constructed per call.

    Args:
        mags: 1-D sequence of numeric values (array or list).

    Returns:
        float: R^2 of the linear fit. ``0.0`` is returned when the series has
        fewer than two samples or when its total variance is not positive
        (e.g. a constant series), since R^2 is undefined there.
    """
    y = np.asarray(mags, dtype=float)
    if y.size < 2:
        # A line cannot be meaningfully fitted to zero or one sample.
        return 0.0

    t = np.arange(y.size, dtype=float)
    t_c = t - t.mean()
    y_c = y - y.mean()

    ss_tot = float(np.sum(y_c ** 2))
    if not ss_tot > 0:
        return 0.0

    # With centred variables the OLS slope is cov(t, y) / var(t) and the
    # residual of the intercept model reduces to y_c - slope * t_c.
    slope = np.sum(t_c * y_c) / np.sum(t_c ** 2)
    ss_res = float(np.sum((y_c - slope * t_c) ** 2))
    return 1.0 - ss_res / ss_tot

In [5]:
def compute_rhythm_strength(mags):
    """Share of spectral magnitude carried by the single strongest frequency bin.

    The series is detrended with ``scipy.signal.detrend``, transformed with a
    real FFT, and the zero-frequency bin is zeroed out. The result is
    ``max(|FFT|) / sum(|FFT|)`` over the remaining bins.

    Args:
        mags: Numeric series accepted by ``scipy.signal.detrend``.

    Returns:
        The ratio described above, or ``0`` when the magnitudes sum to zero.
    """
    spectrum = np.abs(np.fft.rfft(detrend(mags)))
    spectrum[0] = 0  # ignore the zero-frequency component

    total = np.sum(spectrum)
    return np.max(spectrum) / total if total != 0 else 0

In [6]:
def compute_cycle_score(mags):
    """Standard deviation of the spacing between successive peaks in ``mags``.

    Peaks are located with ``scipy.signal.find_peaks`` using a minimum
    separation of 5 samples. A value of 0 means the detected peaks are evenly
    spaced, or that fewer than two peaks were found.

    Args:
        mags: 1-D numeric series.

    Returns:
        ``np.std`` of the index gaps between consecutive peaks, or ``0`` when
        there are fewer than two peaks.
    """
    peak_idx = find_peaks(mags, distance=5)[0]
    if peak_idx.size >= 2:
        return np.std(np.diff(peak_idx))
    return 0

In [7]:
def compute_pause_ratio(mags, ratio=0.15):
    """Fraction of samples that fall below a mean-relative threshold.

    Args:
        mags: Numeric series.
        ratio: Multiplier applied to the mean of ``mags`` to obtain the
            threshold (default 0.15).

    Returns:
        Number of samples strictly below ``ratio * mean(mags)`` divided by
        ``len(mags)``.
    """
    cutoff = ratio * np.mean(mags)
    below = mags < cutoff
    return below.sum() / len(mags)

In [8]:
def compute_smoothness(mags):
    """Inverse of the spread of sample-to-sample changes in ``mags``.

    Args:
        mags: Numeric series.

    Returns:
        ``1 / (std(diff(mags)) + 1e-6)``; larger values mean the first
        differences vary less. Returns ``0`` when there are fewer than three
        samples.
    """
    if len(mags) >= 3:
        step_spread = np.std(np.diff(mags))
        # The small constant keeps the division finite when all steps are equal.
        return 1 / (step_spread + 1e-6)
    return 0

In [9]:
def compute_complexity(mags):
    """Shannon entropy of the value distribution of ``mags``.

    The values are binned into a 20-bin histogram, the counts are normalised
    to probabilities, and ``scipy.stats.entropy`` (natural log) is applied.

    Args:
        mags: Numeric series.

    Returns:
        Entropy of the normalised histogram, or ``0`` when the histogram holds
        no counts.
    """
    counts = np.histogram(mags, bins=20)[0]
    n_counted = counts.sum()
    if n_counted == 0:
        return 0
    return entropy(counts / n_counted)

In [10]:
def compute_direction_consistency(dir_mean, dir_std):
    """Inverse of the average directional spread.

    Args:
        dir_mean: Accepted for interface symmetry but not used in the
            computation.
        dir_std: Numeric values whose mean is taken as the spread.

    Returns:
        float: ``1 / (mean(dir_std) + 1e-6)``.
    """
    avg_spread = np.mean(dir_std)
    # The small constant avoids division by zero when the spread is exactly 0.
    return float(1 / (avg_spread + 1e-6))

In [11]:
def compute_segment_count(diffs, thresh_ratio=0.2):
    """Count how many times ``diffs`` rises above a mean-relative threshold.

    The threshold is ``mean(diffs) * (1 + thresh_ratio)``. Each transition
    from "not above" to "above" between consecutive samples counts once; a
    run that is already above the threshold at the first sample is not
    counted, because no transition precedes it.

    Args:
        diffs: Numeric series.
        thresh_ratio: Relative margin above the mean (default 0.2).

    Returns:
        int: Number of upward threshold crossings.
    """
    cutoff = (1 + thresh_ratio) * np.mean(diffs)
    above = (diffs > cutoff).astype(int)
    rising_edges = np.diff(above) == 1
    return int(np.count_nonzero(rising_edges))

In [12]:
def compute_dir_hist_entropy(hist):
    """Shannon entropy (natural log) of a histogram after normalising it.

    Args:
        hist: Array of histogram counts or weights.

    Returns:
        ``scipy.stats.entropy`` of ``hist / sum(hist)``, or ``0`` when the
        histogram sums to zero.
    """
    total = np.sum(hist)
    return entropy(hist / total) if total != 0 else 0

In [13]:
# Build one row of temporal features per video found in `frame_root` and
# write the resulting table to `save_path`.
rows = []
skipped = []  # ids of videos dropped because a companion array was missing
mags_suffix = "_mags.npy"

for fname in tqdm(sorted(os.listdir(frame_root))):
    # Each video is discovered through its "<video_id>_mags.npy" file.
    if not fname.endswith(mags_suffix):
        continue

    # Strip only the trailing suffix; str.replace would also delete any
    # earlier occurrence of the same substring inside the id.
    video_id = fname[: -len(mags_suffix)]

    mags = safe_load(os.path.join(frame_root, f"{video_id}_mags.npy"))
    dir_mean = safe_load(os.path.join(frame_root, f"{video_id}_dir_mean.npy"))
    dir_std = safe_load(os.path.join(frame_root, f"{video_id}_dir_std.npy"))
    dir_hist = safe_load(os.path.join(frame_root, f"{video_id}_dir_hist.npy"))
    diffs = safe_load(os.path.join(frame_root, f"{video_id}_diffs.npy"))

    # A video is only usable when all five arrays are present; remember the
    # ones that are not so the omission is visible after the run.
    if any(arr is None for arr in (mags, dir_mean, dir_std, dir_hist, diffs)):
        skipped.append(video_id)
        continue

    row = {
        "video_id": video_id,

        "trend_r2": compute_trend_r2(mags),
        "smoothness": compute_smoothness(mags),

        "rhythm_strength": compute_rhythm_strength(mags),
        "cycle_score": compute_cycle_score(mags),

        "pause_ratio": compute_pause_ratio(mags),
        "segment_count": compute_segment_count(diffs),

        "speed_entropy": compute_complexity(mags),

        "direction_consistency": compute_direction_consistency(dir_mean, dir_std),
        "dir_hist_entropy": compute_dir_hist_entropy(dir_hist),

        "mean_speed": float(np.mean(mags)),
        "speed_std": float(np.std(mags)),
    }

    rows.append(row)

df = pd.DataFrame(rows)
df.to_csv(save_path, index=False)

if skipped:
    print(f"Skipped {len(skipped)} video(s) with missing feature arrays, e.g. {skipped[:5]}")

100%|██████████| 580/580 [00:04<00:00, 119.58it/s]
