In [None]:
import pandas as pd
import numpy as np
import itertools
import json

In [None]:
def read_tracking(traintest, lab_id, video_id, pix_per_cm, drop_body_parts=[]):
    """
    Output: DataFrame pvid
    Shape: (num_frames, (num_mice * num_bodyparts *2))
    Mỗi hàm = 1 frame, mỗi cột = toạ độ x/y của 1 bodypart của 1 chuột
    """
    video_path = f"{traintest}_tracking/{lab_id}/{video_id}.parquet"
    df = pd.read_parquet(video_path)

    if len(np.unique(df.bodypart)) > 5:
        df = df[~df.bodypart.isin(drop_body_parts)]

    # Pivot: frame_idx = idx, columns = mouse_id và (x, y) của bodyparts
    pvid = df.pivot(index="video_frame", columns=["mouse_id", "bodypart"], values=["x", "y"])
    pvid = pvid.reorder_levels([1, 2, 0], axis=1).T.sort_index().T

    # Chuấn hoá pixel -> cm
    pvid /= pix_per_cm

    mouse_ids = pvid.columns.get_level_values("mouse_id").unique()
    pvid.attrs["mouse_ids"] = mouse_ids
    pvid.attrs["num_mice"] = len(mouse_ids)
    return pvid

In [None]:
def create_single_mouse_dfs(pvid, annot=None):
    """
    Trả về: list các DataFrame, mỗi DF ứng với 1 chuột
    """
    single_dfs = []
    mouse_ids = pvid.attrs["mouse_ids"]

    # Annotation self-oriented
    if annot is not None:
        annot_self = annot[annot.agent_id == annot.target_id]
    else:
        annot_self = None

    for mouse_id in mouse_ids:
        # Features tracking của chuột
        mouse_features = pvid[mouse_id].copy()

        # Meta info
        meta = pd.DataFrame({
            "video_frame": pvid.index,
            "agent_id": mouse_id,
            "target_id": "self"
        }, index=pvid.index)

        # Build labels
        if annot_self is not None:
            subset = annot_self[annot_self.agent_id == mouse_id]

            actions = subset["action"].unique()
            labels = pd.DataFrame(0, index=pvid.index, columns=actions)

            for _, row in subset.iterrows():
                labels.loc[row.start_frame:row.stop_frame, row.action] = 1
        else:
            labels = None

        # Gom vào 1 dataframe
        if labels is not None:
            df_mouse = pd.concat([mouse_features, meta, labels], axis=1)
        else:
            df_mouse = pd.concat([mouse_features, meta], axis=1)

        # Thêm vào list
        single_dfs.append(df_mouse)

    return single_dfs

def create_mouse_pair_dfs(pvid, annot=None):
    """
    Trả về: list các DataFrame, mỗi DF ứng với 1 cặp agent-target
    """
    pair_dfs = []

    # Annotation social-oriented
    if annot is not None:
        annot_pair = annot[annot.agent_id != annot.target_id]
    else:
        annot_pair = None

    for agent, target in itertools.permutations(np.unique(pvid.columns.get_level_values("mouse_id")), 2):
        # Features tracking của agent-target
        agent_features = pvid[agent].copy()
        target_features = pvid[target].copy()
        pair_features = pd.concat([agent_features, target_features], axis=1, keys=["A", "B"])

        # Meta info
        meta = pd.DataFrame({
            "video_frame": pvid.index,
            "agent_id": agent,
            "target_id": target
        }, index=pvid.index)

        # Build labels
        if annot_pair is not None:
            subset = annot_pair[(annot_pair.agent_id==agent) & (annot_pair.target_id==target)]

            actions = subset["action"].unique()
            labels = pd.DataFrame(0, index=pvid.index, columns=actions)

            for _, row in subset.iterrows():
                labels.loc[row.start_frame:row.stop_frame, row.action] = 1
        else:
            labels = None

        # Gom vào 1 dataframe
        if labels is not None:
            df_mouse = pd.concat([pair_features, meta, labels], axis=1)
        else:
            df_mouse = pd.concat([pair_features, meta], axis=1)

        # Thêm vào list
        pair_dfs.append(df_mouse)

    return pair_dfs

In [None]:
def calculate_centers(df):
    cols = df.columns
    if cols.nlevels == 2:
        if ("body_center", "x") not in df.columns or ("body_center", "y") not in df.columns:
            df[("body_center", "x")] = (df[("nose", "x")] + df[("tail_base", "x")]) / 2
            df[("body_center", "y")] = (df[("nose", "y")] + df[("tail_base", "y")]) / 2

    elif cols.nlevels == 3:
        mice = sorted(list(set(c[0] for c in cols)))

        for m in mice:
            has_body_center = ((m, "body_center", "x") in cols) and ((m, "body_center", "y") in cols)
            if not has_body_center:
                df[(m, "body_center", "x")] = (df[(m, "nose", "x")] + df[(m, "tail_base", "x")]) / 2
                df[(m, "body_center", "y")] = (df[(m, "nose", "y")] + df[(m, "tail_base", "y")]) / 2
    return df

def calculate_speed_lag(df, part, fps, lag=10, mouse=None):
    """
    Tính speed theo khoảng lag frame
    """
    cols = df.columns
    if cols.nlevels == 2:
        x = df[(part,"x")]
        y = df[(part,"y")]
    else:
        x = df[(mouse, part, "x")]
        y = df[(mouse, part, "y")]

    dx = x.diff(lag)
    dy = y.diff(lag)
    speed = np.sqrt(dx**2 + dy**2) * fps
    return speed.fillna(0)

# Tính các thống kê của 1 đại lượng theo nhiều cửa sổ thời gian
def calculate_window_stats(df, metric, name, fps, scales=[20, 40, 60, 80]):
    """
    Thêm rolling statistics cho bất kỳ series nào.
    
    metric : pd.Series (ví dụ speed, distance, curvature...)
    fps    : frames_per_second
    scales : list window sizes quy đổi theo 30fps → mặc định [20,40,60,80]
    """
    for scale in scales:
        ws = max(1, int(round(scale * float(fps) / 30)))
        roll = metric.rolling(ws, min_periods=max(1, ws//4))

        df[f"{name}_mean_{scale}"] = roll.mean()
        df[f"{name}_std_{scale}"]  = roll.std()
        df[f"{name}_min_{scale}"]  = roll.min()
        df[f"{name}_max_{scale}"]  = roll.max()

    return df

In [None]:
def build_single_features(single_mouse_df, meta_fps):
    df = single_mouse_df.copy()
    bodyparts = df.columns.get_level_values(0).unique()

    df = calculate_centers(df)
    
    # === Shape and Position Features ===

    # Euclidean distance giữa bodyparts
    distances = pd.DataFrame({
        f"{p1}+{p2}": np.sqrt((df[(p1, "x")] - df[(p2, "x")])**2 + 
                              (df[(p1, "y")] - df[(p2, "y")])**2) 
        for p1, p2 in itertools.combinations(bodyparts, 2)
    })
    df = pd.concat([df, distances], axis=1)

    # Elongation: Tỷ lệ dài/rộng cơ thể
    if "nose+tail_base" in df.columns and "ear_left+ear_right" in df.columns:
        df["elong"] = df["nose+tail_base"] / df["ear_left+ear_right"]

    # Hướng/Góc cơ thể
    v1_x = df[("nose","x")] - df[("body_center","x")]
    v1_y = df[("nose","y")] - df[("body_center","y")]
    v2_x = df[("tail_base","x")] - df[("body_center","x")]
    v2_y = df[("tail_base","y")] - df[("body_center","y")]
    df["body_angle"] = (v1_x*v2_x + v1_y*v2_y) / (np.sqrt(v1_x**2+v1_y**2) * np.sqrt(v2_x**2+v2_y**2) + 1e-6)


    # === Movement ===

    # Speed và Curvature: Quỹ đạo
    df["speed"] = np.sqrt(df[("body_center", "x")].diff()**2 
                          + df[("body_center", "y")].diff()**2)
    df["accelerate"] = df["speed"].diff()

    # Speed các bodyparts giữa mốc cách nhau {lag} frame
    for p in ["body_center", "ear_left", "ear_right"]: # cần chọn feature có lý hơn
        df[f"speed_{p}_lag_10"] = calculate_speed_lag(df, p, fps=meta_fps)


    # Rolling stats của speed của các bodyparts theo các window size
    for part in ["body_center", "head", "nose", "tail_base"]:
        speed = np.sqrt(df[(part, "x")].diff()**2 + df[(part, "y")].diff()**2) * float(fps)
        df = calculate_window_stats(df, speed, "speed", fps=meta_fps)

    # Curvature các bodyparts giữa mốc cách nhau {lag} frame
    for p in ["body_center", "ear_left", "ear_right"]: # cần chọn feature có lý hơn
        lag = max(1, int(round(10 * float(meta_fps) / 30))) # 10 frames
        shifted_x = df[(p,"x")].shift(lag)
        shifted_y = df[(p,"y")].shift(lag)
        df[f"curvature_{p}_lag_{lag}"] = np.sqrt((shifted_x - df[(p,"x")])**2 + (shifted_y - df[(p,"y")])**2)

    # Curvature: Quỹ đạo - Average và std curvature của các bodyparts theo các window size
    # Có thể phức tạp quá, mới bắt đầu làm thì skip

    df = df.fillna(0)

    return df

def build_pair_features(mouse_pair_df, meta_fps):
    df = mouse_pair_df.copy()
    bodyparts = df.columns.get_level_values(1).unique()

    df = calculate_centers(df)

    # === Pairwise distance giữa các bodyparts của 2 chuột ===
    distances = pd.DataFrame({
        f"A_{p1}+B_{p2}": np.sqrt((df[("A", p1, "x")] - df[("B", p2, "x")])**2 + 
                              (df[("A", p1, "y")] - df[("B", p2, "y")])**2)
        for p1, p2 in itertools.product(bodyparts, repeat=2)
        })
    df = pd.concat([df, distances], axis=1)


    # === Relative Orientation (cosine similarity) ===
    vec_A_x = df[("A", "nose", "x")] - df[("A", "tail_base", "x")]
    vec_A_y = df[("A", "nose", "y")] - df[("A", "tail_base", "y")]
    vec_B_x = df[("B", "nose", "x")] - df[("B", "tail_base", "x")]
    vec_B_y = df[("B", "nose", "y")] - df[("B", "tail_base", "y")]
    df["relative_orientation"] = (vec_A_x * vec_B_x + vec_A_y * vec_B_y)/(
        np.sqrt(vec_A_x**2 + vec_A_y**2) * np.sqrt(vec_B_x**2 + vec_B_y**2) + 1e-6
    )
    

    # === Approach Vector ===
    dist_center = np.sqrt(
        (df[("A", "body_center", "x")] - df[("B", "body_center", "x")])**2 
        + (df[("A", "body_center", "y")] - df[("B", "body_center", "y")])**2
    )
    approach_A = dist_center.diff()
    approach_B = dist_center.diff()
    df["approach_A"] = approach_A.fillna(0)
    df["approach_B"] = approach_B.fillna(0)


    # === Relative (Squared) Distance theo các window size ===
    center_distance = (df[("A", "body_center", "x")] - df[("B", "body_center", "x")])**2 + \
                     (df[("A", "body_center", "y")] - df[("B", "body_center", "y")])**2
    df = calculate_window_stats(df, center_distance, "center_distance", fps=meta_fps)


    # === Relative Speed ===
    speed_A = calculate_speed_lag(df["A"], "body_center", meta_fps, mouse="A")
    speed_B = calculate_speed_lag(df["B"], "body_center", meta_fps, mouse="B")

    # relative speed
    rel_speed = (speed_A - speed_B).abs()

    df[f"speed_A_lag_10"] = speed_A
    df[f"speed_B_lag_10"] = speed_B
    df[f"relative_speed_A_B_lag_10"] = rel_speed

    return df

In [None]:
drop_body_parts =  [
    'headpiece_bottombackleft', 'headpiece_bottombackright', 'headpiece_bottomfrontleft', 'headpiece_bottomfrontright', 
    'headpiece_topbackleft', 'headpiece_topbackright', 'headpiece_topfrontleft', 'headpiece_topfrontright', 
    'spine_1', 'spine_2', 'tail_middle_1', 'tail_middle_2', 'tail_midpoint'
]

def generate_mouse_data(dataset, traintest, drop_body_parts=[]):
    """
    Tạo DataFrames cho mỗi chuột và mỗi cặp chuột trong 1 video
    Trả về danh sách DataFrames
    """
    all_single = []
    all_pair = []

    for _, row in dataset.iterrows():
        # Meta info
        lab_id = row.lab_id
        video_id = row.video_id
        pix_per_cm = row.pix_per_cm_approx
        meta_fps = row.frames_per_second
        
        # Đọc tracking
        pvid = read_tracking(traintest, lab_id, video_id, pix_per_cm, drop_body_parts=drop_body_parts)
        
        # Đọc annotation
        if traintest == "train":
            annot_path = f"{traintest}_tracking/{row.lab_id}/{video_id}.parquet"
            annot = pd.read_parquet(annot_path)
        else:
            annot = None
            
        # Tạo DataFrame cho từng chuột
        sdfs = create_single_mouse_dfs(pvid, annot)
        for df_single in sdfs:
            df_features = build_single_features(df_single, meta_fps)
            all_single.append(df_features)

        # Nếu có nhiều hơn 1 chuột, tạo DataFrame cho từng cặp chuột
        if pvid.attrs["num_mice"] > 1:
            pdfs = create_mouse_pair_dfs(pvid, annot)
            for df_pair in pdfs:
                df_features = build_pair_features(df_pair, meta_fps)
                all_pair.append(df_features)

    return all_single, all_pair