    job_path = os.path.join(ROOT_DIR, JOBS_DIR, job_id)

In [2]:
import os

# Name of the folder under ROOT_DIR that contains one sub-folder per job id.
JOBS_DIR = "jobs"
# Project root is taken as the parent of the current working directory, so its
# value depends on the directory the kernel was started from.
ROOT_DIR = os.path.dirname(os.getcwd())
ROOT_DIR

'd:\\Learning\\AI_Indonesia\\final_project'

In [3]:
# Job folder name (under ROOT_DIR/JOBS_DIR) passed to every pipeline step below.
job_id = "testing_0001"

In [4]:
import json
import gpxpy
from typing import List, Dict
from datetime import datetime
import os

def extract_gps(job_id, gps_file: str) -> List[Dict]:
    """Parse a GPX file and return its track points as plain dictionaries.

    Args:
        job_id: Name of the job folder under ``ROOT_DIR/JOBS_DIR``.
        gps_file: GPX file name, resolved against the job folder. The path is
            built with ``os.path.join``, so an absolute ``gps_file`` is used
            as-is and the job folder is ignored.

    Returns:
        One dict per track point, in file order, with the keys ``lat``,
        ``lon`` and ``time`` (the timestamp object produced by gpxpy).

    Raises:
        ValueError: If ``gps_file`` does not end with ``.gpx``
            (compared case-insensitively).
    """
    # Validate first so a wrong file type is reported before any path work.
    # The comparison is case-insensitive so "TRACK.GPX" is accepted too.
    if not gps_file.lower().endswith(".gpx"):
        raise ValueError("Please input gpx format file")

    job_path = os.path.join(ROOT_DIR, JOBS_DIR, job_id)

    # Explicit encoding: without it the platform default is used, which
    # differs between operating systems and can garble non-ASCII track text.
    # NOTE(review): assumes the GPX file is UTF-8 encoded - confirm for your data.
    with open(os.path.join(job_path, gps_file), "r", encoding="utf-8") as file:
        print("Open gps file")
        gpx = gpxpy.parse(file)
        print("gps file opened")

    # Flatten tracks -> segments -> points into a single list.
    return [
        {
            "lat": point.latitude,
            "lon": point.longitude,
            "time": point.time,
        }
        for track in gpx.tracks
        for segment in track.segments
        for point in segment.points
    ]


def find_nearest_gps(gps_points, target_time: datetime):
    """Return the GPS point whose timestamp is closest to ``target_time``.

    Args:
        gps_points: Iterable of dicts carrying a ``"time"`` datetime entry.
        target_time: Moment to match. It is subtracted from each point's
            ``"time"``, so both must be comparable datetimes.

    Returns:
        The dict with the smallest absolute time difference (the first such
        dict on ties), or ``None`` when there is no point with a timestamp.
    """
    # Points without a timestamp cannot be compared; skipping them avoids a
    # TypeError from ``None - datetime``.
    timed_points = [p for p in gps_points if p.get("time") is not None]
    if not timed_points:
        return None

    # min() keeps the first element among equal keys, matching a strict
    # "smaller than" scan from the start of the list.
    return min(
        timed_points,
        key=lambda p: abs((p["time"] - target_time).total_seconds()),
    )

In [5]:
# NOTE(review): gps_file is an absolute path (ROOT_DIR/gps/sample.gpx), so the
# os.path.join inside extract_gps discards the job folder and reads this file
# directly - confirm whether the GPX is meant to live inside the job folder.
gps_file = os.path.join(ROOT_DIR,"gps","sample.gpx")
gps_result = extract_gps(job_id=job_id,gps_file=gps_file)
gps_result

Open gps file
gps file opened


[{'lat': -6.27140491,
  'lon': 106.7323126,
  'time': datetime.datetime(2025, 12, 20, 6, 13, 30, tzinfo=SimpleTZ('Z'))},
 {'lat': -6.2714006,
  'lon': 106.73231419,
  'time': datetime.datetime(2025, 12, 20, 6, 13, 31, tzinfo=SimpleTZ('Z'))},
 {'lat': -6.27140491,
  'lon': 106.73232014,
  'time': datetime.datetime(2025, 12, 20, 6, 13, 32, tzinfo=SimpleTZ('Z'))},
 {'lat': -6.2714055,
  'lon': 106.73231788,
  'time': datetime.datetime(2025, 12, 20, 6, 13, 33, tzinfo=SimpleTZ('Z'))},
 {'lat': -6.27140558,
  'lon': 106.73231796,
  'time': datetime.datetime(2025, 12, 20, 6, 13, 34, tzinfo=SimpleTZ('Z'))},
 {'lat': -6.27139896,
  'lon': 106.73230992,
  'time': datetime.datetime(2025, 12, 20, 6, 13, 35, tzinfo=SimpleTZ('Z'))},
 {'lat': -6.27139368,
  'lon': 106.73230128,
  'time': datetime.datetime(2025, 12, 20, 6, 13, 36, tzinfo=SimpleTZ('Z'))},
 {'lat': -6.27139335,
  'lon': 106.73230103,
  'time': datetime.datetime(2025, 12, 20, 6, 13, 37, tzinfo=SimpleTZ('Z'))},
 {'lat': -6.2713933,
  'lon

In [6]:
import json
from datetime import datetime
import subprocess
import cv2
import os
from datetime import datetime, timedelta


def get_video_creation_time(video_file: str) -> datetime:
    """Read the ``creation_time`` format tag of a video through ffprobe.

    Args:
        video_file: Path of the video handed to the ``ffprobe`` executable.

    Returns:
        The tag value parsed with ``datetime.fromisoformat`` after a ``Z``
        suffix has been rewritten to ``+00:00``.

    Raises:
        ValueError: If ffprobe prints nothing for the tag.
    """
    ffprobe_args = ["ffprobe"]
    ffprobe_args += ["-v", "quiet"]
    ffprobe_args += ["-show_entries", "format_tags=creation_time"]
    # Print only the bare value, without section wrappers or the key name.
    ffprobe_args += ["-of", "default=noprint_wrappers=1:nokey=1"]
    ffprobe_args.append(video_file)

    raw_stdout = subprocess.check_output(ffprobe_args)
    creation_text = raw_stdout.decode().strip()

    if creation_text == "":
        raise ValueError("creation_time not found")

    # Spell the UTC designator as an explicit offset before parsing.
    iso_text = creation_text.replace("Z", "+00:00")
    return datetime.fromisoformat(iso_text)


def get_frame(job_id: str, video_file: str):
    """Sample frames from a video into the job's ``frames`` folder.

    One frame is written every ``int(fps)`` frames (at least every frame), so
    for a non-integer frame rate the spacing is slightly under one second.

    Args:
        job_id: Name of the job folder under ``ROOT_DIR/JOBS_DIR``; frames
            are written to ``<job folder>/frames``.
        video_file: Video file name, resolved against the job folder. An
            absolute path is used as-is because of ``os.path.join``.

    Returns:
        A dict with ``job_id``, ``video_created_at`` (datetime returned by
        ``get_video_creation_time``), ``fps`` and ``frames``. ``frames`` is a
        list of dicts with ``frame`` (frame index), ``time_in_video``
        (``frame / fps``) and ``frame_file`` (path relative to the job folder).

    Raises:
        ValueError: If the file does not exist, cannot be opened by OpenCV,
            or reports a non-positive frame rate.
    """
    job_path = os.path.join(ROOT_DIR, JOBS_DIR, job_id)
    frames_path = os.path.join(job_path, "frames")
    video_path = os.path.join(job_path, video_file)

    if not os.path.exists(video_path):
        raise ValueError(f"File {video_path} not found")

    # Probe the creation time before opening the capture, so a failing
    # ffprobe call cannot leave an open capture handle behind.
    video_start_time = get_video_creation_time(video_file=video_path)

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise ValueError(f"File {video_path} could not be opened as a video")

        fps = cap.get(cv2.CAP_PROP_FPS)
        # "not fps > 0" also rejects NaN, which "fps <= 0" would let through.
        if not fps > 0:
            raise ValueError(f"Could not determine FPS of {video_path}")

        # int() truncates; the floor of 1 keeps the modulo below valid when
        # the reported frame rate is below one frame per second.
        frame_interval = max(1, int(fps * 1))

        metadata = {
            "job_id": job_id,
            "video_created_at": video_start_time,
            "fps": fps,
            "frames": []
        }

        os.makedirs(frames_path, exist_ok=True)

        idx_frame = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if idx_frame % frame_interval == 0:
                frame_name = f"frame_{idx_frame:05d}.jpg"
                cv2.imwrite(os.path.join(frames_path, frame_name), frame)

                metadata["frames"].append(
                    {
                        "frame": idx_frame,
                        "time_in_video": idx_frame / fps,
                        # Stored relative to the job folder.
                        "frame_file": os.path.join("frames", frame_name)
                    }
                )
            idx_frame += 1
    finally:
        # Always free the decoder, including on errors.
        cap.release()

    return metadata

In [7]:
# NOTE(review): video_file is an absolute path (ROOT_DIR/videos/sample.mp4), so
# get_frame reads it directly instead of from the job folder; the extracted
# frames are still written to the job folder's "frames" directory.
video_file = os.path.join(ROOT_DIR, "videos","sample.mp4")
video_frames = get_frame(job_id=job_id, video_file=video_file)
video_frames

{'job_id': 'testing_0001',
 'video_created_at': datetime.datetime(2025, 12, 20, 6, 15, 8, tzinfo=datetime.timezone.utc),
 'fps': 29.993833079221027,
 'frames': [{'frame': 0,
   'time_in_video': 0.0,
   'frame_file': 'frames\\frame_00000.jpg'},
  {'frame': 29,
   'time_in_video': 0.9668654194148487,
   'frame_file': 'frames\\frame_00029.jpg'},
  {'frame': 58,
   'time_in_video': 1.9337308388296974,
   'frame_file': 'frames\\frame_00058.jpg'},
  {'frame': 87,
   'time_in_video': 2.900596258244546,
   'frame_file': 'frames\\frame_00087.jpg'},
  {'frame': 116,
   'time_in_video': 3.867461677659395,
   'frame_file': 'frames\\frame_00116.jpg'},
  {'frame': 145,
   'time_in_video': 4.834327097074244,
   'frame_file': 'frames\\frame_00145.jpg'},
  {'frame': 174,
   'time_in_video': 5.801192516489092,
   'frame_file': 'frames\\frame_00174.jpg'},
  {'frame': 203,
   'time_in_video': 6.768057935903941,
   'frame_file': 'frames\\frame_00203.jpg'},
  {'frame': 232,
   'time_in_video': 7.73492335531

In [12]:
import json
from datetime import datetime
import subprocess
import os
from datetime import datetime, timedelta


def combine_gps_frame(frames_data, gps_data):
    """Attach the nearest GPS point to every sampled video frame.

    The first GPS timestamp is treated as the moment the video starts: each
    frame's ``gps_time`` works out to ``first GPS time + time_in_video``.

    Args:
        frames_data: Dict with ``job_id``, ``video_created_at`` (datetime)
            and ``frames`` (list of dicts holding ``time_in_video``).
        gps_data: Non-empty list of dicts with ``lat``, ``lon`` and ``time``.

    Returns:
        A new dict with ``job_id``, ``video_created`` and ``gps_created``
        (ISO strings) and ``data``: copies of the frame dicts extended with
        ``gps_time``, ``lat`` and ``lon``. Frames for which no GPS point is
        found are kept without those three keys.

    Raises:
        ValueError: If ``gps_data`` is empty.
    """
    if not gps_data:
        raise ValueError("gps_data is empty; cannot align frames with GPS")

    gps_created = gps_data[0].get("time")
    video_created = frames_data.get("video_created_at")

    initial_offset = (
        gps_created - video_created
    ).total_seconds()

    final_metadata = {
        "job_id": frames_data.get("job_id"),
        "video_created": video_created.isoformat(),
        "gps_created": gps_created.isoformat(),
        # Copy each frame dict so the caller's frames_data is not modified
        # when the GPS fields are added below.
        "data": [dict(frame) for frame in frames_data['frames']]
    }
    print("Creation time gps: ", gps_created)
    print("Creation time video: ", video_created)
    print("Initial offset:", initial_offset)

    for data in final_metadata['data']:
        time_in_video = data.get("time_in_video")
        gps_time = video_created + timedelta(
            seconds=time_in_video+initial_offset
        )

        gps_point = find_nearest_gps(gps_data, target_time=gps_time)
        if gps_point is None:
            continue

        data.update(
            {
                "gps_time": gps_time.isoformat(),
                "lat": gps_point["lat"],
                "lon": gps_point["lon"]
            }
        )

    return final_metadata

    # while True:
    #     ret, frame = cap.read()
    #     if not ret:
    #         break

    #     if idx_frame % frame_interval == 0:
    #         time_in_video = idx_frame / fps
    #         gps_time = VIDEO_START_TIME + \
    #             timedelta(seconds=time_in_video+initial_offset)

    #         gps_point = find_nearest_gps(gps_points, gps_time)
    #         if gps_point is None:
    #             idx_frame += 1
    #             continue
    #         saved += 1
    #         os.makedirs(images_path, exist_ok=True)
    #         img_name = f"frame_{idx_frame:05d}.jpg"
    #         img_abs_path = os.path.join(images_path, img_name)
    #         img_rel_path = os.path.join("images", img_name)
    #         cv2.imwrite(img_abs_path, frame)

    #         metadata.append({
    #             "frame": idx_frame,
    #             "video_time": round(time_in_video, 2),
    #             "image_file": img_rel_path,
    #             "gps_time": gps_time.isoformat(),
    #             "lat": gps_point["lat"],
    #             "lon": gps_point["lon"]
    #         })

    #     idx_frame += 1

    # cap.release()

    # with open("metadata.json", "w") as file:
    #     json.dump(metadata, file)

    # return metadata

In [13]:
# Match every sampled frame with its nearest GPS point.
all_result = combine_gps_frame(frames_data=video_frames, gps_data=gps_result)

Creation time gps:  2025-12-20 06:13:30+00:00
Creation time video:  2025-12-20 06:15:08+00:00
Initial offset: -98.0


In [14]:
all_result

{'job_id': 'testing_0001',
 'video_created': '2025-12-20T06:15:08+00:00',
 'gps_created': '2025-12-20T06:13:30+00:00',
 'data': [{'frame': 0,
   'time_in_video': 0.0,
   'frame_file': 'frames\\frame_00000.jpg',
   'gps_time': '2025-12-20T06:13:30+00:00',
   'lat': -6.27140491,
   'lon': 106.7323126},
  {'frame': 29,
   'time_in_video': 0.9668654194148487,
   'frame_file': 'frames\\frame_00029.jpg',
   'gps_time': '2025-12-20T06:13:30.966865+00:00',
   'lat': -6.2714006,
   'lon': 106.73231419},
  {'frame': 58,
   'time_in_video': 1.9337308388296974,
   'frame_file': 'frames\\frame_00058.jpg',
   'gps_time': '2025-12-20T06:13:31.933731+00:00',
   'lat': -6.27140491,
   'lon': 106.73232014},
  {'frame': 87,
   'time_in_video': 2.900596258244546,
   'frame_file': 'frames\\frame_00087.jpg',
   'gps_time': '2025-12-20T06:13:32.900596+00:00',
   'lat': -6.2714055,
   'lon': 106.73231788},
  {'frame': 116,
   'time_in_video': 3.867461677659395,
   'frame_file': 'frames\\frame_00116.jpg',
   '

In [15]:
# Save the combined frame + GPS metadata as JSON in the current working directory.
with open("sample_metadata.json", "w") as metadata_file:
    metadata_file.write(json.dumps(all_result))

In [23]:
from ultralytics import YOLO

# Load the model weights once; predict() below reuses this global instance.
yolo = YOLO("yolo11n-seg.pt")

def _detections_to_records(detections, batch_meta) -> List[Dict]:
    """Merge each model result with the metadata of the frame it belongs to."""
    records = []
    for det, meta in zip(detections, batch_meta):
        boxes = det.boxes
        detections_json = [] if boxes is None else [
            {
                "class_id": int(box.cls[0]),
                "confidence": float(box.conf[0]),
                "bbox": box.xyxy[0].tolist()
            }
            for box in boxes
        ]
        records.append(
            {
                **meta,
                "detection": detections_json,
                "total_detect": len(detections_json)
            }
        )
    return records


def predict(job_id, metadata: Dict, batch=10) -> Dict:
    """Run the global ``yolo`` model over every frame listed in ``metadata``.

    Args:
        job_id: Name of the job folder under ``ROOT_DIR/JOBS_DIR``; each
            ``frame_file`` is resolved against it.
        metadata: Dict whose ``data`` entry is a list of frame dicts, each
            with a ``frame_file`` path.
        batch: Number of images passed to the model per call (must be >= 1).

    Returns:
        A new dict with the same entries as ``metadata``, except that every
        item in ``data`` gains ``detection`` (list of ``class_id``,
        ``confidence``, ``bbox``) and ``total_detect``. The input dict is
        left unmodified.

    Raises:
        ValueError: If ``metadata["data"]`` is not a list, ``batch`` is
            smaller than 1, or a frame image cannot be read.
    """
    frames = metadata.get("data")
    if not isinstance(frames, list):
        raise ValueError("There's no frame + gps in metadata")
    if batch < 1:
        raise ValueError("batch must be >= 1")

    job_path = os.path.join(ROOT_DIR, JOBS_DIR, job_id)

    results = []
    # Slicing yields the full batches and the trailing partial batch through
    # one code path.
    for start in range(0, len(frames), batch):
        batch_meta = frames[start:start + batch]
        batch_images = []
        for item in batch_meta:
            image_path = os.path.join(job_path, item.get("frame_file"))
            img = cv2.imread(image_path)
            # cv2.imread signals a missing/unreadable file by returning None.
            if img is None:
                raise ValueError(f"Could not read frame image {image_path}")
            batch_images.append(img)

        results.extend(_detections_to_records(yolo(batch_images), batch_meta))

    # Build a fresh dict instead of overwriting the caller's "data" entry.
    return {**metadata, "data": results}

In [24]:
# Run detection on every frame in all_result with the default batch size.
final_predict = predict(job_id=job_id,
                        metadata=all_result)


0: 640x640 (no detections), 73.4ms
1: 640x640 1 motorcycle, 73.4ms
2: 640x640 1 person, 73.4ms
3: 640x640 4 motorcycles, 73.4ms
4: 640x640 1 person, 2 motorcycles, 73.4ms
5: 640x640 1 motorcycle, 73.4ms
6: 640x640 1 motorcycle, 1 potted plant, 73.4ms
7: 640x640 1 person, 1 motorcycle, 73.4ms
8: 640x640 2 persons, 1 motorcycle, 73.4ms
9: 640x640 2 persons, 1 motorcycle, 73.4ms
Speed: 4.4ms preprocess, 73.4ms inference, 3.5ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1 motorcycle, 56.6ms
1: 640x640 (no detections), 56.6ms
2: 640x640 (no detections), 56.6ms
3: 640x640 (no detections), 56.6ms
4: 640x640 1 potted plant, 56.6ms
5: 640x640 1 potted plant, 56.6ms
6: 640x640 1 potted plant, 56.6ms
7: 640x640 1 potted plant, 56.6ms
8: 640x640 1 potted plant, 56.6ms
9: 640x640 2 potted plants, 56.6ms
Speed: 2.9ms preprocess, 56.6ms inference, 0.9ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 car, 2 potted plants, 57.6ms
1: 640x640 1 person, 2 potted 

In [25]:
final_predict

{'job_id': 'testing_0001',
 'video_created': '2025-12-20T06:15:08+00:00',
 'gps_created': '2025-12-20T06:13:30+00:00',
 'data': [{'frame': 0,
   'time_in_video': 0.0,
   'frame_file': 'frames\\frame_00000.jpg',
   'gps_time': '2025-12-20T06:13:30+00:00',
   'lat': -6.27140491,
   'lon': 106.7323126,
   'detection': [],
   'total_detect': 0},
  {'frame': 29,
   'time_in_video': 0.9668654194148487,
   'frame_file': 'frames\\frame_00029.jpg',
   'gps_time': '2025-12-20T06:13:30.966865+00:00',
   'lat': -6.2714006,
   'lon': 106.73231419,
   'detection': [{'class_id': 3,
     'confidence': 0.7983698844909668,
     'bbox': [466.863525390625,
      0.2727184295654297,
      608.6868896484375,
      138.73092651367188]}],
   'total_detect': 1},
  {'frame': 58,
   'time_in_video': 1.9337308388296974,
   'frame_file': 'frames\\frame_00058.jpg',
   'gps_time': '2025-12-20T06:13:31.933731+00:00',
   'lat': -6.27140491,
   'lon': 106.73232014,
   'detection': [{'class_id': 0,
     'confidence': 0.