In [1]:
# Mount Google Drive at /content/drive; the default database and keyframe
# paths used later in this notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Download và import các thư viện
Mọi người có thể thu gọn phần này cho dễ nhìn.

In [2]:
# DOWNLOAD THƯ VIỆN CẦN THIẾT
from IPython.display import clear_output

!git clone https://github.com/TuanThanhDat/TransNet.git
!pip install protobuf==3.20.*
!pip install ffmpeg-python

clear_output()

%cd TransNet

/content/TransNet


In [3]:
# IMPORT THƯ VIỆN CẦN THIẾT
import ffmpeg
import numpy as np
import tensorflow as tf
import glob
import os
import cv2
import csv
import time
from transnet import TransNetParams, TransNet
from transnet_utils import draw_video_with_predictions, scenes_from_predictions
clear_output()

# Định nghĩa lớp
Mọi người có thể thu gọn phần này cho dễ nhìn.

In [4]:
# Reference only: a bare (unassigned) string describing the directory layout
# this notebook expects. "FORMAT DATABASE" is the input video tree read via
# db_path; "FORMAT FOLDER KEYFRAME" is the output tree, where map-keyframes/
# is the csv_folder_path of Video2Frames.
# NOTE(review): the per-video scene list written to "<video folder>.txt" by
# Video2Frames is not shown in the layout below.
'''
==FORMAT DATABASE==
Database-BTC/ (db_path)
    |__Videos_L01/
    |   |__video/
    |       |__L01_V001.mp4
    |       |__...
    |
    |__...


==FORMAT FOLDER KEYFRAME==
KeyFrames_AIO_Pending/
    |__KeyFrames_L01/
    |    |__L01_V001/
    |        |__000001.jpg
    |        |__...
    |
    |__map-keyframes/ (csv_folder_path)
         |__L01_V001.csv
         |__...
'''
# Clear whatever this cell would otherwise display.
clear_output()

In [5]:
# Reference only: a bare (unassigned) string listing methods of Video2Frames,
# grouped by the public entry point that uses them.
# NOTE(review): the list is out of date relative to the class in this notebook:
# check_fps, create_video_csv, create_group_csv and their private helpers are
# missing from it.
'''
set_quiet(value)

__create_txt(vd_name, des_path, scences)
__create_csv(vd_name, content)
__extract_vd_keyframe(vd_path, des_path)

__get_vd_path(vd_name)
__get_des_path(vd_name)
extract(vd_name_list)

__extract(vd_paths)
__get_vd_name(vd_path)
__get_all_vd_paths(group_id)
extract_group(group_id_list)
'''
# Clear whatever this cell would otherwise display.
clear_output()

In [6]:
class Video2Frames:
    """Split videos into scenes with TransNet and export keyframes plus CSV indexes.

    For every processed video the first, middle and last frame of each detected
    scene is saved as ``<frame index zero-padded to 6 digits>.jpg`` inside
    ``<kf_des_path>/KeyFrames_<group>/<video name>/``. The scene list is written
    to ``<that folder>.txt`` and a per-video index is written to
    ``<kf_des_path>/map-keyframes/<video name>.csv``.

    A video name has the form ``<group>_<id>`` (for example ``L01_V001``); the
    group is everything before the last underscore.
    """

    def __init__(
        self,
        vd_db_path='/content/drive/MyDrive/HCMAI23/Database-BTC',
        kf_des_path='/content/drive/MyDrive/KeyFrames_AIO_Pending',
        quiet=False
    ):
        """Create the TransNet model and remember the input/output locations.

        Args:
            vd_db_path: Root folder of the video database.
            kf_des_path: Root folder that receives keyframes, txt and csv files.
            quiet: When True, progress messages are not printed.
        """
        self.params = TransNetParams()
        self.params.CHECKPOINT_PATH = "/content/TransNet/model/transnet_model-F16_L3_S2_D256"
        self.model = TransNet(self.params)

        self.db_path = vd_db_path
        self.kf_path = kf_des_path
        self.csv_folder_path = os.path.join(self.kf_path, 'map-keyframes')

        self.quiet = quiet  # Option: suppress progress messages

    def set_quiet(self, value: bool):
        """Enable (True) or disable (False) quiet mode."""
        self.quiet = value

    def check_fps(self, group_name_list):
        """Return the set of distinct FPS values reported by OpenCV for the groups.

        Args:
            group_name_list: Group ids such as ``['L01', 'L02']``.

        Returns:
            A set with one entry per distinct ``CAP_PROP_FPS`` value.
        """
        fps_values = set()
        for group_name in group_name_list:
            for path in self.__get_all_vd_paths(group_name):
                vd = cv2.VideoCapture(path)
                try:
                    fps_values.add(vd.get(cv2.CAP_PROP_FPS))
                finally:
                    # Release the capture even if reading the property fails.
                    # No HighGUI window is ever opened, so there is nothing
                    # else to clean up.
                    vd.release()
        return fps_values

    def __create_txt(self, vd_name, des_path, scenes):
        """Write one scene per line to ``<des_path>.txt``.

        Args:
            vd_name: Video name, e.g. ``L01_V001`` (used for the message only).
            des_path: Keyframe folder of the video, e.g. ``KeyFrames_L01/L01_V001``.
            scenes: Iterable of scenes; each one is written via ``str()``.
        """
        with open(f"{des_path}.txt", 'w') as f:
            for sc in scenes:
                f.write(str(sc) + '\n')
        if not self.quiet:
            print(f'=> Write file {vd_name}.txt successfully!!!')

    def __create_csv(self, vd_name, content):
        """Write the keyframe index of one video to ``map-keyframes/<vd_name>.csv``.

        Args:
            vd_name: Video name, e.g. ``L01_V001``.
            content: Iterable of dicts with the keys ``n``, ``pts_time``,
                ``fps`` and ``frame_idx``.
        """
        # The csv folder is not created anywhere else; without this the very
        # first write into a fresh output tree fails with FileNotFoundError.
        os.makedirs(self.csv_folder_path, exist_ok=True)
        csv_path = os.path.join(self.csv_folder_path, f'{vd_name}.csv')
        with open(csv_path, 'w', newline='') as csvfile:
            # Column names
            field_names = ['n', 'pts_time', 'fps', 'frame_idx']
            writer = csv.DictWriter(csvfile, fieldnames=field_names)
            writer.writeheader()
            # One row per keyframe
            for row in content:
                writer.writerow(row)
        if not self.quiet:
            print(f'=> Write file {vd_name}.csv successfully!!!')

    @staticmethod
    def __keyframe_indices(scenes):
        """Return the sorted, de-duplicated frame indices to export.

        For each ``[first, last]`` scene the first, middle
        (``int(first + (last - first) / 2)``) and last frame are selected.
        Collecting them in a set means a one-frame scene, where all three
        coincide, contributes a single index instead of stalling the export.

        Args:
            scenes: Iterable of ``[first, last]`` pairs (0-based frame indices).

        Returns:
            Ascending list of unique frame indices.
        """
        wanted = set()
        for scene in scenes:
            first, last = int(scene[0]), int(scene[1])
            mid = int(first + (last - first) / 2)
            wanted.update((first, mid, last))
        return sorted(wanted)

    def __extract_vd_keyframe(self, vd_path, des_path):
        """Detect scenes in one video and export its keyframes, txt and csv.

        Args:
            vd_path: Path of the source video.
            des_path: Keyframe folder of the video, e.g. ``KeyFrames_L01/L01_V001``.
        """
        os.makedirs(des_path, exist_ok=True)
        if not self.quiet:
            print(f'Video path: {vd_path}')
            print(f'Frame path: {des_path}')
        # Decode the whole video with ffmpeg as raw RGB frames at the model's
        # input resolution.
        video_stream, _stderr = (
            ffmpeg
            .input(vd_path)
            .output('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(self.params.INPUT_WIDTH, self.params.INPUT_HEIGHT))
            .run(capture_stdout=True)
        )
        video = np.frombuffer(video_stream, np.uint8).reshape([-1, self.params.INPUT_HEIGHT, self.params.INPUT_WIDTH, 3])

        # Predict scene transitions with the TransNet model
        predictions = self.model.predict_video(video)

        # Turn the predictions into a list of scenes
        scenes = scenes_from_predictions(predictions, threshold=0.1)

        # Write the txt file
        vd_name = self.__get_vd_name(vd_path)
        self.__create_txt(vd_name, des_path, scenes)

        # Export the frames. Membership in a precomputed set replaces the
        # "current scene" cursor, which got stuck on one-frame scenes and ran
        # past the end of `scenes` when OpenCV decoded extra trailing frames.
        wanted = set(self.__keyframe_indices(scenes))
        csv_content = []
        cam = cv2.VideoCapture(vd_path)
        try:
            fps = cam.get(cv2.CAP_PROP_FPS)
            frame_idx = 0
            while True:
                ret, frame = cam.read()
                if not ret:
                    break
                if frame_idx in wanted:
                    filename = "{}/{:0>6d}.jpg".format(des_path, frame_idx)
                    # cv2.imwrite reports failure through its return value;
                    # only index frames that really reached the disk so the csv
                    # never lists a missing image.
                    if cv2.imwrite(filename, frame):
                        csv_content.append({'n': len(csv_content) + 1, 'pts_time': (frame_idx / fps), 'fps': fps, 'frame_idx': frame_idx})
                    else:
                        print(f'=> WARNING: could not write {filename}; frame left out of the csv')
                frame_idx += 1
        finally:
            cam.release()
        if not self.quiet:
            print(f'=> Extract frames successfully!!!')
        # Write the csv file
        self.__create_csv(vd_name, csv_content)

    # Video paths which are in video database
    def __get_vd_path(self, video_name):
        """Return ``<db_path>/Videos_<group>/video/<video_name>.mp4``."""
        group_id = video_name[:video_name.rfind('_')]
        return os.path.join(self.db_path, f'Videos_{group_id}/video/{video_name}.mp4')

    # Destination keyframe folder path
    def __get_des_path(self, video_name):
        """Return ``<kf_path>/KeyFrames_<group>/<video_name>``."""
        group_id = video_name[:video_name.rfind('_')]
        return os.path.join(self.kf_path, f'KeyFrames_{group_id}/{video_name}')

    # EXTRACT KEYFRAMES BY VIDEO NAME
    def extract(self, vd_name_list: list, sleep_seconds=30):
        """Extract keyframes for the named videos, pausing after each one.

        Args:
            vd_name_list: Video names such as ``['L01_V001', 'L02_V001']``.
            sleep_seconds: Pause after every video (default 30, as before).
                NOTE(review): the reason for the pause is not documented in
                this notebook; presumably it lets Drive writes settle. Confirm.
        """
        for vd_name in vd_name_list:
            vd_path = self.__get_vd_path(vd_name)
            des_path = self.__get_des_path(vd_name)
            self.__extract_vd_keyframe(vd_path, des_path)
            time.sleep(sleep_seconds)

    def __get_vd_name(self, vd_path):
        """Return the file name of ``vd_path`` without its extension."""
        return os.path.splitext(os.path.basename(vd_path))[0]

    def __extract(self, vd_paths):
        """Extract keyframes for every video path in ``vd_paths``."""
        for vd_path in vd_paths:
            des_path = self.__get_des_path(self.__get_vd_name(vd_path))
            self.__extract_vd_keyframe(vd_path, des_path)

    def __get_all_vd_paths(self, group_id):
        """Return the sorted ``.mp4`` paths in ``Videos_<group_id>/video``."""
        return sorted(glob.glob(f'{self.db_path}/Videos_{group_id}/video/*.mp4'))

    # EXTRACT KEYFRAMES BY VIDEO GROUP ID
    def extract_group(self, group_id_list: list):
        """Extract keyframes for every video of the given groups.

        Args:
            group_id_list: Group ids such as ``['L01', 'L02']``.
        """
        for group_id in group_id_list:
            if not self.quiet:
                print(f'==Group video: {group_id}==')
            vd_paths = self.__get_all_vd_paths(group_id)
            self.__extract(vd_paths)

    def __get_frame_idx(self, img_path):
        """Return the frame index encoded in a keyframe file name (``000123.jpg`` -> 123)."""
        return int(os.path.splitext(os.path.basename(img_path))[0])

    # BUILD CSV FILES FROM EXISTING KEYFRAMES
    def create_video_csv(self, vd_name_list: list, fps=25):
        """Rebuild the csv index of each video from the ``.jpg`` files on disk.

        Args:
            vd_name_list: Video names such as ``['L01_V001']``.
            fps: Frame rate used for the ``fps`` column and to derive
                ``pts_time = frame_idx / fps``.

        Raises:
            FileNotFoundError: If a keyframe folder is missing or holds no
                ``.jpg`` file (a subclass of the ``Exception`` raised before).
        """
        for vd_name in vd_name_list:
            # Keyframe folder of the video
            kf_folder_path = self.__get_des_path(vd_name)
            # The folder must already exist
            if not os.path.exists(kf_folder_path):
                raise FileNotFoundError(f"The keyframe folder {vd_name} not found!!!")
            # The keyframes must already have been extracted
            img_paths = glob.glob(f'{kf_folder_path}/*.jpg')
            if not img_paths:
                raise FileNotFoundError('Frames not found!!!')
            # Sort numerically so the order holds even if a name is not
            # zero-padded like the rest.
            frame_indices = sorted(self.__get_frame_idx(path) for path in img_paths)
            csv_content = [
                {'n': n, 'pts_time': (idx / fps), 'fps': fps, 'frame_idx': idx}
                for n, idx in enumerate(frame_indices, start=1)
            ]
            # Write the csv file
            self.__create_csv(vd_name, csv_content)

    def __get_group_path(self, group_name):
        """Return ``<kf_path>/KeyFrames_<group_name>``."""
        return f'{self.kf_path}/KeyFrames_{group_name}'

    def __get_group_video_names(self, group_path):
        """Return the sorted names of the video folders inside ``group_path``.

        Only directories count; this skips the per-video ``.txt`` scene files
        that sit next to the folders, as well as any other stray file.
        """
        entries = sorted(glob.glob(f'{group_path}/*'))
        return [os.path.basename(path) for path in entries if os.path.isdir(path)]

    # BUILD CSV FILES BY GROUP NAME
    def create_group_csv(self, group_name_list: list, fps=25):
        """Rebuild the csv index of every video folder of the given groups.

        Args:
            group_name_list: Group ids such as ``['L01', 'L02']``.
            fps: Frame rate forwarded to ``create_video_csv`` (default 25).

        Raises:
            FileNotFoundError: If a ``KeyFrames_<group>`` folder does not exist.
        """
        for group_name in group_name_list:
            group_path = self.__get_group_path(group_name)
            # The group keyframe folder must exist
            if not os.path.exists(group_path):
                raise FileNotFoundError(f"Not found folder KeyFrames_{group_name}!!!")
            # Progress message
            if not self.quiet:
                print(f'==Group video: {group_name}==')
            # Names of the video folders inside the group
            vd_name_list = self.__get_group_video_names(group_path)
            self.create_video_csv(vd_name_list, fps=fps)

    # TODO: add a check comparing the number of csv rows with the frames
    # actually extracted. In some cases a frame was missing on disk although
    # the csv listed it.



# Hướng dẫn

In [7]:
# videos = ['L01_V001', 'L02_V001']
# groups = ['L01', 'L02']

# vd_db_path='/content/drive/MyDrive/HCMAI23/Database-BTC'
# kf_des_path='/content/drive/MyDrive/KeyFrames_AIO_Pending'

# # Khởi tạo đối tượng
# v2f = Video2Frames(vd_db_path=vd_db_path, kf_des_path=kf_des_path)

# # Trích keyframe theo tên video
# v2f.extract(videos)

# # Trích keyframe theo group video
# v2f.extract_group(groups)

# # Tạo file csv cho keyframe đã có trước đó
# v2f.create_video_csv(videos)

# # Tạo file csv theo groups đã trích keyframe trước đó
# v2f.create_group_csv(groups)

In [8]:
# # KIỂM TRA FPS
# %%time
# groups = ['L01','L02','L03','L04','L05','L06','L07','L08','L09','L10',]
# set_fps = v2f.check_fps(groups)
# print(set_fps)

# Chạy lớp rút trích khung hình video
Mọi người cần thực hiện 2 bước:
- Nhập đường dẫn thư mục Database-BTC và KeyFrames_AIO_Pending.
- Nhập tên video cần xử lý.

In [9]:
# === ENTER THE PATHS HERE ================================
# Example: vd_db_path = '.../Database-BTC'
# Example: kf_des_path = '.../KeyFrames_AIO_Pending'
vd_db_path = '/content/drive/MyDrive/HCMAI23/Database-BTC'
kf_des_path = '/content/drive/MyDrive/KeyFrames_AIO_Pending'
# =========================================================

# Build the extractor (this constructs the TransNet model), then clear the
# output produced while doing so.
v2f = Video2Frames(vd_db_path=vd_db_path, kf_des_path=kf_des_path)
clear_output()
# Note: avoid running this cell twice in one session; doing so currently
# triggers a known, unresolved error.

In [None]:
%%time
# === NHẬP TÊN VIDEO Ở ĐÂY ====
# Ví dụ 'L01_V001'
# 'L12_V001','L12_V002','L12_V003','L12_V004','L12_V005'
# 'L12_V006','L12_V007','L12_V008','L12_V009','L12_V010'
# 'L12_V011','L12_V012','L12_V013','L12_V014','L12_V015'
# 'L12_V016','L12_V017','L12_V018','L12_V019','L12_V020'
# 'L12_V021','L12_V022','L12_V023','L12_V024','L12_V025'
# 'L12_V026','L12_V027','L12_V028','L12_V029','L12_V030'

# '{}_V00{}'.format('L12',i) for i in range(1,6)
# '{}_V00{}'.format('L12',i) for i in range(6,10)
# '{}_V0{}'.format('L12',i) for i in range(10,21)
# '{}_V0{}'.format('L12',i) for i in range(21,)

# videos = ['{}_V0{}'.format('L13',i) for i in range(26,31)]
# # =============================

# v2f.extract(videos)

In [11]:
%%time
# CHẠY THEO GROUP
# groups = ['L16']
# v2f.extract_group(groups)

CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 7.39 µs
