In [None]:
from pathlib import Path
import pandas as pd
import os 
import json
pd.options.mode.chained_assignment = None
from deepfake_detection.defaults import DRIVE_PATH

In [None]:
# Root folders of the two datasets, both located directly under DRIVE_PATH.
KODF_PATH = DRIVE_PATH.joinpath("kodf")
DFDC_PATH = DRIVE_PATH.joinpath("dfdc")

In [None]:
def get_bad_paths(drive_path: Path, file_paths: list[str]) -> list[str]:
    """Collect the entries of ``file_paths`` that do not exist under ``drive_path``.

    Each entry is joined onto ``drive_path`` and checked on the file system.
    Every missing location is printed as ``Bad path: <full path>``.

    Args:
        drive_path: Directory the entries are resolved against.
        file_paths: Iterable of paths relative to ``drive_path``.

    Returns:
        The missing entries, unchanged and in iteration order.
    """
    missing = []
    for relative in file_paths:
        candidate = drive_path / relative
        if os.path.exists(candidate):
            continue
        print(f"Bad path: {candidate}")
        missing.append(relative)

    return missing

# Preprocess KODF metadata

In [None]:
from deepfake_detection.defaults import RANDOM_STATE
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

def get_kodf_train_val_people(unique_persons: list[str], train_size: float = 0.8) -> tuple[list[str], list[str]]:
    """Split person identifiers into two disjoint, reproducible groups.

    Args:
        unique_persons: Person identifiers to partition.
        train_size: Forwarded to ``train_test_split`` as ``train_size``; as a
            float it is the fraction of identifiers placed in the first group.

    Returns:
        ``(train_people, val_people)`` exactly as produced by
        ``train_test_split``.
    """
    shuffled_people = shuffle(unique_persons, random_state=RANDOM_STATE)
    # train_test_split shuffles again by default. Without a fixed random_state
    # that second shuffle is unseeded, so the person-to-split assignment would
    # differ on every run despite the seeded shuffle above.
    train_people, val_people = train_test_split(
        shuffled_people,
        train_size=train_size,
        random_state=RANDOM_STATE,
    )

    return train_people, val_people

# --- Synthesized videos: every row kept here is labelled FAKE ---------------
kodf_synthesized_metadata = pd.read_csv(KODF_PATH / "synthesized_video_metadata.csv")
# Rows whose `model` is "fo" or "audio-driven" are excluded.
is_excluded_model = kodf_synthesized_metadata["model"].isin(["fo", "audio-driven"])
kodf_synthesized_videos_to_keep = kodf_synthesized_metadata.loc[~is_excluded_model]
# Path relative to DRIVE_PATH:
#   <kodf>/synthesized_videos/<model>/<date_time without dashes>/<target_person_id>/<video>
synthesized_video_paths = (
    KODF_PATH.relative_to(DRIVE_PATH)
    / "synthesized_videos"
    / kodf_synthesized_videos_to_keep["model"]
    / kodf_synthesized_videos_to_keep["date_time"].str.replace("-", "")
    / kodf_synthesized_videos_to_keep["target_person_id"]
    / kodf_synthesized_videos_to_keep["video"]
)
kodf_synthesized_videos_to_keep = (
    kodf_synthesized_videos_to_keep
    .assign(video_path=synthesized_video_paths.apply(str), label="FAKE")
    .drop(columns=["sex", "date_time", "model", "target_person_id", "target_video"])
    # The source person is the identity used for the person-level split below.
    .rename(columns={"source_person_id": "person_id"})
    .set_index("video")
)

# --- Original videos: every row is labelled REAL ----------------------------
kodf_original_metadata = pd.read_csv(KODF_PATH / "original_video_metadata.csv")
# Path relative to DRIVE_PATH: <kodf>/original_videos/<person_id>/<video>
original_video_paths = (
    KODF_PATH.relative_to(DRIVE_PATH)
    / "original_videos"
    / kodf_original_metadata["person_id"]
    / kodf_original_metadata["video"]
)
kodf_original_metadata = (
    kodf_original_metadata
    .assign(video_path=original_video_paths.apply(str), label="REAL")
    .drop(columns=["sex", "date_time", "studio"])
    .set_index("video")
)

kodf_combined = pd.concat([kodf_synthesized_videos_to_keep, kodf_original_metadata])

# Split by person rather than by video: 60% of the people go to train, and the
# remaining people are halved into val and test.
train_people, val_people = get_kodf_train_val_people(kodf_combined["person_id"].unique(), train_size=0.6)
val_people, test_people = get_kodf_train_val_people(val_people, train_size=0.5)

for split_name, split_people in (("train", train_people), ("val", val_people), ("test", test_people)):
    kodf_combined.loc[kodf_combined["person_id"].isin(split_people), "split"] = split_name

kodf_combined["dataset"] = "kodf"
kodf_combined = kodf_combined.drop(columns=["person_id"])

# Drop rows whose video file is not present under DRIVE_PATH.
bad_paths = get_bad_paths(DRIVE_PATH, kodf_combined["video_path"])
kodf_combined = kodf_combined[~kodf_combined["video_path"].isin(bad_paths)]

# Persist as {video: {column: value, ...}}.
kodf_combined_dict = kodf_combined.to_dict(orient="index")
with open(KODF_PATH / "kodf_val_train_metadata.json", "w") as f:
    json.dump(kodf_combined_dict, f)

# Cell output: number of videos per split.
kodf_combined["split"].value_counts()

# Preprocess DFDC metadata

In [None]:
# Merge the metadata.json of DFDC training parts 0-49 into one dict keyed by
# video file name, keeping only the videos that exist on disk.

parts = list(range(50))
train_metadata = {}
for part in parts:
    print(f"Processing Part: {part}")
    # The outer folder name is zero-padded to two digits; the inner one is not.
    current_part_path = (
        DFDC_PATH / "train" / f"dfdc_train_part_{part:02d}" / f"dfdc_train_part_{part}"
    )
    with open(current_part_path / "metadata.json") as f:
        meta_data = json.load(f)

    for video, data in meta_data.items():
        video_path = current_part_path / video
        if not os.path.exists(video_path):
            # Listed in metadata.json but missing on disk: report and skip.
            print("CURRENT VIDEO PATH", video_path, video, data)
            continue

        # The 'original' entry is not kept in the merged metadata.
        data.pop('original', None)
        data['video_path'] = str(video_path.relative_to(DRIVE_PATH))
        data['dataset'] = "dfdc"
        train_metadata[video] = data

with open(DFDC_PATH / "train_metadata.json", 'w') as f:
    json.dump(train_metadata, f)

# Build <split>_metadata.json for the DFDC val and test folders from their
# labels.csv, keeping only the videos that exist on disk.
for split in ["val", "test"]:
    folder_path = DFDC_PATH / split
    dfdc_split_df = pd.read_csv(folder_path / "labels.csv").rename(columns={"filename": "video"})

    # Translate 1 -> "FAKE" and 0 -> "REAL"; any other value is left unchanged.
    # replace() builds a new column instead of writing strings into a numeric
    # column through .loc, which is a silent upcast that pandas has deprecated
    # since 2.1.
    dfdc_split_df["label"] = dfdc_split_df["label"].replace({1: "FAKE", 0: "REAL"})
    dfdc_split_df["split"] = split
    dfdc_split_df["dataset"] = "dfdc"
    # Video location relative to DRIVE_PATH, stored as a plain string.
    dfdc_split_df["video_path"] = (
        folder_path.relative_to(DRIVE_PATH) / dfdc_split_df["video"]
    ).apply(str)

    # Drop rows whose video file is not present under DRIVE_PATH.
    bad_paths = get_bad_paths(DRIVE_PATH, dfdc_split_df["video_path"])
    dfdc_split_df = dfdc_split_df[~dfdc_split_df["video_path"].isin(bad_paths)].set_index("video")

    # Persist as {video: {column: value, ...}}.
    dfdc_split_dict = dfdc_split_df.to_dict(orient="index")

    with open(DFDC_PATH / f"{split}_metadata.json", 'w') as f:
        json.dump(dfdc_split_dict, f)

In [None]:
# Reload the four metadata JSON files as DataFrames; with orient="index" each
# top-level JSON key becomes a row label.
kodf_df, dfdc_train_df, dfdc_val_df, dfdc_test_df = (
    pd.read_json(metadata_file, orient="index")
    for metadata_file in (
        KODF_PATH / "kodf_val_train_metadata.json",
        DFDC_PATH / "train_metadata.json",
        DFDC_PATH / "val_metadata.json",
        DFDC_PATH / "test_metadata.json",
    )
)

In [None]:
# Stack all per-dataset frames; the row labels (video file names) become a
# regular "video" column.
final_dataset = pd.concat([kodf_df, dfdc_train_df, dfdc_val_df, dfdc_test_df])
final_dataset = final_dataset.reset_index().rename(columns={"index": "video"})
# Select the output columns by name. Overwriting `.columns` positionally would
# silently mislabel the data whenever the first concatenated frame orders its
# columns differently; selecting by name raises a KeyError if one is missing.
final_dataset = final_dataset[["video", "video_path", "label", "split", "dataset"]]
final_dataset.to_csv(DRIVE_PATH / "metadata.csv", index=False)