In [None]:
import json
from pathlib import Path

p_caption_json = Path('/data/gunsbrother/prjs/ltvu/llms/Video-LLaVA/ltvu/captions/test/step2-3/0aca0078-b6ab-41fb-9dc5-a70b8ad137b2/9e5cd376-1b29-5861-8115-be750272d0a9.json')
with p_caption_json.open() as f:
    data = json.load(f)

In [None]:
data['step3']['debators'][1]['debator']['conv']['messages']

In [None]:
import re
def find_floats(s: str) -> float|tuple[float]:
    floats = re.findall(r'\d*\.?\d+', s)
    try:
        floats = list(map(float, floats))
    except ValueError as e:
        print(e)
    if len(floats) == 1:
        return floats[0]
    else:
        return floats

In [None]:
final_answer = data['step3']['debators'][0]['debator']['conv']['messages'][-1][-1]
final_prediction = find_floats(final_answer)
if len(final_prediction) == 2:
    s, e = final_prediction
else:
    pass  # keep the first prediction

# Windowing

`duration, gt_start_sec, gt_end_sec` $\rightarrow$ `window_start_sec, window_end_sec, gt_window_start_sec, gt_window_end_sec`

In [2]:
from typing import Union, Sequence
from functools import total_ordering

import numpy as np


TimestampType = Union['Timestamp', int, float]
IntervalType = Union['Interval', Sequence[TimestampType], np.ndarray[int|float]]


def ensure_timestamp_like(t):
    if not isinstance(t, (int, float, Timestamp)):
        raise TypeError(f'Unsupported type for Timestamp: {type(t).__name__}')


@total_ordering  # defines other comparison methods, may be slow
class Timestamp:
    def __init__(self,
        sec_or_idx: TimestampType,
        idxs_per_sec: float = 30,  # [ FPS ] or [ total frames(or features) / total seconds ]
    ):
        ensure_timestamp_like(sec_or_idx)
        self.idxs_per_sec = idxs_per_sec
        if isinstance(sec_or_idx, int):
            self.idx = sec_or_idx
        elif isinstance(sec_or_idx, float):
            self.sec = sec_or_idx
        elif isinstance(sec_or_idx, Timestamp):
            self.idxs_per_sec = sec_or_idx.idxs_per_sec
            self.sec = sec_or_idx.sec

    @property
    def idx(self):
        return self._idx
    @property
    def sec(self):
        return self._sec
    @idx.setter
    def idx(self, idx):
        self._idx = idx
        self._sec = idx / self.idxs_per_sec
    @sec.setter
    def sec(self, sec):
        self._sec = sec
        self._idx = int(round(sec * self.idxs_per_sec, 0))

    def __str__(self):
        return f'[{self.idx} = {self.sec:.2f}s * {self.idxs_per_sec:.2f} idxs/s]'
    def __repr__(self):
        return f'Timestamp({self.sec:.2f}s, {self.idxs_per_sec:.2f})'
    def __index__(self):
        return self.idx
    def __float__(self):
        return self.sec
    def __int__(self):
        return self.idx

    def __eq__(self, other: TimestampType):
        other = self._as_timestamp_if_valid(other)
        return self.idx == other.idx
    def __lt__(self, other: TimestampType):
        other = self._as_timestamp_if_valid(other)
        return self.idx < other.idx

    def __add__(self, other: TimestampType):
        other = self._as_timestamp_if_valid(other)
        return Timestamp(self.sec + other.sec, self.idxs_per_sec)
    def __radd__(self, other: TimestampType):
        return self.__add__(other)
    def __iadd__(self, other: TimestampType):
        other = self._as_timestamp_if_valid(other)
        self.sec += other.sec
        return self

    def __mul__(self, other: TimestampType):
        if isinstance(other, int|float):
            return Timestamp(self.sec * other, self.idxs_per_sec)
        elif isinstance(other, Timestamp):
            raise ValueError(f'Not allowed to multiply two Timestamps.')
    def __rmul__(self, other: TimestampType):
        return self.__mul__(other)
    def __imul__(self, other: TimestampType):
        if isinstance(other, int|float):
            self.sec *= other
        elif isinstance(other, Timestamp):
            raise ValueError(f'Not allowed to multiply two Timestamps.')
        return self

    def __array__(self, dtype=None):
        if dtype is None or np.issubdtype(dtype, np.floating):
            return np.asarray(self.sec)
        if dtype is not None and np.issubdtype(dtype, np.integer):
            return np.asarray(self.idx)
        raise ValueError(f'Unsupported dtype: {dtype}')

    def _as_timestamp_if_valid(self, other: TimestampType):
        other = Timestamp(other)
        self._check_fps(other)
        return other
    def _check_fps(self, other: 'Timestamp'):
        if not np.isclose(self.idxs_per_sec, other.idxs_per_sec):
            raise ValueError(f'Inconsistent fps: {self.idxs_per_sec} vs {other.idxs_per_sec}')


class Interval:
    def __init__(self,
        s: TimestampType|IntervalType,
        e: TimestampType|None = None
    ):
        if e is None:
            if isinstance(s, Sequence):
                s, e = s
            elif isinstance(s, Interval):
                s, e = s.s, s.e
        self.s = Timestamp(s)
        self.e = Timestamp(e)
        if self.s > self.e:
            raise ValueError('The start time should be less than the end time.')

    def iou(self, others: IntervalType|Sequence[IntervalType]):
        """intersection over union"""
        others = np.array(others, dtype=float)
        if others.ndim == 1:
            others = others.reshape(1, -1)
        elif others.ndim != 2:
            raise ValueError(f'Unsupported shape for others: {others.shape}')
        s = np.maximum(self.s.sec, others[:, 0])
        e = np.minimum(self.e.sec, others[:, 1])
        i = np.maximum(0, e - s)
        u = np.maximum(0, np.minimum(self.e.sec, others[:, 1]) - np.maximum(self.s.sec, others[:, 0]))

        return i / u

    def __str__(self) -> str:
        return f'[{self.s.sec:.2f}s, {self.e.sec:.2f}s]'
    def __repr__(self) -> str:
        return f'Interval({repr(self.s)}, {repr(self.e)})'

    def __matmul__(self, others: Sequence[IntervalType]):  # @ operator
        return self.iou(others)
    def __imatmul__(self, others: Sequence[IntervalType]):  # @= operator
        return self.iou(others)
    def __rmatmul__(self, others: Sequence[IntervalType]):  # @ operator
        return self.iou(others)

    def __and__(self, other: IntervalType):
        """intersection"""
        if isinstance(other, list):
            other = Interval(other)
        if self.s > other.e or self.e < other.s:
            return 0
        return Interval(max(self.s, other.s), min(self.e, other.e))
    def __or__(self, other: IntervalType):
        """union"""
        if isinstance(other, list):
            other = Interval(other[0], other[1])
        if self.s.sec > other.e.sec or self.e.sec < other.s.sec:
            return 0
        return Interval(min(self.s.sec, other.s.sec), max(self.e.sec, other.e.sec))

    def __array__(self, dtype=None):
        return np.array([self.s, self.e], dtype=dtype)
    def __len__(self):
        return 2
    def __getitem__(self, idx):
        if idx not in [0, 1]:
            raise IndexError(f'Index out of range: {idx}')
        return [self.s, self.e][idx]

    def _as_interval_if_valid(self, other: IntervalType):
        other = Interval(other)
        self._check_fps(other)
        return other
    def _check_fps(self, other: 'Interval'):
        if not np.isclose(self.s.idxs_per_sec, other.s.idxs_per_sec):
            raise ValueError(f'Inconsistent fps: {self.s.idxs_per_sec} vs {other.s.idxs_per_sec}')


from rich.console import Console
from rich.syntax import Syntax

def print_highlighted_expr_and_output(exprs):
    console = Console()
    results = []
    for expr in exprs.strip().split('\n'):
        if expr.startswith('#'):
            continue
        result = f'>>> {expr}\n{eval(expr)}'
        results.append(result)
    results = '\n\n'.join(results)
    syntax = Syntax(results, "python", theme="gruvbox-dark", line_numbers=False)
    console.print(syntax)


t1 = Timestamp(360. - 1e-7, 30)  # 360 seconds in 30 fps
exprs = """\
t1
t1.idx
np.arange(14400)[t1]
np.array(t1)
np.array([t1]*10)
np.array([t1]*10, dtype=int)
Timestamp(t1)"""
print_highlighted_expr_and_output(exprs)

t2 = Timestamp(450, 14400/480)  # 450th frame in 480 seconds
exprs = """\
t2
t2.idx
t2.sec
np.arange(14400)[t2]
t2 + 1
1 + t2
t2 * 2
t2 * 2.
t2 == 450
"""
print_highlighted_expr_and_output(exprs)

num_features = 897
duration_sec = 480
features_per_second = num_features / duration_sec

s, e = Timestamp(450, features_per_second), Timestamp(600, features_per_second)
intval0 = Interval(s, e)
intval1 = [Timestamp(500, features_per_second), Timestamp(715, features_per_second)]
intval1 = Interval(intval1)
exprs = """\
intval0
intval1
intval0.s, intval0.e
intval0.iou(intval1)
intval0 @ intval1
intval0 @ [intval1, [500, 600]]
intval0 & intval1
intval0 | intval1
np.array(intval0)
np.array([Interval(i, i+1) for i in range(10)])
"""
print_highlighted_expr_and_output(exprs)


  return i / u


In [None]:
def sample_with_window(z_vid, s_ind, e_ind, L_window=64, max_window_off_ratio=2/3, base_length=256):
    """
    z_vid: [L_feat, D], a tensor of features of a single video, L_feat may be within 897 ~ 900.
    s_ind, e_ind: feature indices within [0, 256) of the GT interval, both included.
    L_window: The window size.
    max_window_off_ratio: The max ratio w.r.t the window size of the extent that a window can get out of the GT interval.
    """
    L_feat = z_vid.shape[0]
    s_ind_orig, e_ind_orig = s_ind, e_ind
    s_ind, e_ind = int(s_ind/base_length*L_feat), int(e_ind/base_length*L_feat)
    leftmost = s_ind - int(max_window_off_ratio*L_window)
    rightmost = e_ind + int(max_window_off_ratio*L_window) - L_window + 1
    s_wind_ind = np.random.randint(leftmost, rightmost+1)
    s_wind_ind = min(L_feat - L_window, max(0, s_wind_ind))
    e_wind_ind = s_wind_ind + L_window - 1
    z_vid_resampled = z_vid[s_wind_ind:e_wind_ind+1]
    assert z_vid_resampled.shape[0] == L_window, f'{s_wind_ind}, {e_wind_ind}, {z_vid_resampled.shape}'
    new_s_ind = max(s_ind - s_wind_ind, 0)
    new_e_ind = min(e_ind - s_wind_ind, L_window - 1)
    # print(
    #     f'{L_feat}\n'  # 897 ~ 900
    #     f'{s_ind_orig:4d} {e_ind_orig}\n'  # 0 ~ 255
    #     f'{s_ind:4d} {e_ind}\n'
    #     f'{s_wind_ind:4d} {e_wind_ind}\n'
    #     f'{new_s_ind:4d} {new_e_ind}\n'  # 0 ~ L_window-1
    # )
    assert 0 <= new_s_ind <= new_e_ind < L_window
    return z_vid_resampled, new_s_ind, new_e_ind, s_wind_ind, e_wind_ind
