In [None]:
"""
Qwen format:

[
    {
      "image": "demo/images/10095.png",
      "conversations": [
        {
          "from": "human",
          "value": "Is the value of Favorable 38 in 2015?\n<image>"
        },
        {
          "from": "gpt",
          "value": "Yes"
        }
      ]
    },
    ...
]
"""

"""
template_obj = (text_temp, obj_mapper, obj_filter)
"""

In [None]:
import os
import json
import random
from dataclasses import dataclass

# Root directory of the dataset; it is scanned for scene folders when the dataset is built.
DS_ROOT = "./structured-data"
# File name of the image looked up inside every frame directory (<scene>/<frame>/IMG_NAMING).
IMG_NAMING = "CAM_FRONT_raw.jpg"
# File name of the JSON metadata read from every frame directory (<scene>/<frame>/META_NAMING).
META_NAMING = "CAM_FRONT_meta.json"

@dataclass
class QA:
    """A single generated question/answer pair plus the context it was built from."""
    Q: str  # question text
    A: str  # answer text
    objs: list[str]  # instance tokens of the objects the QA refers to
    ts: list[str]  # timestamps of the frames the QA refers to
    scene: str  # key used to look the scene up in `ds`
    ds: "QADataset"  # dataset that owns the scene; indexed with `scene`
    QA_type: str  # identifier of the template that produced this QA

    def __str__(self):
        """Return a multi-line, human-readable summary (the dataset handle is left out)."""
        return f"Q: {self.Q}\nA: {self.A}\nobjs: {self.objs}\ntimestamps: {self.ts}\nscene: {self.scene}\nQA_type: {self.QA_type}"

    def qwen_format(self, preview: bool = False):
        """Render this QA as a Qwen-style conversation record.

        preview: when True, the "images" list is shortened to the first image
            path followed by the placeholder "..." so the record is readable
            when printed. The question still carries one "<image>" tag per
            image of the scene.

        Returns a dict with the keys "images" and "conversations".
        """
        # Resolve the scene through the dataset attached to this QA instead of a
        # notebook-level global, so the method works wherever the QA travels.
        images = self.ds[self.scene].frames_imgs
        # Hand out a copy so callers cannot mutate the scene's own image list.
        shown_images = (images[:1] + ["..."]) if preview else list(images)
        return {
            "images": shown_images,
            "conversations": [
                {
                    "from": "human",
                    "value": f"{self.Q}" + '\n<image>' * len(images)
                },
                {
                    "from": "gpt",
                    "value": self.A
                }
            ]
        }

class QAScene:
    """All frames of one scene directory: image paths, per-frame metadata and the objects seen."""

    def objs_of_frame(self, frame_meta, obj_filter):
        """
        Collect the objects annotated in one frame.

        frame_meta: frame metadata dict; its 'annos' list is iterated.
        obj_filter: predicate called with a single annotation dict; the
            annotation is kept when the predicate returns a truthy value.

        Returns a list of (instance_token, category_name) tuples in annotation order.
        """
        return [
            (anno['instance_token'], anno['category_name'])
            for anno in frame_meta['annos']
            if obj_filter(anno)
        ]

    def __init__(self, ds, sc_root: str):
        """
        Load every frame found under `sc_root`.

        ds: the dataset object this scene belongs to (stored as-is).
        sc_root: path of the scene directory; each sub-directory is one frame
            and must contain the metadata file named by META_NAMING.

        Raises whatever `open` / `json.load` raise when a frame directory has
        no readable metadata file.
        """
        self.ds = ds
        self.root = sc_root
        # Only sub-directories are frames; stray files in the scene folder
        # (editor droppings, notes, ...) used to crash the metadata lookup.
        frames = sorted(
            entry for entry in os.listdir(sc_root)
            if os.path.isdir(os.path.join(sc_root, entry))
        )
        self.frames_imgs = []
        self.frames_metas = []
        # A dict keeps first-seen order, so the resulting object list is the
        # same on every run (a set's order changes with hash randomisation).
        unique_objs = {}

        for i, frame in enumerate(frames):
            frame_dir = os.path.join(sc_root, frame)
            self.frames_imgs.append(os.path.abspath(os.path.join(frame_dir, IMG_NAMING)))
            with open(os.path.join(frame_dir, META_NAMING), "r", encoding="utf-8") as f:
                meta = json.load(f)
            # Position of the frame within the sorted frame list.
            meta['timestamp_idx'] = i
            self.frames_metas.append(meta)
            objs_in_frame = self.objs_of_frame(meta, lambda x: True)
            unique_objs.update(dict.fromkeys(objs_in_frame))

        self.objs = [
            {"instance_token": token, "category_name": category}
            for token, category in unique_objs
        ]

    def __getitem__(self, timestamp: str):
        """Return the metadata of the frame whose 'timestamp' equals `timestamp`.

        Raises ValueError when no frame of this scene has that timestamp.
        """
        for frame in self.frames_metas:
            if frame['timestamp'] == timestamp:
                return frame
        raise ValueError(f"Frame {timestamp} not found in scene {self.root}")

    def __len__(self):
        """Number of frames loaded for this scene."""
        return len(self.frames_metas)
        

class QADataset:
    """Collection of every scene found directly under a dataset root directory."""

    def __init__(self, ds_root: str):
        """
        Build one QAScene per sub-directory of `ds_root`.

        ds_root: path of the dataset root. Entries that are not directories are
            ignored; scene folders are visited in sorted name order.
        """
        self.root = ds_root
        # os.listdir returns entries in arbitrary order; sorting keeps
        # `self.scenes` (and any seeded sampling over it) stable between runs.
        # Plain files in the root are not scenes and are skipped.
        scene_dirs = sorted(
            entry for entry in os.listdir(ds_root)
            if os.path.isdir(os.path.join(ds_root, entry))
        )
        self.scenes = [QAScene(self, os.path.join(ds_root, entry)) for entry in scene_dirs]
        print(f"Found {len(self.scenes)} scenes in {ds_root}")

    def __getitem__(self, scene_token: str):
        """Return the scene whose `root` equals `scene_token`.

        Note that the comparison is against the scene's root path
        (`os.path.join(ds_root, <folder>)`), not the bare folder name.

        Raises ValueError when no scene matches.
        """
        for scene in self.scenes:
            if scene.root == scene_token:
                return scene
        raise ValueError(f"Scene {scene_token} not found in dataset")

    def __len__(self):
        """Number of scenes in the dataset."""
        return len(self.scenes)

class QATemplate:
    """Question/answer template whose "<keyword>" placeholders are filled from sampled frames and objects."""

    # Marker placed between the question and the answer so both can be
    # filled in one pass and separated again afterwards.
    QA_SPLITTER = "<QASPLITTER>"

    def __init__(self, Q_temp: str, A_temp: str, obj_mappers: list, obj_filter, config):
        """
        Q_temp: question text containing "<keyword>" placeholders.
        A_temp: answer text containing "<keyword>" placeholders.
        obj_mappers: list of (keyword, fn) pairs; fn is called as
            fn(frames, objs) and its result replaces the first remaining
            "<keyword>" in the combined question/answer text.
        obj_filter: predicate handed to `scene.objs_of_frame` to select
            candidate objects.
        config: mapping with the keys "num_objs", "num_frames" and "QA_type".
        """
        self.QA_temp = f"{Q_temp}{self.QA_SPLITTER}{A_temp}"
        self.obj_mapper = obj_mappers
        self.obj_filter = obj_filter
        self.cfg = config

    def __call__(self, scene: "QAScene", verbose: bool = False):
        """
        Instantiate the template on `scene` and return the resulting QA.

        scene: scene to sample frames and objects from.
        verbose: when True, print the scene, the sampled frame timestamps and
            the sampled object tokens.

        Raises ValueError when the scene has fewer frames than
        cfg["num_frames"], or the sampled frames contain fewer matching
        objects than cfg["num_objs"]. Raises AssertionError when a mapper
        keyword does not occur in the template.
        """
        # prepare resources
        num_objs = self.cfg["num_objs"]
        num_frames = self.cfg["num_frames"]

        if num_frames > len(scene.frames_metas):
            raise ValueError(
                f"Template needs {num_frames} frames but scene {scene.root} "
                f"only has {len(scene.frames_metas)}"
            )

        # TODO change them to rational selection!
        frames = random.sample(scene.frames_metas, num_frames)

        # De-duplicate through a dict so candidates keep first-seen order; a
        # set's order changes with hash randomisation, which made the sample
        # below differ between runs even with a fixed seed.
        candidates = {}
        for frame in frames:
            candidates.update(dict.fromkeys(scene.objs_of_frame(frame, self.obj_filter)))
        if num_objs > len(candidates):
            raise ValueError(
                f"Template needs {num_objs} objects but only {len(candidates)} "
                f"passed the filter in the sampled frames of scene {scene.root}"
            )
        objs = [
            {"instance_token": token, "category_name": category}
            for token, category in candidates
        ]
        objs = random.sample(objs, num_objs)

        final_QA = self.QA_temp
        for kw, fn in self.obj_mapper:
            keyword = f"<{kw}>"
            assert keyword in final_QA, f"Keyword {keyword} not found in template {self.QA_temp}"
            # str() lets mappers return numbers (e.g. a distance) directly.
            final_QA = final_QA.replace(keyword, str(fn(frames, objs)), 1)

        if verbose:
            print(f"scene: {scene.root}")
            print(f"frames: {[frame['timestamp'] for frame in frames]}")
            print(f"objs: {[obj['instance_token'] for obj in objs]}")

        # Everything before the first splitter is the question, the rest the answer.
        Q, _, A = final_QA.partition(self.QA_SPLITTER)

        return QA(
            Q=Q,
            A=A,
            objs=[obj['instance_token'] for obj in objs],
            ts=[frame['timestamp'] for frame in frames],
            scene=scene.root,
            ds=scene.ds,
            QA_type=self.cfg["QA_type"]
        )

# NOTE(review): seed() without an argument seeds from OS entropy / the current time,
# so the sampled scenes, frames and objects differ on every run. Pass a fixed integer
# here if the generated QAs need to be reproducible.
random.seed()
# Scan DS_ROOT and load the metadata of every scene up front.
ds = QADataset(DS_ROOT)

In [None]:
# function pool here
from templates_lib.filter import (
    filter_area, 
    filter_visiblity, 
    filter_multiple_fn
)
from templates_lib.func import (
    obj_desc_fn,
    frame_ts_fn,
    obj_cam_dist,
    obj_dist_between
)

### Static::Measurement::object_distance

In [None]:
# Template: distance between one sampled object and the ego camera in one sampled frame.
# Each (keyword, fn) mapper is called as fn(frames, objs); its result replaces the first
# "<keyword>" still present in the combined question/answer text.
single_obj_abs_dist = QATemplate(
    Q_temp="What is the distance between <obj> and the ego camera at frame <frame>? (return in meters, frame idx starts from 0).",
    A_temp="<abs_dist>",
    obj_mappers=[
        ("obj", obj_desc_fn(0)),  # presumably describes the sampled object at index 0 — confirm in templates_lib.func
        ("frame", frame_ts_fn(0)),  # presumably the index of the sampled frame at position 0 — confirm in templates_lib.func
        ("abs_dist", obj_cam_dist),  # presumably the object-to-camera distance; the question promises meters — TODO confirm units
    ],
    obj_filter=filter_visiblity,  # predicate applied to every annotation of the sampled frames (name spelled as exported by templates_lib.filter)
    config={
        "num_objs": 1,  # objects sampled per QA
        "num_frames": 1,  # frames sampled per QA
        "QA_type": "single_obj_abs_dist"  # stored on the resulting QA as QA_type
        },
)

# Template: distance between two distinct sampled objects in one sampled frame.
# Mappers are applied in list order, each as fn(frames, objs), replacing the first
# "<keyword>" still present in the combined question/answer text.
double_obj_abs_dist = QATemplate(
    Q_temp="What is the distance between <obj1> and <obj2> at frame <frame>? (return in meters, frame idx starts from 0).",
    A_temp="<abs_dist>",
    obj_mappers=[
        ("obj1", obj_desc_fn(0)),  # presumably describes the sampled object at index 0 — confirm in templates_lib.func
        ("obj2", obj_desc_fn(1)),  # presumably describes the sampled object at index 1 — confirm in templates_lib.func
        ("frame", frame_ts_fn(0)),  # presumably the index of the sampled frame at position 0 — confirm in templates_lib.func
        ("abs_dist", obj_dist_between),  # presumably the distance between the two objects; the question promises meters — TODO confirm units
    ],
    obj_filter=filter_visiblity,  # predicate applied to every annotation of the sampled frames (name spelled as exported by templates_lib.filter)
    config={
        "num_objs": 2,  # objects sampled per QA
        "num_frames": 1,  # frames sampled per QA
        "QA_type": "double_obj_abs_dist"  # stored on the resulting QA as QA_type
    },
)

# Smoke test: pick one random scene and print one QA from each template.
# preview=True shortens the "images" list to its first path plus "..." to keep the output readable.
sc = random.choice(ds.scenes)
print(json.dumps(single_obj_abs_dist(sc).qwen_format(preview=True), indent=2))
print(json.dumps(double_obj_abs_dist(sc).qwen_format(preview=True), indent=2))