#1. Kết nối Drive và set up các thư viện

In [None]:
# Mount Google Drive at /content/drive so later cells can read and write files under it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install ftfy, regex and tqdm, then CLIP straight from the OpenAI GitHub repository
# (-q keeps pip output quiet).
!pip install ftfy regex tqdm -q
!pip install git+https://github.com/openai/CLIP.git -q

In [None]:
import numpy as np
import cv2
import time
import torch
import clip
import os, glob
from PIL import Image
from tqdm import tqdm
from numpy.linalg import svd
from PIL import Image
# import matplotlib.pyplot as plt
# import torch.nn.functional as F

#2. Clone TransNet và giải nén List Videos

##2.1. Làm việc trong Drive

In [None]:
# Option 2.1: work directly inside the Drive folder.
%cd /content/drive/MyDrive/Full_Folder
# Make sure Videos_L10.zip is already present in Full_Folder

In [None]:
# Clone TransNetV2 into the current folder, enter the checkout, install it from
# source, and install ffmpeg-python. The notebook stays inside TransNetV2/
# afterwards, so later relative paths resolve against that directory.
!git clone https://github.com/soCzech/TransNetV2.git
%cd TransNetV2
!python setup.py install
!pip install ffmpeg-python

In [None]:
# 1. Need to change
# The archive path is hard-coded for the L10 batch; edit it when processing
# another batch. The archive is extracted into the current directory.
!unzip -q /content/drive/MyDrive/Full_Folder/Videos_L10.zip -d ./

##2.2. Làm việc ngoài Drive

In [None]:
# Option 2.2: work outside Drive, in /content/Full_Folder.
%cd /content/
!mkdir Full_Folder
%cd Full_Folder

# 2. Need to change
# The ID passed to gdown is the file ID from the sharing link below.
# Presumably this is the Videos_L10.zip archive extracted in a later cell - confirm.
# https://drive.google.com/file/d/1Vq5FX_w90A5PSfJRDSiZh4bbeVFHk0Xh/view?usp=sharing
!gdown 1Vq5FX_w90A5PSfJRDSiZh4bbeVFHk0Xh

In [None]:
# Clone TransNetV2 into the current folder, enter the checkout, install it from
# source, and install ffmpeg-python. The notebook stays inside TransNetV2/
# afterwards, so later relative paths resolve against that directory.
!git clone https://github.com/soCzech/TransNetV2.git
%cd TransNetV2
!python setup.py install
!pip install ffmpeg-python

In [None]:
# 3. Need to change
# The archive path is hard-coded for the L10 batch; edit it when processing
# another batch. The archive is extracted into the current directory.
!unzip -q /content/Full_Folder/Videos_L10.zip -d ./

In [None]:
# Batch tag used to build the output folder names in later cells
# ("<tag>_keyframes", "clip_features_<tag>", "compressed_<tag>", "detection_<tag>").
science = "L10" # 4. Need to change

#3. TransNet inference

In [None]:
# Load TransNetV2 with the weights from the cloned repository. Both the import and
# model_dir are relative paths, so the working directory must be the TransNetV2 checkout.
from inference.transnetv2 import TransNetV2
model = TransNetV2(model_dir="inference/transnetv2-weights/")

In [None]:
# For every video in `video_dir`, detect scenes with TransNetV2 and save three
# keyframes per scene (first, middle and last frame) as
#   <science>_keyframes/<video name>/<scene idx><a|b|c>_<frame number>.jpg
video_dir = "video"
general_save_dir = f"{science}_keyframes"

video_list = sorted(os.listdir(video_dir))


for single_video in video_list:
    start_time = time.time()
    single_video_path = os.path.join(video_dir, single_video)

    # Open the capture before running inference so an unreadable file is
    # skipped cheaply. `continue` (instead of exit()) keeps the notebook
    # kernel alive and lets the remaining videos be processed.
    cap = cv2.VideoCapture(single_video_path)
    if not cap.isOpened():
        print(f"Can't open the video: {single_video_path}")
        cap.release()
        continue

    try:
        _, single_frame_predictions, _ = model.predict_video(single_video_path)
        res_arr = model.predictions_to_scenes(single_frame_predictions)

        # splitext handles extensions of any length (".mp4", ".webm", ...).
        video_stem = os.path.splitext(single_video)[0]
        saved_dir = f"{general_save_dir}/{video_stem}"
        os.makedirs(saved_dir, exist_ok=True)

        for idx, (first, last) in enumerate(res_arr):
            first, last = int(first), int(last)
            middle = (first + last) // 2

            # a = first frame, b = middle frame, c = last frame of the scene.
            for tag, frame_no in zip("abc", (first, middle, last)):
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
                ok, frame = cap.read()
                if not ok:
                    # Seeking can fail; skip the frame rather than passing
                    # None to cv2.imwrite.
                    print(f"Can't read frame {frame_no} of {single_video_path}")
                    continue
                saved_path = f"{saved_dir}/{idx:03d}{tag}_{frame_no:05d}.jpg"
                cv2.imwrite(saved_path, frame)
    finally:
        # Always free the decoder, even if inference or writing raised.
        cap.release()

    iteration_time = round(time.time() - start_time, 2)
    print(f"Save successfully {saved_dir} in {iteration_time}s")
    print()

In [None]:
# 5. Need to change
!zip -r L10_keyframes.zip L10_keyframes

#4. CLIP inference

In [None]:
# Pick the GPU when available and load the CLIP ViT-B/32 model with its matching
# image preprocessing transform. Note: this rebinds `model`, replacing the
# TransNetV2 instance created earlier.
device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

In [None]:
# Encode every keyframe with CLIP and save one .npy file per video under
# clip_features_<science>/. `model`, `preprocess` and `device` must come from
# the clip.load cell.
start_time = time.time()

src_dir = f"{science}_keyframes"

save_clip_dir = f"clip_features_{science}"
os.makedirs(save_clip_dir, exist_ok=True)

videos = sorted(os.listdir(src_dir))

for video in videos:
    print(video)
    clip_vector_list = []
    video_path = os.path.join(src_dir, video)
    images = sorted(os.listdir(video_path))
    for image in tqdm(images):
        image_path = os.path.join(video_path, image)

        # The context manager closes the file handle as soon as the image has
        # been turned into a tensor, instead of waiting for garbage collection.
        with Image.open(image_path) as pil_image:
            image_input = preprocess(pil_image).unsqueeze(0).to(device)

        # Inference only: no autograd graph is built, so no detach() is needed.
        with torch.no_grad():
            image_features = model.encode_image(image_input)

        # Each entry keeps its leading batch axis of size 1 (from unsqueeze(0)),
        # so the saved array has one such entry per image.
        clip_vector_list.append(image_features.cpu().numpy())

    clip_vector_arr = np.array(clip_vector_list)
    save_path = f"{save_clip_dir}/{video}.npy"
    np.save(save_path, clip_vector_arr)
    print(f"Save successfully {save_path}")
    print()


# Total elapsed time in seconds.
print(time.time() - start_time)

In [None]:
# 6. Need to change
!zip -r compressed_L10.zip compressed_L10

#5. TransNet post-processing

In [None]:
def compress_image(img_name, k):
    """Return a rank-``k`` SVD approximation of an image.

    Each colour channel is factorised with a singular value decomposition and
    rebuilt from its first ``k`` singular triplets.

    Args:
        img_name: Path (or file object) accepted by ``PIL.Image.open``.
        k: Number of singular values to keep per channel. Values larger than
            the image's smaller side keep everything.

    Returns:
        ``numpy.ndarray`` of dtype ``uint8`` with shape (height, width, 3).
        The input is converted to RGB first, so grayscale, palette and
        RGBA/CMYK files are handled as well.
    """
    # Use the GPU when present, but keep working on CPU-only runtimes.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Close the file as soon as the pixels are in memory.
    with Image.open(img_name) as pil_img:
        pixels = np.array(pil_img.convert("RGB"), dtype=np.float32)

    # (H, W, 3) -> (3, H, W) so one batched SVD covers all three channels.
    channels = torch.from_numpy(pixels).to(device).permute(2, 0, 1)

    # Reduced SVD: channels == U @ diag(S) @ Vh for each channel.
    u, s, vh = torch.linalg.svd(channels, full_matrices=False)

    # Scaling the kept columns of U by S equals U_k @ diag(S_k), without
    # materialising the diagonal matrix.
    approx = (u[..., :k] * s[..., None, :k]) @ vh[..., :k, :]

    # Keep pixel values in the valid range, then restore the (H, W, 3) layout.
    approx = approx.clamp(0, 255).permute(1, 2, 0).contiguous()

    compressed_image = approx.cpu().numpy().astype(np.uint8)
    return compressed_image

def resize_img(img, size=(255,144)):
    """Convert an image array to a PIL image resized to ``size``.

    Args:
        img: Array accepted by ``PIL.Image.fromarray``.
        size: Target size passed to ``Image.resize``.
            NOTE(review): the default width is 255; confirm that 256 was not
            intended.

    Returns:
        The resized ``PIL.Image.Image``.
    """
    return Image.fromarray(img).resize(size)

In [None]:
# Main: SVD-compress and downscale every keyframe, saving the result as WebP
# under compressed_<science>/<video>/.
# Relies on `src_dir` and `videos` from the CLIP inference cell; if that cell
# was not run, define them first:
# src_dir = "L10_keyframes"
# videos = sorted(os.listdir(src_dir))

compressed_dir = f'compressed_{science}' # 9. Need to change

for video in videos:
    video_path = os.path.join(src_dir, video)

    # Create the output folder once per video rather than once per image.
    saved_video_path = os.path.join(compressed_dir, video)
    os.makedirs(saved_video_path, exist_ok=True)

    for single_image in tqdm(sorted(os.listdir(video_path))):
        compressed_img = compress_image(os.path.join(video_path, single_image), 100)
        img = resize_img(compressed_img)

        # splitext strips an extension of any length, unlike slicing off the
        # last four characters.
        image_stem = os.path.splitext(single_image)[0]
        img.save(os.path.join(saved_video_path, image_stem + '.webp'), 'webp')

In [None]:
# 7. Need to change
!zip -r compressed_L10.zip compressed_L10

#6. YOLOv8 inference

In [None]:
# Install the ultralytics package, which provides the YOLO class imported in the next cell.
!pip install ultralytics

In [None]:
# Load the YOLOv8 nano checkpoint. Note: this rebinds `model`, replacing the
# CLIP model loaded earlier.
from ultralytics import YOLO
model = YOLO("yolov8n.pt")

In [None]:
# Run YOLOv8 on every keyframe and write one label file per image under
# detection_<science>/<video>/. Each line of a label file is
#   <class id> <x> <y> <w> <h>
# with the four box values taken from `box.xywhn`.
# Relies on `src_dir` and `videos` from the CLIP inference cell; if that cell
# was not run, define them first:
# src_dir = "L10_keyframes"
# videos = sorted(os.listdir(src_dir))

saved_detection_dir = f"detection_{science}"

for video in videos:
    second_path = os.path.join(src_dir, video)
    img_list = sorted(os.listdir(second_path))

    # Create the output folder once per video. exist_ok=True is required:
    # without it the call raises FileExistsError on the second image of a
    # video and on any re-run of the cell.
    saved_folder = os.path.join(saved_detection_dir, video)
    os.makedirs(saved_folder, exist_ok=True)

    for img_name in img_list:
        img_path = os.path.join(second_path, img_name)
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread returns None for unreadable or non-image files.
            print(f"Can't read the image: {img_path}")
            continue

        results = model.predict(img)

        # splitext strips an extension of any length.
        label_path = os.path.join(saved_folder, os.path.splitext(img_name)[0] + ".txt")
        with open(label_path, "w") as f:
            for r in results:
                for box in r.boxes:
                    list_box = (box.xywhn[0]).tolist()
                    name = int((box.cls).item())
                    label = " ".join(str(value) for value in [name, *list_box[:4]])
                    f.write(label)
                    f.write("\n")

In [None]:
!zip -r detection_L10.zip detection_L10 #8. Need to change