In [None]:
# Load IPython's autoreload extension. Mode 2 reloads imported modules before
# each cell executes, so edits to the project package are picked up without
# restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import io
import datetime
from os import path, makedirs

import matplotlib.pyplot as plt
import contextily as cx
from PIL import Image, ImageDraw, ImageOps

from fit2png.common import SEMICIRCLES_TO_DEGREES
from fit2png.utils import read_fit, render_hud, computer_vision_hud

## Configurations

In [None]:
# --- Tunables ---------------------------------------------------------------
MAX_TAILS = 30              # how many earlier records are drawn behind the minimap head marker
MAX_HR = 172                # passed to render_hud(); presumably the rider's max heart rate -- TODO confirm
GEOPY_CALL_INTERVAL = 1     # forwarded as render_hud(geopy_call_interval=...); units not visible here
HUD_WAIT_FOR_GEOPY = True   # forwarded as render_hud(wait_for_geopy=...)

# --- Inputs -----------------------------------------------------------------
FIT_FILENAME = 'input/20260223214525.fit'
VIDEO_PATHS = ["/path/to/video.mp4"]
MODEL_BBOX = "models/yolo26x.pt"
MODEL_SEG = "models/yolo26x-seg.pt"

# --- Outputs ----------------------------------------------------------------
# Everything is written below rendered/<fit file name without extension>/.
fit_id = path.splitext(path.basename(FIT_FILENAME))[0]
_render_root = path.join('rendered', fit_id)

HUD_OUTDIR = path.join(_render_root, 'hud')
HUD_REDACTED_OUTDIR = path.join(_render_root, 'hud_redacted')
MINIMAP_OUTDIR = path.join(_render_root, 'minimap')
BBOX_OUTDIR = path.join(_render_root, 'bbox')
SEG_OUTDIR = path.join(_render_root, 'seg')
LABEL_OUTDIR = path.join(_render_root, 'label')
AUDIOMETER_OUTDIR = path.join(_render_root, 'audiometer')

# --- Misc -------------------------------------------------------------------
GOLDEN_RATIO_CONJUGATE = 0.618033988749895

## Parse FIT file

In [None]:
# Parse the FIT file with the project helper. Later cells read
# data['record_mesgs'] from the result.
data = read_fit(FIT_FILENAME)
# Print the first file_id message as a quick check of what was loaded.
print(data['file_id_mesgs'][0])

## HUD redacted

In [None]:
# makedirs(HUD_REDACTED_OUTDIR, exist_ok=True)
# render_hud(data, MAX_HR, HUD_REDACTED_OUTDIR, geopy_call_interval=GEOPY_CALL_INTERVAL, enforce_privacy=True)

## HUD without redaction

In [None]:
# Make sure the output folder exists, then call the project's render_hud helper
# with privacy enforcement switched off (enforce_privacy=False).
# NOTE(review): render_hud presumably writes its frames into HUD_OUTDIR -- confirm in fit2png.utils.
makedirs(HUD_OUTDIR, exist_ok=True)
render_hud(data, MAX_HR, HUD_OUTDIR, geopy_call_interval=GEOPY_CALL_INTERVAL, wait_for_geopy=HUD_WAIT_FOR_GEOPY, enforce_privacy=False)

## Minimap

In [None]:
def apply_levels(image, black_point, gamma, white_point):
    """Apply a GIMP-like levels adjustment to an 8-bit-per-band PIL image.

    Each input level is clamped to [black_point, white_point], rescaled to
    0..1, raised to the power 1/gamma and scaled back to 0..255 (truncated to
    an integer).

    Args:
        image: PIL image. For 'RGBA' and 'LA' the alpha band is left untouched;
            for any other mode every band receives the same curve.
        black_point: Input level mapped to 0.
        gamma: Mid-tone exponent; must be > 0. Values below 1 darken mid-tones.
        white_point: Input level mapped to 255; must be > black_point.

    Returns:
        A new image of the same mode with the curve applied.

    Raises:
        ValueError: If white_point <= black_point or gamma <= 0 (these inputs
            would otherwise divide by zero or produce a meaningless curve).
    """
    if white_point <= black_point:
        raise ValueError(
            f"white_point ({white_point}) must be greater than black_point ({black_point})"
        )
    if gamma <= 0:
        raise ValueError(f"gamma must be positive, got {gamma}")

    span = white_point - black_point
    exponent = 1.0 / gamma

    # 256-entry lookup table describing the curve for a single band.
    lut = [
        int(pow((min(max(level, black_point), white_point) - black_point) / span, exponent) * 255)
        for level in range(256)
    ]

    # Modes with a trailing alpha band: adjust the colour bands only.
    if image.mode in ('RGBA', 'LA'):
        *colour_bands, alpha = image.split()
        adjusted = [band.point(lut) for band in colour_bands]
        return Image.merge(image.mode, (*adjusted, alpha))

    # Image.point() expects 256 entries *per band*, so a bare 256-entry table
    # is rejected for multi-band modes such as 'RGB'. Repeat it for each band.
    return image.point(lut * len(image.getbands()))

# Render one minimap PNG per second of activity into MINIMAP_OUTDIR.
# Each frame shows the current position (red head marker) plus up to MAX_TAILS
# earlier positions (blue trail markers) on top of an OSM + satellite basemap.

MINIMAP_MARGIN_DEG = 0.001  # half-width of the view in degrees; smaller = tighter crop and fewer tiles
MINIMAP_ZOOM = 19           # tile zoom level; higher = more detailed tiles
MINIMAP_TZ = datetime.timezone(datetime.timedelta(hours=8))

records = data['record_mesgs']
diff_time = 0
frame_counter = 0
fig, ax = plt.subplots(figsize=(5, 5), frameon=False)

makedirs(MINIMAP_OUTDIR, exist_ok=True)

try:
    for current_coord_index, current_x in enumerate(records):
        ax.clear()
        ax.set_axis_off()

        # Collect (record index, longitude, latitude) in degrees for the tail
        # and the head, skipping records without a GPS fix.
        trail = []
        for i in range(max(0, current_coord_index - MAX_TAILS), current_coord_index + 1):
            x = records[i]
            pos_lat = x.get('position_lat', None)
            pos_long = x.get('position_long', None)
            if pos_lat is None or pos_long is None:
                continue
            trail.append((i, pos_long * SEMICIRCLES_TO_DEGREES, pos_lat * SEMICIRCLES_TO_DEGREES))

        if trail:
            # Centre the view on the most recent positioned record and add the
            # basemaps once. Setting the limits and adding both basemaps for
            # every tail point ended with the same view (the last positioned
            # record wins and its opaque OSM layer covers the earlier ones),
            # at many times the cost per frame.
            center_long, center_lat = trail[-1][1:]
            ax.set_xlim(center_long - MINIMAP_MARGIN_DEG, center_long + MINIMAP_MARGIN_DEG)
            ax.set_ylim(center_lat - MINIMAP_MARGIN_DEG, center_lat + MINIMAP_MARGIN_DEG)

            # Street map as the base layer.
            cx.add_basemap(ax,
                           crs='EPSG:4326',
                           source=cx.providers.OpenStreetMap.Mapnik,
                           zoom=MINIMAP_ZOOM,
                           attribution="")

            # Faint satellite imagery blended on top.
            cx.add_basemap(ax,
                           crs='EPSG:4326',
                           source=cx.providers.Esri.WorldImagery,
                           zoom=MINIMAP_ZOOM,
                           alpha=0.1,
                           attribution="")

            # Fix copyright (attribution) text color, if any text was added.
            if ax.texts:
                attribution_text = ax.texts[-1]
                attribution_text.set_color('black')

            for i, pos_long, pos_lat in trail:
                if i < current_coord_index:
                    # Trail: markers shrink with age. Clamp at 0 so the oldest
                    # point never receives a negative marker size.
                    trail_size = max(0.0, 15 - (((current_coord_index + 1) - i) * 0.5))
                    ax.plot(pos_long, pos_lat, 'bo', markersize=trail_size, markeredgecolor='white')
                else:
                    # Head
                    ax.plot(pos_long, pos_lat, 'ro', markersize=15, markeredgecolor='white')

        # 1. Convert Figure to PIL Image
        buf = io.BytesIO()
        # Use bbox_inches='tight', pad_inches=0 to avoid extra white space
        fig.savefig(buf, format='png', transparent=True, bbox_inches='tight', pad_inches=0)
        buf.seek(0)
        img = Image.open(buf).convert("RGBA")

        # Apply GIMP Levels: (low, gamma, high)
        img = apply_levels(img, 146, 0.5, 255)

        # 2. Create the alpha mask. It is currently a full rectangle (every
        #    pixel opaque); use the ellipse line instead for a circular minimap.
        mask = Image.new('L', img.size, 0)
        draw = ImageDraw.Draw(mask)
        # draw.ellipse((0, 0) + img.size, fill=255)
        draw.rectangle((0, 0, img.size[0], img.size[1]), fill=255)

        # 3. Apply the mask
        output = ImageOps.fit(img, mask.size, centering=(0.5, 0.5))
        output.putalpha(mask)

        # # 4. Draw a medium thick circle border
        # draw_output = ImageDraw.Draw(output)
        # border_width = 2
        # draw_output.ellipse(
        #         (0, 0, output.size[0], output.size[1]),
        #         outline="black",
        #         width=border_width
        #     )

        # 5. Save the processed image
        output.save(path.join(MINIMAP_OUTDIR, f'{frame_counter:05d}.png'))
        frame_counter += 1

        # Seconds until the next record (0 for the last record).
        diff_time = 0
        if current_coord_index + 1 < len(records):
            current_timestamp = current_x['timestamp'].astimezone(MINIMAP_TZ)
            upcoming_time = records[current_coord_index + 1]['timestamp'].astimezone(MINIMAP_TZ)
            diff_time = (upcoming_time - current_timestamp).total_seconds()

        if diff_time > 1:
            # Pad idle seconds by repeating the frame, for easier video editing.
            for _pad in range(1, int(diff_time)):
                output.save(path.join(MINIMAP_OUTDIR, f'{frame_counter:05d}.png'))
                frame_counter += 1
finally:
    # The figure is shared by every frame, so close it once, after the loop.
    plt.close(fig)

# Computer Vision HUD

In [None]:
# Create the three output folders before handing them to the project helper.
makedirs(BBOX_OUTDIR, exist_ok=True)
makedirs(SEG_OUTDIR, exist_ok=True)
makedirs(LABEL_OUTDIR, exist_ok=True)

# NOTE(review): presumably runs the bbox and segmentation models over every video in
# VIDEO_PATHS and writes overlays into the three folders -- confirm in fit2png.utils.
computer_vision_hud(VIDEO_PATHS, MODEL_BBOX, MODEL_SEG, BBOX_OUTDIR, SEG_OUTDIR, LABEL_OUTDIR)

# Audio Loudness Meter HUD

In [None]:
from pathlib import Path
import numpy as np
from matplotlib import font_manager as fm
import cv2
import av

# dBFS range displayed by the meter: per-frame levels are clipped to
# [FLOOR_DB, CEIL_DB] before being drawn, and the X axis spans the same range.
FLOOR_DB = -36.0
CEIL_DB = 3.0
# Make sure the folder for the meter frames exists.
makedirs(AUDIOMETER_OUTDIR, exist_ok=True)

def rms_dbfs(samples: np.ndarray, eps: float = 1e-12) -> float:
    """Return the RMS level of `samples` in dBFS.

    The input is converted to float32. An empty input yields -inf. `eps` is
    added both under the square root and inside the logarithm so silent input
    produces a large negative finite value instead of -inf.
    """
    pcm = np.asarray(samples, dtype=np.float32)
    if not pcm.size:
        return float("-inf")

    mean_square = np.mean(np.square(pcm))
    rms = np.sqrt(mean_square + eps)
    return 20.0 * np.log10(rms + eps)

def frame_loudness_lr(audio_lr: np.ndarray, i0: int, i1: int) -> tuple[float, float] | tuple[float, None]:
    """
    audio_lr shape: (channels, samples)
    Returns (L_dBFS, R_dBFS) when 2+ channels, else (mono_dBFS, None)
    """
    chunk = audio_lr[:, i0:i1]
    if chunk.shape[1] == 0:
        return float("-inf"), (float("-inf") if audio_lr.shape[0] >= 2 else None)

    if audio_lr.shape[0] >= 2:
        l = rms_dbfs(chunk[0])
        r = rms_dbfs(chunk[1])
        return l, r
    else:
        mono = rms_dbfs(chunk[0])
        return mono, None

def peak_dbfs(samples: np.ndarray, eps: float = 1e-12) -> float:
    """Return the sample-peak level of `samples` in dBFS (not RMS).

    0 dBFS corresponds to abs(sample) == 1.0. The input is converted to
    float32; an empty input yields -inf, and the peak is floored at `eps`
    so silent input gives a finite value.
    """
    pcm = np.asarray(samples, dtype=np.float32)
    if pcm.size == 0:
        return float("-inf")

    loudest = float(np.abs(pcm).max())
    floored = max(loudest, eps)
    return 20.0 * np.log10(floored)

def frame_volume_lr_peak(audio_lr: np.ndarray, i0: int, i1: int) -> tuple[float, float] | tuple[float, None]:
    """
    audio_lr shape: (channels, samples)
    Returns (L_peak_dBFS, R_peak_dBFS) when 2+ channels, else (mono_peak_dBFS, None)
    """
    chunk = audio_lr[:, i0:i1]
    if chunk.shape[1] == 0:
        return float("-inf"), (float("-inf") if audio_lr.shape[0] >= 2 else None)

    if audio_lr.shape[0] >= 2:
        l = peak_dbfs(chunk[0])
        r = peak_dbfs(chunk[1])
        return l, r
    else:
        mono = peak_dbfs(chunk[0])
        return mono, None

# Render one transparent loudness-meter PNG per video frame into
# AUDIOMETER_OUTDIR. Output files are numbered continuously across all videos
# in VIDEO_PATHS; the audio window of each frame is taken from that frame's
# own video.

def _frame_to_float_channels(audio_frame) -> np.ndarray:
    """Convert one PyAV audio frame to float32 PCM shaped (channels, samples).

    Packed (interleaved) frames come back from to_ndarray() as a single row of
    samples * channels values and are de-interleaved here. Integer PCM is
    scaled so that full scale is +/-1.0, which the dBFS helpers assume.

    Raises:
        RuntimeError: If to_ndarray() returns something that is not 1-D or 2-D.
    """
    arr = audio_frame.to_ndarray()
    if arr.ndim == 1:
        arr = arr[np.newaxis, :]
    elif arr.ndim != 2:
        raise RuntimeError(f"Unexpected audio ndarray shape: {arr.shape}")

    n_samples = int(audio_frame.samples)  # samples per channel in this frame
    if arr.shape[0] == 1 and 0 < n_samples < arr.shape[1] and arr.shape[1] % n_samples == 0:
        # Packed multi-channel data: [s0c0, s0c1, s1c0, ...] -> (channels, samples)
        arr = arr.reshape(n_samples, -1).T
    elif arr.shape[1] != n_samples and arr.shape[0] == n_samples:
        # (samples, channels) -> (channels, samples)
        arr = arr.T

    if np.issubdtype(arr.dtype, np.integer):
        info = np.iinfo(arr.dtype)
        if info.min == 0:
            # Unsigned PCM (u8): silence sits at the midpoint of the range.
            midpoint = (info.max + 1) / 2.0
            return (arr.astype(np.float32) - midpoint) / midpoint
        # Signed PCM (s16/s32): divide by the magnitude of the most negative value.
        return arr.astype(np.float32) / float(info.max + 1)

    return arr.astype(np.float32, copy=False)


# Meter styling shared by every video.
font_size = 10
font_path = Path("~/.local/share/fonts/google/roboto_mono/RobotoMono-VariableFont_wght.ttf").expanduser()
if font_path.is_file():
    mono_fp = fm.FontProperties(fname=font_path, size=font_size)
else:
    # Fall back to whatever monospace font matplotlib can find.
    mono_fp = fm.FontProperties(family="monospace", size=font_size)

xmin = 10 ** (FLOOR_DB / 20.0)  # amplitude at FLOOR_DB
xmax = 10 ** (CEIL_DB / 20.0)   # amplitude at CEIL_DB

frame_idx = 0  # output frame number; keeps counting across videos
for video_path in VIDEO_PATHS:
    # Video via OpenCV (only used to count frames and read the FPS)
    cap = cv2.VideoCapture(video_path)
    container = None
    fig = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            raise RuntimeError(f"Could not read FPS from video: {video_path}")
        frame_duration = 1.0 / fps

        # Audio via PyAV (demux/decode)
        container = av.open(video_path)

        audio_stream = next((s for s in container.streams if s.type == "audio"), None)
        if audio_stream is None:
            raise RuntimeError("No audio stream found in container")

        # Define sample rate ONCE, from the stream
        if getattr(audio_stream, "rate", None) is None:
            raise RuntimeError("Audio stream has no sample rate (audio_stream.rate is None)")
        sr = int(audio_stream.rate)

        # Decode the whole audio track into one (channels, samples) float32 array.
        audio_frames_lr: list[np.ndarray] = [
            _frame_to_float_channels(audio_frame)
            for audio_frame in container.decode(audio_stream)
        ]
        audio_lr = (
            np.concatenate(audio_frames_lr, axis=1)
            if audio_frames_lr
            else np.zeros((1, 0), dtype=np.float32)
        )

        # Initialize plot (one figure per video, updated in place for every frame)
        fig, ax = plt.subplots(figsize=(2.7, 0.8), dpi=150)

        bars = ax.barh(
            ["CH2", "CH1"],
            [0.0, 0.0],
            left=xmin,
            color="white",
            edgecolor="white",
            alpha=1.0,
            height=0.6,
        )

        ax.set_xscale("log")
        ax.set_xlim(xmin, xmax)
        # ax.set_xlabel("Amplitude", fontproperties=mono_fp)
        # ax.set_title("Loudness", fontproperties=mono_fp)

        # dB tick labels on the X axis (positions are amplitudes, labels are dB)
        ticks_db = np.array([-36, -12, 0], dtype=float)
        ax.set_xticks(10 ** (ticks_db / 20.0))
        ax.set_xticklabels([f"{int(t)} dB" for t in ticks_db])

        ax.grid(True, which="both", axis="x", alpha=0.3)

        txt = fig.text(
            0.2, 0.98, "",
            ha="left", va="top",
            color="white",
            fontproperties=mono_fp,
        )

        for label in ax.get_xticklabels() + ax.get_yticklabels():
            label.set_fontproperties(mono_fp)

        fig.subplots_adjust(top=1.20)   # leaves room for the text above the axes
        fig.tight_layout()

        # Frame number within THIS video. The audio window must be derived from
        # it, not from the global frame_idx, otherwise every video after the
        # first would index past the end of its own audio track.
        video_frame_idx = 0
        while True:
            ok, _frame_bgr = cap.read()
            if not ok:
                break

            t0 = video_frame_idx * frame_duration
            t1 = (video_frame_idx + 1) * frame_duration

            i0 = int(t0 * sr)
            i1 = int(t1 * sr)

            # l_dbfs, r_dbfs = frame_loudness_lr(audio_lr, i0, i1)
            l_dbfs, r_dbfs = frame_volume_lr_peak(audio_lr, i0, i1)

            # Plot loudness meter
            l_val = float(l_dbfs) if (l_dbfs is not None and np.isfinite(l_dbfs)) else FLOOR_DB
            r_val = float(r_dbfs) if (r_dbfs is not None and np.isfinite(r_dbfs)) else np.nan
            has_right = bool(np.isfinite(r_val))

            l_plot_db = float(np.clip(l_val, FLOOR_DB, CEIL_DB))
            r_plot_db = float(np.clip(r_val, FLOOR_DB, CEIL_DB)) if has_right else FLOOR_DB

            l_amp = 10 ** (l_plot_db / 20.0)
            r_amp = 10 ** (r_plot_db / 20.0)

            # Bars start at xmin (set when they were created); the width is the
            # offset from that baseline in amplitude units.
            bars[0].set_width(r_amp - xmin)   # CH2
            bars[1].set_width(l_amp - xmin)   # CH1

            # bars[1].set_alpha(0.25 if not np.isfinite(r_val) else 1.0)

            # Em dash placeholder when there is no usable second-channel value.
            right_text = f"{r_plot_db:5.1f} dB" if has_right else "\u2014"
            txt.set_text(f"{l_plot_db:5.1f} dB       {right_text}")

            out_png = path.join(AUDIOMETER_OUTDIR, f"{frame_idx:07d}.png")
            fig.savefig(out_png, format="png", transparent=True)

            video_frame_idx += 1
            frame_idx += 1
    finally:
        # Release everything even when one of the checks above raises.
        if fig is not None:
            plt.close(fig)
        cap.release()
        if container is not None:
            container.close()