In [1]:
import subprocess
import os
from pathlib import Path
from PIL import Image
import struct
import shutil
import sys
from tqdm.notebook import tqdm  # Import tqdm.notebook for progress bars in Jupyter


def extract_frames(ffmpeg_path, video_path, temp_frame_dir, desired_fps=6, output_resolution=(480, 320)):
    """
    Extract frames from a video using FFmpeg, scaling and cropping them.

    Parameters:
        ffmpeg_path (str): Path to the FFmpeg executable.
        video_path (str): Path to the input video file.
        temp_frame_dir (str): Directory to store temporary extracted frames.
        desired_fps (int): Desired frames per second for extraction.
        output_resolution (tuple): Desired output resolution as (width, height).

    Returns:
        list of Path: List of extracted frame file paths.
    """
    temp_dir = Path(temp_frame_dir)
    temp_dir.mkdir(parents=True, exist_ok=True)

    frame_pattern = temp_dir / "frame_%05d.png"

    # Define scaling and cropping filters with centering
    scale_filter = f"scale={output_resolution[0]}:{output_resolution[1]}:force_original_aspect_ratio=increase"
    crop_filter = f"crop={output_resolution[0]}:{output_resolution[1]}:(in_w-{output_resolution[0]})/2:(in_h-{output_resolution[1]})/2"
    vf_filter = f"{scale_filter},{crop_filter}"

    ffmpeg_extract_cmd = [ffmpeg_path, "-i", video_path, "-vf", vf_filter, "-r", str(desired_fps), "-q:v", "2", str(frame_pattern)]  # Set frame rate to desired_fps and quality for PNG extraction

    try:
        subprocess.run(ffmpeg_extract_cmd, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        print(f"Extracted frames from '{video_path}' to '{temp_frame_dir}'")
    except subprocess.CalledProcessError as e:
        print(f"FFmpeg extraction error for '{video_path}': {e.stderr.decode().strip()}")
        return []

    # Gather frame paths
    frame_files = sorted(temp_dir.glob("frame_*.png"))
    if not frame_files:
        print(f"No frames extracted for '{video_path}'.")
    return frame_files


def convert_frame_to_rgb565_array(frame_path):
    """
    Convert a single image frame to a flat list of RGB565 pixel values.

    Parameters:
        frame_path (Path): Path to the image frame.

    Returns:
        list of int: Flat list of RGB565 pixel values.
        int: Width of the frame.
        int: Height of the frame.
    """
    try:
        with Image.open(frame_path) as img:
            img = img.convert("RGB")  # Ensure image is in RGB format
            width, height = img.size
            rgb_bytes = img.tobytes()

            rgb565_array = []
            idx = 0
            for _ in range(height):
                for _ in range(width):
                    r = rgb_bytes[idx]
                    g = rgb_bytes[idx + 1]
                    b = rgb_bytes[idx + 2]
                    idx += 3

                    # Convert 8-bit to 5/6 bits
                    r5 = (r >> 3) & 0x1F
                    g6 = (g >> 2) & 0x3F
                    b5 = (b >> 3) & 0x1F

                    # Pack into 16 bits
                    rgb565_val = (r5 << 11) | (g6 << 5) | b5
                    rgb565_array.append(rgb565_val)
            return rgb565_array, width, height
    except Exception as e:
        print(f"Error converting frame '{frame_path}': {e}")
        return [], 0, 0


def create_rgb565ani_binary(output_binary_path, frame_paths, frame_durations_ms, width, height):
    """
    Combine all frames into a single binary file with a header, storing only changed pixels (delta frames).

    Parameters:
        output_binary_path (str): Path to the output binary file.
        frame_paths (list of Path): List of frame file paths.
        frame_durations_ms (list of float): List of frame durations in milliseconds.
        width (int): Width of the frames.
        height (int): Height of the frames.
    """
    try:
        with open(output_binary_path, "wb") as f:
            # Write header
            magic = b"RGB565ANI"  # 9 bytes magic number
            f.write(magic)
            f.write(struct.pack("<I", len(frame_paths)))  # Frame count
            f.write(struct.pack("<H", width))  # Width
            f.write(struct.pack("<H", height))  # Height

            prev_frame = None
            for idx, (frame_path, duration_ms) in enumerate(zip(frame_paths, frame_durations_ms), 1):
                curr_frame, frame_width, frame_height = convert_frame_to_rgb565_array(frame_path)
                if not curr_frame:
                    print(f"Skipping frame {idx} due to conversion error.")
                    continue

                if prev_frame is None:
                    # First frame, write full frame
                    f.write(struct.pack("<I", duration_ms))  # Duration in ms
                    # Write full frame pixel data
                    frame_data = bytearray()
                    for pixel in curr_frame:
                        frame_data += struct.pack("<H", pixel)  # Little endian
                    f.write(frame_data)
                else:
                    f.write(struct.pack("<I", duration_ms))  # Duration in ms
                    frame_data = bytearray()
                    for pixel in curr_frame:
                        frame_data += struct.pack("<H", pixel)  # Little endian
                    f.write(frame_data)

                # Update previous frame
                prev_frame = curr_frame

            print(f"Successfully created RGB565ANI binary file at '{output_binary_path}'")
    except Exception as e:
        print(f"Error creating binary file '{output_binary_path}': {e}")


def process_video_to_rgb565(ffmpeg_path, video_path, output_binary_path, temp_frame_dir, desired_fps=6, output_resolution=(480, 320)):
    """
    Process an MP4 video and convert it into a single RGB565ANI binary file with delta frames.

    Parameters:
        ffmpeg_path (str): Path to the FFmpeg executable.
        video_path (str): Path to the input video file.
        output_binary_path (str): Path to the output RGB565ANI binary file.
        temp_frame_dir (str): Directory to store temporary extracted frames.
        desired_fps (int): Desired frames per second for extraction.
        output_resolution (tuple): Desired output resolution as (width, height).
    """
    print(f"\nProcessing video '{video_path}'...")
    frame_paths = extract_frames(ffmpeg_path, video_path, temp_frame_dir, desired_fps=desired_fps, output_resolution=output_resolution)
    if not frame_paths:
        print(f"No frames extracted for '{video_path}'. Skipping.")
        return

    # Assign default durations based on desired_fps
    default_duration_ms = int(1000 / desired_fps)
    frame_durations_ms = [default_duration_ms] * len(frame_paths)

    # Retrieve frame dimensions from the first frame
    try:
        with Image.open(frame_paths[0]) as img:
            img = img.convert("RGB")
            width, height = img.size
            if (width, height) != output_resolution:
                print(f"Warning: Frame dimensions {width}x{height} do not match desired resolution {output_resolution}.")
    except Exception as e:
        print(f"Error reading frame '{frame_paths[0]}': {e}")
        return

    create_rgb565ani_binary(output_binary_path, frame_paths, frame_durations_ms, width, height)


def process_all_videos(ffmpeg_path, input_directory, output_directory, output_resolution=(480, 320), desired_fps=6):
    """
    Process all .mp4 video files in the specified input directory:
    - Extract frames and assign durations.
    - Convert frames to RGB565 with delta frames.
    - Combine into a single RGB565ANI binary file.
    - Save the binary file to the specified output directory.

    Parameters:
        ffmpeg_path (str): Path to the FFmpeg executable.
        input_directory (str): Path to the directory containing .mp4 videos.
        output_directory (str): Path to the directory where the output binary files will be saved.
        output_resolution (tuple): Desired output resolution as (width, height).
        desired_fps (int): Desired frames per second for extraction.
    """
    input_dir = Path(input_directory)
    if not input_dir.is_dir():
        print(f"The specified input directory '{input_directory}' is not a directory or does not exist.")
        return

    # Define supported video extensions
    supported_extensions = [".mp4"]

    # Gather all .mp4 videos in the input directory (non-recursive)
    video_files = [file for file in input_dir.iterdir() if file.suffix.lower() in supported_extensions and file.is_file()]

    if not video_files:
        print(f"No .mp4 videos found in directory '{input_directory}'.")
        return

    print(f"Found {len(video_files)} .mp4 video(s) to process.")

    # Initialize the progress bar using tqdm.notebook
    for video in tqdm(video_files, desc="Processing Videos", unit="video"):
        video_path = str(video)
        output_filename = video.stem + ".rgb565ani"
        output_binary_path = Path(output_directory) / output_filename

        # Define temporary frame directory for this video
        temp_frame_dir = Path(output_directory) / (video.stem + "_temp_frames")
        temp_frame_dir.mkdir(parents=True, exist_ok=True)

        # Process the video
        process_video_to_rgb565(ffmpeg_path=ffmpeg_path, video_path=video_path, output_binary_path=str(output_binary_path), temp_frame_dir=str(temp_frame_dir), desired_fps=desired_fps, output_resolution=output_resolution)

        # Cleanup temporary frames
        try:
            shutil.rmtree(temp_frame_dir)
            print(f"Cleaned up temporary frames for '{video_path}'.")
        except Exception as e:
            print(f"Error cleaning up temporary frames for '{video_path}': {e}")


def main():
    """
    Main function to define directories and process all videos.
    """
    # Define path for FFmpeg executable
    ffmpeg_path = r"C:\ffmpeg-7.1-essentials_build\bin\ffmpeg.exe"  # Path to FFmpeg executable

    # Define input and output directories
    input_directory = r"G:\My Drive\CogVideoX"  # Directory containing .mp4 videos
    output_directory = r"C:\CogVideoX-rgb565ani"  # Directory to save the output binary files

    # Define desired output resolution (width x height)
    output_resolution = (480, 320)

    # Define desired frames per second for extraction
    desired_fps = 6  # As per user's mp4 fps

    # Verify that FFmpeg exists
    if not Path(ffmpeg_path).is_file():
        print(f"FFmpeg executable not found at '{ffmpeg_path}'. Please check the path.")
        sys.exit(1)

    # Ensure output directory exists
    Path(output_directory).mkdir(parents=True, exist_ok=True)

    # Process all videos
    process_all_videos(ffmpeg_path=ffmpeg_path, input_directory=input_directory, output_directory=output_directory, output_resolution=output_resolution, desired_fps=desired_fps)


if __name__ == "__main__":
    main()

Found 318 .mp4 video(s) to process.


Processing Videos:   0%|          | 0/318 [00:00<?, ?video/s]


Processing video 'G:\My Drive\CogVideoX\IMG-20230806-WA0024_seed4316937476279866400.mp4'...
Extracted frames from 'G:\My Drive\CogVideoX\IMG-20230806-WA0024_seed4316937476279866400.mp4' to 'C:\CogVideoX-rgb565ani\IMG-20230806-WA0024_seed4316937476279866400_temp_frames'
Successfully created RGB565ANI binary file at 'C:\CogVideoX-rgb565ani\IMG-20230806-WA0024_seed4316937476279866400.rgb565ani'
Cleaned up temporary frames for 'G:\My Drive\CogVideoX\IMG-20230806-WA0024_seed4316937476279866400.mp4'.

Processing video 'G:\My Drive\CogVideoX\IMG-20230720-WA0002_seed1041404606658148093.mp4'...
Extracted frames from 'G:\My Drive\CogVideoX\IMG-20230720-WA0002_seed1041404606658148093.mp4' to 'C:\CogVideoX-rgb565ani\IMG-20230720-WA0002_seed1041404606658148093_temp_frames'
Successfully created RGB565ANI binary file at 'C:\CogVideoX-rgb565ani\IMG-20230720-WA0002_seed1041404606658148093.rgb565ani'
Cleaned up temporary frames for 'G:\My Drive\CogVideoX\IMG-20230720-WA0002_seed1041404606658148093.mp4'