# Clone Source Code

In [None]:
!git clone https://github.com/hein-nkhh/unilm.git
%cd unilm/beit3

# Import thư viện

In [None]:
!pip install -r requirements.txt

In [None]:
import numpy as np
from tqdm.notebook import tqdm
from PIL import Image
import time
import pickle
import torch
from IPython.display import clear_output
import os
import torch.nn.functional as F
from transformers import AutoTokenizer
from modeling_finetune import beit3_large_patch16_384_retrieval
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt
from tqdm import tqdm
from torch.cuda.amp import autocast
from transformers import XLMRobertaTokenizer
import json
import cv2
from huggingface_hub import HfApi

In [None]:
import logging

# Configure the root logger for the notebook: INFO level, timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    force=True,                 # force-override any earlier logging configuration (very important in a notebook)
)

# Named logger used by the cells below.
logger = logging.getLogger("Embedd Frame")
# Emit a first INFO record ("Xin chào" is Vietnamese for "Hello").
logger.info("Xin chào")

# Cài đặt Device Torch

In [None]:
# Use the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hàm extract frames

In [2]:
def extract_frames_with_opencv(
    video_path: str,
    target_height: int = 27,
    target_width: int = 48,
    target_fps: float = None,        # optional sampling rate
    show_progressbar: bool = False
):
    """
    Decode a video with OpenCV and return its frames as resized RGB PIL images.

    Args:
        video_path: Path of the video handed to ``cv2.VideoCapture``.
        target_height: Height in pixels of every returned frame.
        target_width: Width in pixels of every returned frame.
        target_fps: Optional sampling rate. When it and the FPS reported by the
            video are both positive, only every ``round(video_fps / target_fps)``-th
            frame is kept; the step is clamped to at least 1, so asking for more
            FPS than the video has simply keeps every frame. Otherwise all
            frames are kept.
        show_progressbar: Show a tqdm bar that advances once per decoded frame.

    Returns:
        list of ``PIL.Image.Image`` objects in decoding order.

    Raises:
        ValueError: If the video cannot be opened.
    """
    logger.info(f"Opening video: {video_path}")
    cap = cv2.VideoCapture(video_path)
    progress_bar = None
    frames = []

    try:
        if not cap.isOpened():
            logger.error(f"Failed to open video: {video_path}")
            raise ValueError(f"Failed to open video: {video_path}")

        video_fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Derive the sampling step from the requested FPS, if any.
        if target_fps is not None and target_fps > 0 and video_fps > 0:
            # Clamp to 1: rounding yields 0 when target_fps > 2 * video_fps,
            # which would make the modulo below raise ZeroDivisionError.
            step = max(1, int(round(video_fps / target_fps)))
            logger.info(f"Video FPS: {video_fps:.2f}, target FPS: {target_fps}, step: {step}")
        else:
            step = 1
            logger.info(f"Video FPS: {video_fps:.2f}, using all frames")

        if show_progressbar:
            progress_bar = tqdm(
                # The reported frame count may be 0 or negative; let tqdm run
                # without a total in that case.
                total=total_frames if total_frames > 0 else None,
                desc="Extracting frames",
                unit="frame",
            )

        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx % step == 0:   # keep only every `step`-th frame
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frame_resized = cv2.resize(frame_rgb, (target_width, target_height))
                frames.append(Image.fromarray(frame_resized))

            frame_idx += 1
            # Compare against None explicitly: a tqdm bar is falsy when its
            # total is 0 and bool() raises when it has no total at all.
            if progress_bar is not None:
                progress_bar.update(1)
    finally:
        # Always free the decoder and the bar, even if decoding failed midway.
        cap.release()
        if progress_bar is not None:
            progress_bar.close()

    logger.info(f"Extracted {len(frames)} frames (from {total_frames})")
    return frames

# Hàm Extract Embedding

In [None]:
# Tokenizer loaded from the `beit3.spm` file shipped with the checkpoint.
tokenizer = XLMRobertaTokenizer("/kaggle/input/beit3_base_retrieval/pytorch/default/2/beit3.spm")

# BEiT-3 model
ckpt = "/kaggle/input/beit3_base_retrieval/pytorch/default/2/beit3_large_patch16_384_coco_retrieval.pth"
# pretrained=False: the weights come from the checkpoint loaded just below.
model = beit3_large_patch16_384_retrieval(pretrained=False)
state_dict = torch.load(ckpt, map_location=device)
# NOTE(review): strict=False tolerates missing/unexpected keys and the report
# returned by load_state_dict is discarded — confirm the checkpoint really
# matches this architecture.
model.load_state_dict(state_dict["model"], strict=False)
model = model.to(device)
# Switch to inference mode before embedding.
model.eval()
# Wipe whatever the cell has printed so far.
clear_output()

# Preprocessing applied to every frame before it is fed to the model:
# resize to 384x384, convert to a tensor, then normalize each channel with
# mean 0.5 / std 0.5.
# NOTE(review): interpolation=3 is presumably the bicubic constant — confirm.
transform  = transforms.Compose([
    transforms.Resize((384, 384), interpolation=3), 
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5,0.5,0.5], std=[0.5,0.5,0.5])
])

In [None]:
# Embed every extracted frame with the model's vision branch, L2-normalize the
# result and save all embeddings together with their frame ids.
embeddings = []
ids = []

# Mixed precision is only enabled on CUDA; on CPU the context is a no-op.
use_amp = device.type == "cuda"

with torch.no_grad():
    start = time.time()
    # NOTE: despite its name, `image_paths` holds the PIL frames returned by
    # extract_frames_with_opencv, not file paths, so each frame is identified
    # by its position in that list.
    for frame_idx, image in enumerate(tqdm(image_paths, desc="🔄 Extracting image embeddings")):
        image_tensor = transform(image).unsqueeze(0).to(device)

        # torch.autocast replaces the deprecated, CUDA-only torch.cuda.amp.autocast.
        with torch.autocast(device_type=device.type, enabled=use_amp):
            vision_cls, _ = model(image=image_tensor, only_infer=True)
            vision_norm = F.normalize(vision_cls, p=2, dim=-1)

        embeddings.append(vision_norm.squeeze(0).cpu())   # (D,)
        # The original appended the undefined name `img_path` here (NameError).
        ids.append(frame_idx)
    end = time.time()
    logger.info(f'Thời gian embedd {end-start}')

# Release cached GPU memory once, instead of after every single frame.
if use_amp:
    torch.cuda.empty_cache()

if not embeddings:
    # torch.stack fails with an opaque error on an empty list; fail clearly instead.
    raise ValueError("No frames to embed: `image_paths` is empty")

image_embeddings = torch.stack(embeddings, dim=0)  # (N,D)
torch.save({"embeddings": image_embeddings, "ids": ids}, "image_embeddings.pt")
print("✅ Saved embeddings:", image_embeddings.shape)

# Hàm lưu embeddings

In [None]:
class EmbeddingIO:
    """Save and load embedding objects as pickle (``.pkl``) or torch (``.pt``) files."""

    def __init__(self, default_fmt: str = "pkl"):
        """
        Remember the format used when a call does not name one.

        Args:
            default_fmt (str): default format, "pkl" or "pt" (case-insensitive)
        """
        self.default_fmt = default_fmt.lower()

    def _resolve_fmt(self, fmt):
        """Return the lower-cased effective format, or raise ValueError if unsupported."""
        chosen = (fmt or self.default_fmt).lower()
        if chosen not in ("pkl", "pt"):
            raise ValueError("fmt phải là 'pkl' hoặc 'pt'")
        return chosen

    def save(self, embeddings, file_path: str, fmt: str = None):
        """
        Write embeddings to ``file_path`` plus the format's extension.

        Args:
            embeddings: torch.Tensor or numpy.ndarray
            file_path (str): destination path without extension
            fmt (str): "pkl" or "pt"; None falls back to the default format

        Raises:
            ValueError: if the effective format is neither "pkl" nor "pt"
        """
        chosen = self._resolve_fmt(fmt)
        if chosen == "pt":
            torch.save(embeddings, file_path + ".pt")
            print(f"✅ Saved {file_path}.pt")
            return
        with open(file_path + ".pkl", "wb") as sink:
            pickle.dump(embeddings, sink)
        print(f"✅ Saved {file_path}.pkl")

    def load(self, file_path: str, fmt: str = None):
        """
        Read embeddings back from ``file_path`` plus the format's extension.

        Args:
            file_path (str): source path without extension
            fmt (str): "pkl" or "pt"; None falls back to the default format

        Returns:
            The object stored in the file (torch.Tensor or numpy.ndarray).

        Raises:
            ValueError: if the effective format is neither "pkl" nor "pt"
        """
        chosen = self._resolve_fmt(fmt)
        if chosen == "pt":
            return torch.load(file_path + ".pt")
        with open(file_path + ".pkl", "rb") as source:
            payload = pickle.load(source)
        return payload

# Hàm Push Embedding

In [None]:
class HFUploader:
    """Small wrapper around ``HfApi`` for pushing a file or a folder to a Hugging Face repo."""

    def __init__(self, token: str = None):
        """
        Create the API client and remember the access token.

        Args:
            token (str): optional token; it is forwarded as ``token=`` on every upload call.
        """
        self.token = token
        self.api = HfApi()

    def upload_file(self, local_path: str, repo_id: str, path_in_repo: str = None, repo_type: str = "model"):
        """
        Upload a single file to a Hugging Face repo.

        When ``path_in_repo`` is None the file keeps its own base name in the repo.
        Returns whatever ``HfApi.upload_file`` returns.
        """
        if path_in_repo is None:
            from os.path import basename
            path_in_repo = basename(local_path)

        request = dict(
            path_or_fileobj=local_path,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type=repo_type,
            token=self.token,
        )
        return self.api.upload_file(**request)

    def upload_folder(self, local_folder: str, repo_id: str, repo_type: str = "dataset", path_in_repo: str = ""):
        """
        Upload a whole folder to a Hugging Face repo.

        ``path_in_repo`` selects a sub-directory inside the repo, e.g. "data/".
        Returns whatever ``HfApi.upload_folder`` returns.
        """
        request = dict(
            folder_path=local_folder,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            repo_type=repo_type,
            token=self.token,
        )
        return self.api.upload_folder(**request)

# Thực thi

In [None]:
# Decode the sample video with the function's defaults: frames resized to
# 48x27 (width x height) and no FPS sub-sampling.
# NOTE: `image_paths` holds the returned PIL images, not file paths.
image_paths = extract_frames_with_opencv('/kaggle/input/aic-sample-test/videos/L21_V001.mp4',
                                         show_progressbar=True)

In [None]:
# Round-trip check of EmbeddingIO in both supported formats.
# NOTE(review): the variable name `io` is also the name of a standard-library module.
io = EmbeddingIO(default_fmt="pkl")

import torch
# NOTE(review): `emb` is random data, yet it is written under the
# "L21_V001_embeddings" name below — confirm this is only a placeholder and
# not meant to be the real frame embeddings.
emb = torch.randn(10, 512)

# Save as pickle
io.save(emb, "/kaggle/working/L21_V001_embeddings")

# Load the pickle back
loaded_pkl = io.load("/kaggle/working/L21_V001_embeddings")
print(type(loaded_pkl), getattr(loaded_pkl, "shape", None))

# Save in torch format
io.save(emb, "/kaggle/working/L21_V001_embeddings", fmt="pt")

# Load the torch file back
loaded_pt = io.load("/kaggle/working/L21_V001_embeddings", fmt="pt")
print(type(loaded_pt), getattr(loaded_pt, "shape", None))