In [1]:
import pandas as pd
import numpy as np
import ast
import time
from pathlib import Path
from scipy.fftpack import fft, fftfreq
from scipy.signal import spectrogram
import cv2, base64, io, warnings
from PIL import Image
from ultralytics import YOLO

In [2]:
# Task 1 – Build `trans_audio_features` from stg_audio_data.csv
# ------------------------------------------------------------------
def compute_fft(samples, sampling_rate=8000):
    """Return frequencies and magnitudes for the given audio sample list."""
    samples = np.asarray(samples, dtype=float)
    n = len(samples)
    freqs = np.fft.rfftfreq(n, d=1 / sampling_rate)
    fft_values = np.abs(np.fft.rfft(samples))
    return { "freqs": freqs, "magnitudes": fft_values }

def classify_voice_or_noise(freqs, magnitudes,
                            voice_freq_range=(500, 3500),
                            energy_threshold=1.5e7):
    """Very simple energy‑based binary classification."""
    # total energy in voice band
    band = (freqs >= voice_freq_range[0]) & (freqs <= voice_freq_range[1])
    energy = magnitudes[band].sum()
    return "Voice" if energy > energy_threshold else "Noise"

def detect_cat_voice(classification, freqs, magnitudes,
                     freq_range=(400, 700), harmonic_range=(200, 1000),
                     harmonic_threshold=15):
    """Heuristic cat meow detector based on dominant frequency and harmonic energy."""
    if classification != "Voice":
        return False
    # strongest peak
    dom_freq = freqs[np.argmax(magnitudes)]
    if not (freq_range[0] <= dom_freq <= freq_range[1]):
        return False
    # harmonic energy
    harm_band = (freqs >= harmonic_range[0]) & (freqs <= harmonic_range[1])
    if magnitudes[harm_band].mean() < harmonic_threshold:
        return False
    return True

def detect_human_voice(classification, freqs, magnitudes,
                       freq_range=(150, 600), harmonic_range=(2000, 4000),
                       harmonic_threshold=15):
    """Very coarse human‑speech detector (adult male/female fundamental)."""
    if classification != "Voice":
        return False
    dom_freq = freqs[np.argmax(magnitudes)]
    return freq_range[0] <= dom_freq <= freq_range[1]

def calculate_meow_loudness(is_cat_voice, magnitudes):
    return magnitudes.max() if is_cat_voice else 0.0

def calculate_dominant_frequency(freqs, magnitudes):
    return freqs[np.argmax(magnitudes)]

def build_trans_audio_features(csv_path: str = "stg_audio_data.csv"):
    stg_audio_data = pd.read_csv(csv_path)
    # audio_samples stored as stringified Python lists -> convert
    stg_audio_data["audio_samples"] = stg_audio_data["audio_samples"].apply(ast.literal_eval)

    results = []
    for _, row in stg_audio_data.iterrows():
        fft_res       = compute_fft(row["audio_samples"])
        classification = classify_voice_or_noise(fft_res["freqs"], fft_res["magnitudes"])
        is_cat        = detect_cat_voice(classification, fft_res["freqs"], fft_res["magnitudes"])
        is_human      = detect_human_voice(classification, fft_res["freqs"], fft_res["magnitudes"])
        meow_loudness = calculate_meow_loudness(is_cat, fft_res["magnitudes"])
        dom_freq      = calculate_dominant_frequency(fft_res["freqs"], fft_res["magnitudes"])
        results.append({
            "frame_id"        : row["frame_id"],
            "timestamp"       : row["timestamp"],
            "classification"  : classification,
            "is_cat_voice"    : is_cat,
            "is_human_voice"  : is_human,
            "meow_loudness"   : meow_loudness,
            "dominant_frequency": dom_freq,
        })
    return pd.DataFrame(results)

trans_audio_features = build_trans_audio_features()

In [3]:
# Task 2 – Build `trans_imu_features` from stg_imu_data.csv
# ------------------------------------------------------------------
def unwrap_yaw(yaw_list):
    arr = np.asarray(yaw_list, dtype=float)
    return np.degrees(np.unwrap(np.radians(arr)))

def avg_intra_yaw_diff(yaw_list):
    unwrapped = unwrap_yaw(yaw_list)
    return np.mean(np.diff(unwrapped)) if len(unwrapped) > 1 else 0.0

def compute_rotation_speed(current_yaw_list, prev_avg_yaw):
    """Return rotation speed (deg/frame), current average yaw and delta yaw."""
    current_avg_yaw = avg_intra_yaw_diff(current_yaw_list)
    if prev_avg_yaw is None:
        return 0.0, current_avg_yaw, 0.0
    delta_yaw = current_avg_yaw - prev_avg_yaw
    rotation_speed = abs(delta_yaw)
    return rotation_speed, current_avg_yaw, delta_yaw

def compute_movement_intensity(delta_yaw, delta_pitch, delta_roll):
    return np.sqrt(delta_yaw**2 + delta_pitch**2 + delta_roll**2)

def compute_balance_state(pitch, roll, movement_intensity,
                          pitch_thr=15, roll_thr=15, move_thr=10):
    """Simple heuristic for balance state."""
    if movement_intensity > move_thr:
        return "moving"
    if abs(pitch) < pitch_thr and abs(roll) < roll_thr:
        return "balanced"
    return "unbalanced"

def compute_cat_interaction(movement_intensity, move_thr=20):
    return movement_intensity > move_thr

def process_imu_live(imu_df):
    state = { "prev_avg_yaw": None, "prev_pitch": None, "prev_roll": None }
    rows = []
    for _, row in imu_df.iterrows():
        rotation_speed, current_avg_yaw, delta_yaw = compute_rotation_speed(row["yaw"], state["prev_avg_yaw"])
        pitch = row["pitch"]
        roll  = row["roll"]
        delta_pitch = 0.0 if state["prev_pitch"] is None else pitch - state["prev_pitch"]
        delta_roll  = 0.0 if state["prev_roll"]  is None else roll  - state["prev_roll"]

        movement_intensity = compute_movement_intensity(delta_yaw, delta_pitch, delta_roll)
        balance_state      = compute_balance_state(pitch, roll, movement_intensity)
        cat_interaction    = compute_cat_interaction(movement_intensity)

        rows.append({
            "frame_id"           : row["frame_id"],
            "timestamp"          : row["timestamp"],
            "rotation_speed"     : rotation_speed,
            "delta_yaw"          : delta_yaw,
            "delta_pitch"        : delta_pitch,
            "delta_roll"         : delta_roll,
            "movement_intensity" : movement_intensity,
            "balance_state"      : balance_state,
            "cat_interaction_detected": cat_interaction
        })

        state["prev_avg_yaw"] = current_avg_yaw
        state["prev_pitch"]   = pitch
        state["prev_roll"]    = roll

    return pd.DataFrame(rows)

def build_trans_imu_features(csv_path: str = "stg_imu_data.csv"):
    stg_imu_data = pd.read_csv(csv_path)
    # Convert string lists to Python lists
    for col in ["yaw", "pitch", "roll"]:
        stg_imu_data[col] = stg_imu_data[col].apply(ast.literal_eval)
    # pitch & roll scalar (take first sample)
    stg_imu_data["pitch"] = stg_imu_data["pitch"].apply(lambda x: x[0] if isinstance(x, list) else x)
    stg_imu_data["roll"]  = stg_imu_data["roll"].apply(lambda x: x[0] if isinstance(x, list) else x)
    # yaw remain list
    return process_imu_live(stg_imu_data)

trans_imu_features = build_trans_imu_features()

In [4]:
# Task 3 – Build `trans_visual_cat_detection` from stg_visual_data.csv
# ------------------------------------------------------------------
IMG_SIZE = 640
CONF_THR = 0.05
DEVICE   = "cpu"
CAT_ID   = 15

# initialise YOLO model (cat‑only)
_yolo_model = YOLO("yolov8n.pt").to(DEVICE)
_yolo_model.fuse()
_yolo_model.overrides["conf"]    = CONF_THR
_yolo_model.overrides["classes"] = [CAT_ID]

import base64, io

def jpeg_b64_to_rgb_ndarray(b64: str, img_size: int = IMG_SIZE):
    with Image.open(io.BytesIO(base64.b64decode(b64))) as im:
        im = im.convert("RGB").resize((img_size, img_size), Image.LANCZOS)
        return np.asarray(im, dtype=np.uint8)

def detect_cat(df: pd.DataFrame, img_size: int = IMG_SIZE) -> pd.DataFrame:
    rows = []
    for _, r in df.iterrows():
        rgb = jpeg_b64_to_rgb_ndarray(r["frame_data"], img_size)
        res = _yolo_model(rgb, imgsz=img_size, verbose=False)[0]
        boxes = res.boxes.cpu()
        det_pd = pd.DataFrame({
            "xmin"      : boxes.xyxy[:, 0].numpy(),
            "ymin"      : boxes.xyxy[:, 1].numpy(),
            "xmax"      : boxes.xyxy[:, 2].numpy(),
            "ymax"      : boxes.xyxy[:, 3].numpy(),
            "confidence": boxes.conf.numpy(),
            "class"     : boxes.cls.numpy().astype(int),
            "name"      : ["cat"]*len(boxes),
        })
        cats = det_pd
        rows.append({
            "frame_id"        : int(r["frame_id"]),
            "timestamp"       : r["timestamp"],
            "is_cat_detected" : int(len(cats) > 0),
            "cat_confidence"  : float(cats["confidence"].max()) if len(cats) else 0.0,
            "raw_detection"   : det_pd.to_dict("records"),
        })
    return pd.DataFrame(rows)

def build_trans_visual_cat_detection(csv_path: str = "stg_visual_data.csv"):
    stg_visual_data = pd.read_csv(csv_path, converters={"frame_data": str})
    return detect_cat(stg_visual_data, IMG_SIZE)

trans_visual_cat_detection = build_trans_visual_cat_detection()

YOLOv8n summary (fused): 72 layers, 3,151,904 parameters, 0 gradients, 8.7 GFLOPs


In [5]:
# Task 4 – Build `mrt_experiences` from the transformed tables
# ------------------------------------------------------------------
from collections import deque

def _centroid(det_entry):
    """Return centroid x,y for detection dict (expects xmin,xmax,ymin,ymax)."""
    return (det_entry["xmin"] + det_entry["xmax"]) / 2, (det_entry["ymin"] + det_entry["ymax"]) / 2

def build_mrt_experiences(aud_df: pd.DataFrame,
                          imu_df: pd.DataFrame,
                          vis_df: pd.DataFrame,
                          n_frames: int = 12):
    rows = []
    for fid in sorted(vis_df["frame_id"].unique()):
        aud_window = aud_df[aud_df["frame_id"] <= fid].tail(n_frames)
        imu_window = imu_df[imu_df["frame_id"] <= fid].tail(n_frames)
        vis_window = vis_df[vis_df["frame_id"] <= fid].tail(n_frames)

        if len(vis_window) < n_frames:
            rows.append({
                "experience_id": fid,
                "timestamp": np.nan,
                "is_cat_voice": np.nan,
                "is_human_voice": np.nan,
                "human_voice_sequence": np.nan,
                "cat_voice_sequence": np.nan,
                "meow_loudness": np.nan,
                "cat_detected": np.nan,
                "cat_position_x": np.nan,
                "cat_position_y": np.nan,
                "cat_movement_direction": np.nan,
                "cat_activity_level": np.nan,
                "cat_distance_change": np.nan,
                "movement_intensity": np.nan,
                "cat_interaction_detected": np.nan,
            })
            continue

        # Audio aggregates
        aud_is_cat   = aud_window["is_cat_voice"].fillna(False)
        aud_is_human = aud_window["is_human_voice"].fillna(False)
        human_seq    = aud_window.loc[aud_is_human, "frame_id"].tolist()
        cat_seq      = aud_window.loc[aud_is_cat,   "frame_id"].tolist()
        meow_loudness= aud_window["meow_loudness"].mode().iloc[0] if not aud_window["meow_loudness"].empty else np.nan

        # IMU aggregates
        movement_intensity = imu_window["movement_intensity"].mean()
        cat_interaction    = imu_window["cat_interaction_detected"].any()

        # Vision aggregates – use last row
        vis_last = vis_window.iloc[-1]
        cat_detected = bool(vis_last["is_cat_detected"])
        cat_x = cat_y = np.nan
        if cat_detected and vis_last["raw_detection"]:
            for det in vis_last["raw_detection"]:
                if det.get("name") == "cat":
                    cat_x, cat_y = _centroid(det)
                    break

        # Cat motion between last two frames
        cat_movement_direction = np.nan
        cat_activity_level = np.nan
        cat_distance_change = np.nan
        if len(vis_window) >= 2:
            prev = vis_window.iloc[-2]
            if prev["raw_detection"] and vis_last["raw_detection"]:
                prev_det = next((d for d in prev["raw_detection"] if d.get("name") == "cat"), None)
                curr_det = next((d for d in vis_last["raw_detection"] if d.get("name") == "cat"), None)
                if prev_det and curr_det:
                    prev_x, prev_y = _centroid(prev_det)
                    dx = cat_x - prev_x
                    dy = cat_y - prev_y
                    if abs(dx) > abs(dy):
                        cat_movement_direction = "right" if dx > 0 else "left"
                    else:
                        cat_movement_direction = "down" if dy > 0 else "up"
                    cat_activity_level = "moving" if max(abs(dx), abs(dy)) > 3 else "still"
                    if dy < -2:
                        cat_distance_change = "closer"
                    elif dy > 2:
                        cat_distance_change = "farther"
                    else:
                        cat_distance_change = "no_change"

        rows.append({
            "experience_id"           : fid,
            "timestamp"               : vis_last.get("timestamp", np.nan),
            "is_cat_voice"            : bool(aud_is_cat.any()),
            "is_human_voice"          : bool(aud_is_human.any()),
            "human_voice_sequence"    : human_seq,
            "cat_voice_sequence"      : cat_seq,
            "meow_loudness"           : meow_loudness,
            "cat_detected"            : cat_detected,
            "cat_position_x"          : cat_x,
            "cat_position_y"          : cat_y,
            "cat_movement_direction"  : cat_movement_direction,
            "cat_activity_level"      : cat_activity_level,
            "cat_distance_change"     : cat_distance_change,
            "movement_intensity"      : movement_intensity,
            "cat_interaction_detected": cat_interaction
        })
    return pd.DataFrame(rows)

mrt_experiences = build_mrt_experiences(trans_audio_features,
                                        trans_imu_features,
                                        trans_visual_cat_detection)

In [6]:
mrt_experiences

Unnamed: 0,experience_id,timestamp,is_cat_voice,is_human_voice,human_voice_sequence,cat_voice_sequence,meow_loudness,cat_detected,cat_position_x,cat_position_y,cat_movement_direction,cat_activity_level,cat_distance_change,movement_intensity,cat_interaction_detected
0,0,,,,,,,,,,,,,,
1,1,,,,,,,,,,,,,,
2,2,,,,,,,,,,,,,,
3,3,,,,,,,,,,,,,,
4,4,,,,,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
77,77,2025-07-23 11:20:51.981008,True,True,"[72, 73, 74]",[73],0.0,False,,,,,,1.550708,False
78,78,2025-07-23 11:20:52.195276,True,True,"[72, 73, 74]",[73],0.0,False,,,,,,1.391877,False
79,79,2025-07-23 11:20:52.417412,True,True,"[72, 73, 74]",[73],0.0,False,,,,,,1.347606,False
80,80,2025-07-23 11:20:52.656378,True,True,"[72, 73, 74]",[73],0.0,False,,,,,,1.241126,False
