<a href="https://colab.research.google.com/github/DEX-1101/Video-Frame-Extractor/blob/main/Frame_Extractor_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title Preparing Video
from google.colab import drive
# @markdown Use `Google Drive` as Gallery to manage the image.
# @markdown - If error appear about importing `libtorrent`, just run the this cell agian.
# @markdown - Use [Nyaa.si](https://nyaa.si/) or similar website to get video that you needed without subtitle.
# @markdown - Download it using `Torrent/Magnet link` provided below.
mount_drive = True # @param {"type":"boolean"}

if mount_drive:
   print("Mounting drive...")
   drive.mount("/content/drive")

print("installing requirements...")
!apt-get update > /dev/null 2>&1
!apt-get install -y ffmpeg
!pip install -q ultralytics opencv-python requests imagehash pillow
!pip install -q lbry-libtorrent

import libtorrent as lt

ses = lt.session()
ses.listen_on(6881, 6891)
downloads = []

print("Done")

In [None]:
# @title Add From Torrent File
from google.colab import files

source = files.upload()
params = {
    "save_path": "/content/drive/My Drive/Torrent",
    "ti": lt.torrent_info(list(source.keys())[0]),
}
downloads.append(ses.add_torrent(params))

In [None]:
# @title Add From Magnet Link
# @markdown - You can put multiple link, after it type `done` to finish.
params = {"save_path": "/content/drive/My Drive/Torrent"}

while True:
    magnet_link = input("Enter Magnet Link: ")
    if magnet_link.lower() == "done":
        break
    downloads.append(
        lt.add_magnet_uri(ses, magnet_link, params)
    )


In [None]:
# @title Start Download
# @markdown Downloaded file will be located in : `/content/drive/My Drive/Torrent`
import time
from IPython.display import display
import ipywidgets as widgets

state_str = [
    "queued",
    "checking",
    "downloading metadata",
    "downloading",
    "finished",
    "seeding",
    "allocating",
    "checking fastresume",
]

layout = widgets.Layout(width="auto")
style = {"description_width": "initial"}
download_bars = [
    widgets.FloatSlider(
        step=0.01, disabled=True, layout=layout, style=style
    )
    for _ in downloads
]
display(*download_bars)

while downloads:
    next_shift = 0
    for index, download in enumerate(downloads[:]):
        bar = download_bars[index + next_shift]
        if not download.is_seed():
            s = download.status()

            bar.description = " ".join(
                [
                    download.name(),
                    str(s.download_rate / 1000),
                    "kB/s",
                    state_str[s.state],
                ]
            )
            bar.value = s.progress * 100
        else:
            next_shift -= 1
            ses.remove_torrent(download)
            downloads.remove(download)
            bar.close() # Seems to be not working in Colab (see https://github.com/googlecolab/colabtools/issues/726#issue-486731758)
            download_bars.remove(bar)
            print(download.name(), "complete")
    time.sleep(1)


In [None]:
# @markdown Clear torrent/magnet download list.
ses.remove_torrent(download)
downloads.remove(download)

In [None]:
# @title Extract Frame from Video
import cv2
from ultralytics import YOLO
import os
import requests
import imagehash
from PIL import Image
import numpy as np

# @markdown  Enter the full path of the video. `OUTPUT_DIR` will created automatically if doens't exist.
VIDEO_PATH = "/content/drive/My Drive/Torrent/[Erai-raws] Izure Saikyou no Renkinjutsushi - 06 [1080p CR WEBRip HEVC EAC3][MultiSub][E89BDE07].mkv" # @param {"type":"string"}
OUTPUT_DIR = "/content/drive/MyDrive/extracted_frames/eps6"       # @param {"type":"string"}
DEVICE = "cuda" # @param ["cuda","cpu"]
# @markdown Detection Model used for extraction so it's only detect the character.
MODEL_URL = "https://github.com/Fuyucch1/yolov8_animeface/releases/download/v1/yolov8x6_animeface.pt" # @param {"type":"string"}
MODEL_FILE = "yolov8x6_animeface.pt"  # @param {"type":"string"}
# @markdown Adjust detection threshold. More number are more strong detection.
CONF_THRESHOLD = 0.7 # @param {"type":"slider","min":0,"max":1,"step":0.05}
# @markdown Skip every N seconds of the video duration. Increase it to `2` or `3` to make the processing more faster.
SKIP_INTERVAL = 1 # @param {"type":"raw"}
# @markdown Remove duplicate image to reduce processing time.
# @markdown - `0` threshold means only exact duplicates are skipped.
# @markdown - `10` threshold allows minor differences (e.g., small changes in brightness or compression artifacts).
SIMILARITY_THRESHOLD = 10 # @param {"type":"slider","min":0,"max":64,"step":1}

def download_model(url, filename):
    """Download the YOLO model if it doesn't exist"""
    if not os.path.exists(filename):
        print(f"Downloading model from {url}...")
        response = requests.get(url, stream=True)
        response.raise_for_status()

        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print("Model downloaded successfully")

def compute_phash(image):
    """Compute perceptual hash (pHash) of an image"""
    pil_image = Image.fromarray(image)
    return imagehash.phash(pil_image)

def is_similar(hash1, hash2, threshold=SIMILARITY_THRESHOLD):
    """Check if two hashes are similar based on a threshold"""
    return hash1 - hash2 <= threshold

def main():
    download_model(MODEL_URL, MODEL_FILE)

    model = YOLO(MODEL_FILE)
    model.to(DEVICE)

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        raise FileNotFoundError(f"Could not open video file: {VIDEO_PATH}")

    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = 0
    saved_count = 0
    previous_hashes = []

    skip_frames = int(fps * SKIP_INTERVAL)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count % skip_frames != 0:
            frame_count += 1
            continue

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        results = model.predict(
            source=frame_rgb,
            conf=CONF_THRESHOLD,
            verbose=False,
            device=DEVICE
        )

        if len(results[0].boxes) == 0:
            frame_count += 1
            continue

        current_hash = compute_phash(frame_rgb)

        is_duplicate = False
        for prev_hash in previous_hashes:
            if is_similar(current_hash, prev_hash):
                is_duplicate = True
                break

        if not is_duplicate:
            output_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:06d}.jpg")
            cv2.imwrite(output_path, frame)
            previous_hashes.append(current_hash)
            saved_count += 1
            print(f"\033[92mSaved frame {frame_count} \033[0m (Confidence: {results[0].boxes[0].conf.item():.2f})")
        else:
            print(f"\033[93mSkipped frame {frame_count} (duplicate)\033[0m")

        frame_count += 1

    cap.release()
    print(f"\n\033[96mProcessing complete!\033[0m")
    print(f"Total frames processed: \033[96m{frame_count}\033[0m")
    print(f"Unique frames with faces saved: \033[96m {saved_count}\033[0m")

if __name__ == "__main__":
    main()

In [None]:
# Clear folder
!rm -rf /content/extracted_frames/*