In [None]:
import sys

# Location of the project's feature-extraction sources. It is appended to
# sys.path before the import below; presumably `features`, `preprocess` and
# `utils` live in this directory (confirm against the repository layout).
analysis_lib = "/projects/kumar-lab/miaod/projects/uvFI/pipelines/feature-extraction/src"
sys.path.append(analysis_lib)

import features, preprocess, utils

In [None]:
from pathlib import Path

# Root of this experiment; every other path in this cell is derived from it.
expr_dir = Path("/projects/kumar-lab/miaod/projects/uvFI/experiments/2025-06-13_kpms-feature-extraction/")
outputs_dir = expr_dir / "outputs"

# `project_dir` and `model_name` are later read back through
# `_read_project_info` and handed to keypoint-moseq as (project_dir, model_name).
project_dir = expr_dir / "data"
model_name = "2025-06-13_kpms-inference_data"
# NOTE: this repeats the `model_name` literal; keep the two in sync when renaming.
pose_dir = expr_dir / "data/2025-06-13_kpms-inference_data/poses_csv"

# Register the three locations with the project's utils module (passed as plain
# strings). How they are stored is defined in utils.py -- not visible here.
utils.set_project_info(str(project_dir), model_name, str(pose_dir))

In [None]:
# Project helper, called without arguments; presumably it uses the project info
# registered above to write the groups CSV -- see preprocess.py to confirm.
preprocess.create_groups_csv()

In [None]:
# The regex matches exactly "out_" + one or more digits + ".h5".
# Which directory is scanned and how the matches are combined is defined in
# preprocess.py (not visible here).
preprocess.combine_inference_results(r"^out_\d+\.h5$")

In [None]:
import keypoint_moseq as kpms
import math
import numpy as np
import pandas as pd
from scipy.stats import entropy
from tqdm import tqdm 
from typing import Any, Dict, Sequence, Tuple, Union

from config import FIGURE_DIR
from utils import _read_project_info

In [None]:
# Type aliases for the objects shared between the feature cells.
results_t = Dict[str, Dict[str, np.ndarray]]  # outer key -> {field name -> array}
moseq_df_t = pd.DataFrame
stats_df_t = pd.DataFrame
# Bundle returned by `load_analyses`: keys "results", "moseq_df" and "stats_df".
analyses_t = Dict[str, Union[results_t, moseq_df_t, stats_df_t]]

def load_analyses(
    min_frequency: float=0.005, 
    fps: int=30
) -> analyses_t:
    """Load the keypoint-moseq outputs that every feature function consumes.

    The project directory and model name are read with
    ``_read_project_info("project_dir", "model_name")``.

    Parameters
    ----------
    min_frequency
        Forwarded to ``kpms.compute_stats_df`` as ``min_frequency``.
    fps
        Forwarded to ``kpms.compute_stats_df`` as ``fps``.

    Returns
    -------
    analyses_t
        A dict (not a tuple) with three entries:
        ``"results"`` from ``kpms.load_results``, ``"moseq_df"`` from
        ``kpms.compute_moseq_df(..., smooth_heading=True)`` and ``"stats_df"``
        from ``kpms.compute_stats_df`` grouped by ``"name"``.
    """
    project_dir, model_name = _read_project_info("project_dir", "model_name")

    loaded = {"results": kpms.load_results(project_dir, model_name)}
    loaded["moseq_df"] = kpms.compute_moseq_df(project_dir, model_name, smooth_heading=True)
    # stats_df is derived from moseq_df, so it has to be computed after it.
    loaded["stats_df"] = kpms.compute_stats_df(
        project_dir,
        model_name,
        loaded["moseq_df"],
        min_frequency=min_frequency,
        groupby=["name"],
        fps=fps,
    )
    return loaded

In [None]:
# Load once with the defaults (min_frequency=0.005, fps=30); every feature
# function below reads from this dict.
analyses = load_analyses()

In [None]:
def get_syllable_lempel_ziv(analyses: analyses_t):
    """Normalised Lempel-Ziv complexity of every syllable sequence.

    Each ``info["syllable"]`` sequence in ``analyses["results"]`` is parsed
    into dictionary phrases (LZ78-style incremental parsing) and the phrase
    count ``c`` is divided by the asymptotic bound ``n / log_sigma(n)``, i.e.

        c * log(n) / (n * log(sigma))

    where ``n`` is the sequence length and ``sigma`` the number of distinct
    symbols that actually occur in it.

    Returns
    -------
    dict
        ``{"syllable_lempel_ziv": [...]}`` with one float per entry of
        ``results``, in iteration order. Sequences that are empty, have a
        single element or contain a single distinct symbol score ``0.0``.
    """
    def lempel_ziv(seq: Sequence) -> int:
        # Count phrases: each phrase is the shortest prefix of the remaining
        # input that has not been seen as a phrase before. A trailing phrase
        # that is cut off by the end of the input is still counted.
        symbols = tuple(seq)  # convert once so slices are hashable tuples
        n, i, phrases = len(symbols), 0, 0
        seen = set()
        while i < n:
            k = 1
            while i + k <= n and symbols[i:i + k] in seen:
                k += 1
            seen.add(symbols[i:i + k])
            phrases += 1
            i += k
        return phrases

    def normalized_lempel_ziv(seq: Sequence) -> float:
        symbols = tuple(seq)
        n = len(symbols)
        sigma = len(set(symbols))
        # log(n) or log(sigma) would be 0 (or undefined) in these cases.
        if n <= 1 or sigma <= 1:
            return 0.0
        # Divide by the upper bound n / log_sigma(n) = n * log(sigma) / log(n).
        return lempel_ziv(symbols) * math.log(n) / (n * math.log(sigma))

    results = analyses["results"]
    ret = [
        normalized_lempel_ziv(info["syllable"])
        for name, info in tqdm(results.items(), desc="computing get_syllable_lempel_ziv")
    ]
    return {"syllable_lempel_ziv": ret}

In [None]:
# One normalised Lempel-Ziv value per entry of analyses["results"].
syllable_lz_output = get_syllable_lempel_ziv(analyses)

In [None]:
def get_latent_embedding_statistics(analyses: analyses_t):
    """Per-dimension mean, median and std of every latent trajectory.

    For each entry of ``analyses["results"]`` the ``"latent_state"`` array is
    reduced along axis 0 (assumed to be the frame axis, giving one value per
    latent dimension -- confirm against the keypoint-moseq results layout).

    Returns
    -------
    dict
        ``latent_embedding_{mean|median|std}_{d}`` -> list with one value per
        entry of ``results`` (iteration order), for every latent dimension
        ``d``. Keys are ordered mean first, then median, then std. An empty
        ``results`` gives an empty dict.
    """
    results = analyses["results"]

    # One list of per-recording vectors per statistic; the insertion order of
    # this dict fixes the order of the output columns.
    per_stat = {"mean": [], "median": [], "std": []}
    for name, info in tqdm(results.items(), desc="computing get_latent_embedding_statistics"):
        latent_embeddings = np.asarray(info["latent_state"])

        per_stat["mean"].append(latent_embeddings.mean(axis=0))
        per_stat["median"].append(np.median(latent_embeddings, axis=0))
        per_stat["std"].append(latent_embeddings.std(axis=0, ddof=0))  # population std

    ret = {}
    for label, rows in per_stat.items():
        if not rows:
            continue
        stacked = np.stack(rows)  # (number of recordings, number of latent dims)
        for dim in range(stacked.shape[1]):
            # Every (statistic, dimension) pair gets its own key, so nothing
            # is dropped or overwritten regardless of the latent dimension.
            ret[f"latent_embedding_{label}_{dim}"] = list(stacked[:, dim])
    return ret

In [None]:
# Latent-embedding summary features: a dict of column name -> per-recording list.
latent_embedding_statistics = get_latent_embedding_statistics(analyses)

In [None]:
def get_syllable_frequencies(analyses: analyses_t):
    """Turn the long-format ``stats_df`` into one frequency column per syllable.

    ``analyses["stats_df"]`` is pivoted to a name x syllable table of
    ``frequency`` values, with missing combinations filled with 0.

    Returns
    -------
    dict
        ``syllable_frequency_{i}`` -> list of frequencies, where ``i`` is the
        position of the column in the pivoted table (not necessarily the
        syllable label itself). Rows follow the pivot's ``name`` index order --
        NOTE(review): other feature functions follow ``results`` iteration
        order; confirm the two orders agree before merging positionally.
    """
    wide = analyses["stats_df"].pivot(index="name", columns="syllable", values="frequency")
    matrix = wide.fillna(0).to_numpy()

    columns = {}
    for position in range(matrix.shape[1]):
        columns[f"syllable_frequency_{position}"] = list(matrix[:, position])
    return columns

In [None]:
# One column of per-name frequencies for each syllable column of stats_df.
syllable_frequencies = get_syllable_frequencies(analyses)

In [None]:
def get_transition_mats(analyses: analyses_t, *, min_frequency: float=0.005, normalize="bigram", enable_visualization=False):
    """Flatten keypoint-moseq transition matrices into one column per (i, j).

    The matrices come from ``kpms.generate_transition_matrices`` for the
    project/model returned by ``_read_project_info``; ``analyses`` itself is
    not read.

    Parameters
    ----------
    analyses
        Unused; kept so all feature functions share the same first argument.
    min_frequency, normalize
        Forwarded unchanged to ``kpms.generate_transition_matrices``
        (``normalize`` is also forwarded to the plotting call).
    enable_visualization
        When true, also call ``kpms.visualize_transition_bigram`` with
        ``save_dir=FIGURE_DIR``.

    Returns
    -------
    dict
        ``transition_mat_{i}_{j}`` -> list with one value per matrix returned
        by keypoint-moseq, in the order they were returned. ``i`` and ``j``
        are positions within the matrices.
    """
    project_dir, model_name = _read_project_info("project_dir", "model_name")

    matrices, usages, groups, syll_include = kpms.generate_transition_matrices(
        project_dir, model_name, normalize=normalize, min_frequency=min_frequency
    )

    if enable_visualization:
        # Plotting receives the matrices exactly as keypoint-moseq returned them.
        kpms.visualize_transition_bigram(
            project_dir,
            model_name,
            groups,
            matrices,
            syll_include,
            normalize=normalize,
            show_syllable_names=False,
            save_dir=FIGURE_DIR
        )

    stacked = np.stack(matrices)
    size = len(stacked[0])

    flattened = {}
    for row in range(size):
        for col in range(size):
            flattened[f"transition_mat_{row}_{col}"] = list(stacked[:, row, col])
    return flattened

In [None]:
%%capture

# normalize="row" overrides the function's default of "bigram" and is passed
# straight through to keypoint-moseq. The %%capture magic on the first line of
# this cell captures the cell's output instead of displaying it.
transition_mats = get_transition_mats(analyses, normalize="row")

In [None]:
def get_syllable_shannon_entropy(analyses: analyses_t):
    """Shannon entropy of each row of the name x syllable frequency table.

    ``analyses["stats_df"]`` is pivoted to name x syllable ``frequency``
    values (missing combinations become 0) and ``scipy.stats.entropy`` is
    applied to every row with its default arguments.

    Returns
    -------
    dict
        ``{"syllable_shannon_entropy": [...]}`` with one value per row of the
        pivoted table, in the pivot's ``name`` index order.
    """
    wide = analyses["stats_df"].pivot(index="name", columns="syllable", values="frequency")
    matrix = wide.fillna(0).to_numpy()

    row_entropies = []
    for row_idx in tqdm(range(len(matrix)), desc="computing get_syllable_shannon_entropy"):
        row_entropies.append(entropy(matrix[row_idx, :]))
    return {"syllable_shannon_entropy": row_entropies}

In [None]:
# Entropy of each name's syllable-frequency row from stats_df.
syllable_shannon_entropy = get_syllable_shannon_entropy(analyses)

In [None]:
def get_num_distinct_syllables(analyses: analyses_t, *, ths: Sequence[float]):
    """Count, per recording, the syllables used more often than each threshold.

    For every entry of ``analyses["results"]`` the relative frequency of each
    syllable label is computed with ``np.bincount`` over ``info["syllable"]``
    divided by the sequence length.

    Parameters
    ----------
    ths
        Frequency thresholds; a syllable counts when its frequency is strictly
        greater than the threshold.

    Returns
    -------
    dict
        ``num_distinct_syllables_th_{th}`` -> list of ints, one per entry of
        ``results`` in iteration order.
    """
    usage_per_recording = []
    for info in analyses["results"].values():
        labels = info["syllable"]
        usage_per_recording.append(np.bincount(labels) / len(labels))

    counts = {}
    for th in ths:
        counts[f"num_distinct_syllables_th_{th}"] = [
            int((usage > th).sum()) for usage in usage_per_recording
        ]
    return counts

In [None]:
# Thresholds are relative frequencies: count syllables used in more than
# 0.5%, 2% and 5% of a recording's frames.
num_distinct_syllables = get_num_distinct_syllables(analyses, ths=[0.005, 0.02, 0.05])

In [None]:
from functools import reduce
import operator

def merge_features(analyses: analyses_t, features: Sequence[Dict[str, Sequence[Any]]]):
    """Combine several feature dicts into one DataFrame.

    Parameters
    ----------
    analyses
        Only ``analyses["results"]`` is read; its keys become the ``"name"``
        column, in iteration order.
    features
        Any sequence (list, tuple, ...) of ``column name -> values`` dicts.
        They are merged in the given order, so when two dicts share a key the
        later one wins.

    Returns
    -------
    pd.DataFrame
        One column per merged key, ``"name"`` first. Rows are matched purely
        by position, so every feature list must follow the same order as
        ``results`` and have the same length (``pd.DataFrame`` raises
        otherwise).
    """
    merged_features: Dict[str, Sequence[Any]] = {"name": list(analyses["results"].keys())}
    # In-place update accepts any sequence of dicts and avoids re-copying the
    # accumulated dict for every feature group.
    for feature_group in features:
        merged_features.update(feature_group)
    return pd.DataFrame(merged_features)

In [None]:
# Merge all feature dicts into one table with a leading "name" column. The
# merge is positional, so each dict's lists must share the same row order.
features_df = merge_features(analyses, [syllable_lz_output, latent_embedding_statistics, syllable_frequencies, transition_mats, syllable_shannon_entropy, num_distinct_syllables])

In [None]:
# Display the merged column names as a quick sanity check.
features_df.columns