#### **Import Libraries**

In [None]:
import os
import json
import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from ultralytics import YOLO
from torchvision.ops import box_iou

In [None]:
# Switch the working directory to the dataset root: later cells use paths
# relative to it ('./distilled_keyframe', './metadata/object/features').
# NOTE(review): machine-specific absolute path; adjust when running elsewhere.
%cd D:/AIC2024/dataset

#### **Parsing Data Path**

In [None]:
def parse_data_path(feature_dir='./keyframe'):
    """Index the entries found one level below ``feature_dir``.

    Args:
        feature_dir: Directory whose entries ("parts") are themselves
            directories.

    Returns:
        A two-level dict ``{part: {entry_id: path}}``. Parts and entries are
        visited in sorted order, ``entry_id`` is the entry name up to its
        first ``.``, and ``path`` is ``f'{feature_dir}/{part}/{entry}'``.
    """
    part_names = sorted(os.listdir(feature_dir))
    # Create every part key first, then fill them in one by one.
    index = {part_name: dict() for part_name in part_names}
    for part_name in part_names:
        part_dir = f'{feature_dir}/{part_name}'
        for entry in sorted(os.listdir(part_dir)):
            index[part_name][entry.split('.')[0]] = f'{part_dir}/{entry}'
    return index

In [None]:
# Build the {part: {entry_id: path}} index for the keyframe directory
# (relative to the current working directory).
keyframe_dir='./distilled_keyframe'
all_keyframe_paths = parse_data_path(feature_dir=keyframe_dir)

#### **Object Textual Encoder**

In [1]:
# Default class names of interest. ObjectTextualEncoder falls back to this
# list when no `object_classes` is given, and filter_detections keeps only
# detections whose label appears in it.
# NOTE(review): the names look like a subset of the COCO class names -- confirm.
default_object_classes = [
    # Vehicles
    "bicycle", "car", "motorcycle", "airplane",
    "bus", "train", "truck", "boat",

    # People and animals
    "person", "bird", "cat", "dog",
    "horse", "sheep", "cow", "elephant",
    "bear", "zebra", "giraffe",

    # Personal items
    "backpack", "umbrella", "handbag",
    "suitcase", "book",

    # Sports equipment
    "kite", "skis", "snowboard", "sports ball",
    "baseball bat", "baseball glove", "skateboard",
    "surfboard", "tennis racket",

    # Tableware
    "bottle", "wine glass", "cup", "fork",
    "knife", "spoon", "bowl",

    # Food (fruit, vegetables and prepared dishes)
    "banana", "apple", "sandwich", "orange",
    "broccoli", "carrot", "hot dog", "pizza",
    "donut", "cake",

    # Furniture and household items
    "chair", "couch", "potted plant", "bed",
    "dining table", "toilet", "clock", "vase",

    # Electronics and appliances
    "tv", "laptop", "mouse", "remote",
    "keyboard", "cell phone", "microwave",
    "oven", "toaster", "refrigerator"
]


In [None]:
class ObjectTextualEncoder:
    """Encode detected objects as space-separated text tokens.

    The frame is divided into a grid of ``len(row_str)`` rows by
    ``len(col_str)`` columns in normalized ``[0, 1]`` coordinates. Every cell
    is labelled ``<column label><row label>`` (``a0``, ``b0``, ...), and the
    encoders below turn boxes/labels into sorted token strings.
    """

    def __init__(self, row_str=None, col_str=None, object_classes=None):
        """Set up the grid and the class list.

        Args:
            row_str: Labels of the grid rows, in order of increasing y.
                Defaults to ``'0'``..``'6'``.
            col_str: Labels of the grid columns, in order of increasing x.
                Defaults to ``'a'``..``'g'``.
            object_classes: Class names of interest. Defaults to the
                module-level ``default_object_classes``.
        """
        self.row_str = row_str or ['0', '1', '2', '3', '4', '5', '6']
        self.col_str = col_str or ['a', 'b', 'c', 'd', 'e', 'f', 'g']
        self.object_classes = object_classes or default_object_classes

        # Columns partition the x axis and rows partition the y axis, so the
        # number of x/y break points follows the column/row counts. Sizing
        # them the other way round only works for square grids.
        self.x_pts = np.linspace(0, 1, len(self.col_str) + 1)
        self.y_pts = np.linspace(0, 1, len(self.row_str) + 1)

        self.grid_bboxes, self.grid_labels = self.initialize_grid_bboxes()

    def initialize_grid_bboxes(self):
        """Return ``(boxes, labels)`` for every grid cell in row-major order.

        ``boxes`` is an ``(n_rows * n_cols, 4)`` array of normalized
        ``[x1, y1, x2, y2]`` cells; ``labels`` holds the matching
        ``<column label><row label>`` names.
        """
        grid_bboxes, grid_labels = [], []
        for row, row_label in enumerate(self.row_str):
            for col, col_label in enumerate(self.col_str):
                grid_bboxes.append([
                    self.x_pts[col], self.y_pts[row],
                    self.x_pts[col + 1], self.y_pts[row + 1],
                ])
                grid_labels.append(f"{col_label}{row_label}")
        return np.array(grid_bboxes), grid_labels

    def visual_grid_bboxes(self, image):
        """Draw the labelled grid over ``image`` and show it with matplotlib.

        Args:
            image: Image array to draw on (it is copied, not modified). When
                ``None``, a black 210x210 canvas is used instead.
        """
        grid_image = image.copy() if image is not None else np.zeros((210, 210, 3), dtype=np.uint8)
        h, w = grid_image.shape[:2]

        for bbox, label in zip(self.grid_bboxes, self.grid_labels):
            # Plain Python ints: some OpenCV builds reject numpy scalars as points.
            x_start, y_start, x_end, y_end = (int(value) for value in bbox * [w, h, w, h])
            org = (int(x_start + (x_end - x_start) / 2) - 10, int(y_start + (y_end - y_start) / 2))
            cv2.putText(grid_image, label, org, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
            cv2.rectangle(grid_image, (x_start, y_start), (x_end, y_end), (255, 255, 0), 1)

        # Swap the first and last channels for display. This assumes the image
        # uses OpenCV's BGR order while matplotlib expects RGB (the swap is
        # numerically the same operation in either direction).
        plt.imshow(cv2.cvtColor(grid_image, cv2.COLOR_BGR2RGB))
        plt.show()

    def scale_bboxes(self, bboxes, h, w):
        """Scale normalized ``[x1, y1, x2, y2]`` boxes to pixel coordinates.

        Args:
            bboxes: A single box or a sequence/array of boxes.
            h: Image height in pixels (scales the y coordinates).
            w: Image width in pixels (scales the x coordinates).

        Returns:
            An array of the scaled boxes; an empty input yields a ``(0, 4)``
            array.
        """
        boxes = np.asarray(bboxes)
        if boxes.size == 0:
            return np.zeros((0, 4))
        return boxes * np.array([w, h, w, h])

    def textual_encoding_object_bboxes(self, image, bboxes, labels):
        """Encode where each object lies on the grid.

        One token is produced for every (object, grid cell) pair whose IoU is
        non-zero: the cell label followed by the object label with spaces
        removed.

        Args:
            image: Image array; only ``image.shape[:2]`` is read.
            bboxes: Normalized ``[x1, y1, x2, y2]`` boxes, one per object.
            labels: Object labels aligned with ``bboxes``.

        Returns:
            The tokens sorted and joined by single spaces; ``''`` when there
            are no boxes.
        """
        if len(bboxes) == 0:
            return ''

        h, w = image.shape[:2]
        scaled_grid_bboxes = self.scale_bboxes(self.grid_bboxes, h, w)
        scaled_bboxes = self.scale_bboxes(bboxes, h, w)

        iou_scores = box_iou(torch.as_tensor(scaled_bboxes), torch.as_tensor(scaled_grid_bboxes)).numpy()
        bboxes_ids, grid_bboxes_ids = np.nonzero(iou_scores)

        tokens = [
            self.grid_labels[grid_bbox_id] + labels[bbox_id].replace(" ", "")
            for bbox_id, grid_bbox_id in zip(bboxes_ids, grid_bboxes_ids)
        ]
        return ' '.join(sorted(tokens))

    @staticmethod
    def _count_labels(labels):
        """Return ``{label: occurrences}`` computed in a single pass."""
        counts = {}
        for label in labels:
            counts[label] = counts.get(label, 0) + 1
        return counts

    def textual_encoding_object_numbers(self, labels):
        """Encode how many objects of each label are present.

        Returns:
            Sorted, space-joined ``<label><count>`` tokens (spaces removed
            from the label), e.g. ``'car2 person1'``.
        """
        counts = self._count_labels(labels)
        return ' '.join(sorted(
            label.replace(" ", "") + str(count) for label, count in counts.items()
        ))

    def textual_encoding_object_classes(self, labels):
        """Encode each label once per occurrence, numbered from 1.

        Returns:
            Sorted, space-joined ``<label><i>`` tokens for ``i`` in
            ``1..count`` (spaces removed), e.g. ``'car1 car2 person1'``.
        """
        object_classes = []
        for label, count in self._count_labels(labels).items():
            object_classes.extend(
                (label + str(i)).replace(" ", "") for i in range(1, count + 1)
            )
        return ' '.join(sorted(object_classes))

#### **YOLOv8**

In [None]:
# Switch to the 'extra' directory before loading the model.
# NOTE(review): presumably so the relative weights path 'yolov8x.pt' used in
# the next cell resolves here -- confirm. Machine-specific absolute path.
%cd D:/AIC2024/extra

In [None]:
# Use the GPU when CUDA is available; `device` is handed to the model on each
# inference call rather than moving the model here.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = YOLO('yolov8x.pt')  # pretrained YOLOv8x weights (file 'yolov8x.pt')

In [None]:
# Return to the dataset root so the relative keyframe and metadata paths used
# by the cells below resolve correctly.
# NOTE(review): machine-specific absolute path; adjust when running elsewhere.
%cd D:/AIC2024/dataset

#### **Object Bounding Box, Object Number and Object Class**

In [None]:
def label_processing(label):
    """Return ``label`` with every space character replaced by ``_``."""
    return label.replace(' ', '_')

def filter_detections(results, object_encoder, model):
    """Filter the detection results by ``object_encoder.object_classes``.

    Args:
        results: Iterable of per-image results exposing ``boxes.xyxyn`` and
            ``boxes.cls`` (each supporting ``.cpu().numpy()``).
        object_encoder: Object whose ``object_classes`` lists the labels to keep.
        model: Object whose ``names`` maps a class id to its label.

    Returns:
        One ``(bboxes, labels)`` tuple per result: ``bboxes`` is a list of the
        kept boxes and ``labels`` the matching labels after
        ``label_processing``.
    """
    allowed_classes = set(object_encoder.object_classes)
    filtered_data = []
    for result in results:
        bboxes = result.boxes.xyxyn.cpu().numpy().copy()
        # Class ids come back as floats; cast them so they are valid indices
        # whether `model.names` is a dict or a list.
        label_ids = result.boxes.cls.cpu().numpy().astype(int)

        # Filter boxes and labels together in one pass so they stay aligned.
        filtered_bboxes, filtered_labels = [], []
        for bbox, label_id in zip(bboxes, label_ids):
            label = model.names[int(label_id)]
            if label in allowed_classes:
                filtered_bboxes.append(bbox)
                filtered_labels.append(label_processing(label))

        filtered_data.append((filtered_bboxes, filtered_labels))
    return filtered_data

def encode_metadata(image, bboxes, labels, object_encoder):
    """Encode and return the metadata for one keyframe.

    Args:
        image: Keyframe image, forwarded to the bounding-box encoder.
        bboxes: Boxes of the detected objects.
        labels: Labels aligned with ``bboxes``.
        object_encoder: Encoder providing the three ``textual_encoding_*``
            methods.

    Returns:
        A dict with keys ``'object_bbox'``, ``'object_class'`` and
        ``'object_number'``; every value is ``''`` when ``labels`` is empty.
    """
    metadata = {'object_bbox': '', 'object_class': '', 'object_number': ''}
    if len(labels) > 0:
        metadata['object_class'] = object_encoder.textual_encoding_object_classes(labels)
        metadata['object_bbox'] = object_encoder.textual_encoding_object_bboxes(image, bboxes, labels)
        metadata['object_number'] = object_encoder.textual_encoding_object_numbers(labels)
    return metadata

In [None]:
def write_text_file(text, file_path):
    """Write ``text`` to ``file_path``, replacing any existing content.

    The file is always encoded as UTF-8 so that non-ASCII text is written the
    same way on every platform instead of depending on the locale default.
    """
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(text)

def write_json_file(json_data, file_path):
    """Serialize ``json_data`` as indented JSON into ``file_path``.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``), so the
    file is opened explicitly as UTF-8; relying on the platform's default
    encoding could fail or produce a file that is not valid UTF-8 JSON.
    """
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(json_data, f, ensure_ascii=False, indent=4)

def sorted_by_id(keyframe_paths):
    """Return the keyframe paths as a list ordered by numeric keyframe id.

    The id is the integer parsed from the file name (the text after the last
    ``/``) up to its first ``.``. Paths with equal ids keep their input order.
    """
    def keyframe_id(path):
        filename = path.split('/')[-1]
        return int(filename.split('.')[0])

    return sorted(keyframe_paths, key=keyframe_id)

In [None]:
# Configure the run parameters
save_dir = './metadata/object/features'
os.makedirs(save_dir, exist_ok=True)
batch_size = 4  # number of keyframes sent to the model per call
confidence = 0.6  # passed to the model as `conf` on every inference call
object_encoder = ObjectTextualEncoder()
# Iterate over every part, then over every video of that part; one JSON file
# is written per video under <save_dir>/<video_part>/<video_id>.json
for video_part, video_path_dict in all_keyframe_paths.items():
    full_save_dir = save_dir + '/' + video_part
    os.makedirs(full_save_dir, exist_ok=True)
    
    for video_id in tqdm(video_path_dict.keys(), desc=f'Encoding Part {video_part}'):
        # Maps each keyframe path to its encoded object metadata
        video_metadata = {}
        # List the video's keyframe directory and order the files by numeric id
        keyframe_paths = sorted_by_id(
            video_path_dict[video_id] + '/' + keyframe 
            for keyframe in os.listdir(video_path_dict[video_id])
        )

        # Process the keyframes in batches
        for i in range(0, len(keyframe_paths), batch_size):
            batch_paths = keyframe_paths[i:i + batch_size]
            results = model(batch_paths, conf=confidence, device=device, verbose=False)

            # Filter the detections and encode the metadata
            filtered_results = filter_detections(results, object_encoder, model)
            for keyframe_path, (bboxes, labels) in zip(batch_paths, filtered_results):
                # The image is loaded again here because the bounding-box
                # encoder reads its height and width from `image.shape`.
                image = cv2.imread(keyframe_path)
                video_metadata[keyframe_path] = encode_metadata(image, bboxes, labels, object_encoder)

        # Save the metadata to a JSON file
        write_json_file(video_metadata, os.path.join(full_save_dir, f'{video_id}.json'))