In [None]:
!pip install -U moviepy==1.0.3

In [8]:
from pathlib import Path
from tqdm import tqdm
import os
from moviepy.editor import VideoFileClip
import polars as pl
from tabulate import tabulate
import json

def format_time_in_japanese(total_seconds: float) -> str:
    units = [
        ("年", 365 * 24 * 3600),
        ("日", 24 * 3600),
        ("時間", 3600),
        ("分", 60),
    ]
    parts = []

    # 各単位について、整数で取り出す
    for label, length in units:
        amount = int(total_seconds // length)
        if amount > 0:
            parts.append(f"{amount}{label}")
        total_seconds -= amount * length

    # 残りを「秒」として float で扱う
    if total_seconds > 0:
        parts.append(f"{total_seconds:.04f}秒")

    return "".join(parts) if parts else "0秒"

def get_video_info(video_path: Path) -> dict:

    try:
        # ファイルサイズ（バイト）
        file_size = os.path.getsize(video_path)

        # 再生時間（秒）
        with VideoFileClip(str(video_path)) as video:
            duration_seconds = video.duration

        return {
            "file_path": video_path,
            "file_name": video_path.stem,
            "ext": video_path.suffix,
            "file_size": file_size,
            "duration_seconds": duration_seconds
        }
    except Exception as e:
        print(f"Error: {video_path} {e}")
        return None
    
    return None

def find_duplicates_grouped(df: pl.DataFrame, file_size_key="file_size", duration_seconds_key="duration_seconds") -> list[list[dict]]:
    """
    file_size と duration_seconds の組み合わせが重複している行を、
    グループ単位で辞書のリストとしてまとめ、
    そのグループを要素とするリストを返す。
    
    例：
      - ファイル1・2が重複、ファイル3・4が重複だった場合
        -> [
             [{ファイル1の情報}, {ファイル2の情報}],
             [{ファイル3の情報}, {ファイル4の情報}]
           ]
    """
    # (1) 重複している行だけ抽出
    df_dup = (
        df
        .with_columns(
            # file_size, duration_seconds 単位でカウント
            pl.len().over([file_size_key, duration_seconds_key]).alias("dup_count")
        )
        .filter(pl.col("dup_count") > 1)  # カウントが2以上→重複行
        .drop("dup_count")
    )

    # (2) Pythonループで (file_size, duration_seconds) ごとにまとめる
    grouped_lists = []
    for _, grp_df in df_dup.group_by([file_size_key, duration_seconds_key], maintain_order=True):
        # グループの行数が2行以上あれば重複とみなし、辞書リスト化して追加
        if grp_df.height > 1:
            grouped_lists.append(grp_df.to_dicts())

    return grouped_lists


target_dirs = [
    Path("../../sample_data/"),
]

# 主要な動画拡張子のリスト
video_ext_list = [".mp4", ".avi", ".wmv", ".flv", ".mov", ".avi", ".mkv"]

# 対象ディレクトリ内の動画ファイルをリスト化
target_files = []
for target_dir in target_dirs:
    target_files += list(target_dir.glob("**/*"))   
target_files = [f for f in target_files if f.suffix in video_ext_list]
print(target_files)

# 動画ファイル名が「.」で始まるものを除外
target_files = [f for f in target_files if not f.name.startswith(".")]

records = []
for video_path in tqdm(target_files):
    video_info = get_video_info(video_path)
    if video_info is not None:
        records.append(video_info)

# PolarsのDataFrameに変換
df = pl.DataFrame(records)

if len(df) > 0:

    # 重複項目の抽出
    df_duplicates = find_duplicates_grouped(df)
    print("\n重複しているファイル情報 (file_size, duration_seconds が同じ):")

    for i in range(len(df_duplicates)):
        print(i, " : ", [file["file_name"] for file in df_duplicates[i]])

    df_duplicates = {"df_duplicates" : df_duplicates}

    for duplicate_list in df_duplicates["df_duplicates"]:
        for data in duplicate_list:
            data["file_path"] = str(data["file_path"])


    # # JSONファイルに保存
    # with open("output.json", "w", encoding="utf-8") as file:
    #     json.dump(df_duplicates, file, indent=4, ensure_ascii=False)

[PosixPath('../../sample_data/sample_mp4.mp4'), PosixPath('../../sample_data/sample_mp4 copy.mp4'), PosixPath('../../sample_data/sample2_mp4.mp4')]


100%|██████████| 3/3 [00:00<00:00,  7.75it/s]


重複しているファイル情報 (file_size, duration_seconds が同じ):
0  :  ['sample_mp4', 'sample_mp4 copy']



