In [None]:
import sys 
#path to append: "./ChimpRec/Code"
sys.path.append(...)

from chimplib.imports import pd, cv2, os, random, np, Image, sys

In [None]:
#path to the folder with CCR videos
videos_path = "C:/Users/julie/OneDrive - UCL/Master_2/Mémoire/ChimpRec/ChimpRec-Dataset/CCR/data/videos"
#path to file with CCR face annotations (face_data.csv)
annotations_path = "C:/Users/julie/OneDrive - UCL/Master_2/Mémoire/ChimpRec/ChimpRec-Dataset/CCR/annotations/face_data.csv"
#path to folder where dataset is to be created
dataset_path = "C:/Users/julie/Documents/Unif/Mémoire/CCR_recognition_dataset"

In [None]:
# @input:
# video_path:  path to the video file
# @output:
# (width, height): width and height of the video in pixels
def get_video_resolution(video_path):
    cap = cv2.VideoCapture(video_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    return width, height

In [None]:
# @input:
# df: pd.DataFrame of the face.csv file of the CCR dataset
# exclude_list: optional list of individual names to exclude
# @output:
# individuals: list of individuals (labels) in the dataset but not in exclude_list
def get_individuals(df, exclude_list=None):
    individuals = list(df['label'].unique())
    for name in exclude_list or []:
        if name in individuals:
            individuals.remove(name)
    return individuals

In [None]:
# @input:
# df: pd.DataFrame of the face.csv file of the CCR dataset
# videos_path: path to the folder containing the videos of the CCR dataset
# @output:
# resolutions: dictionary mapping each video name to its (width, height) resolution
def compute_video_resolutions(df, videos_path):
    resolutions = {}
    for video in df['video'].unique():
        year = 2012 if video in os.listdir(os.path.join(videos_path, "2012")) else 2013
        resolutions[video] = get_video_resolution(os.path.join(videos_path, str(year), video))
    return resolutions

In [None]:
# @input:
# df: pd.DataFrame of the face.csv file of the CCR dataset
# individual: list of individuals (labels) to process
# videos_resolution: dictionary mapping video names to their resolutions (width, height))
# @output:
# dict - split dataframes {'train': df_train, 'val': df_val, 'test': df_test}
#         where each dataframe contains only frames for the given individual,
#         and only if the crop size is >100x100 pixels
def split_and_filter_individual_data(df, individual, videos_resolution):
    videos = list(df[df['label'] == individual]['video'].unique())
    random.shuffle(videos)

    #Divides videos into 70% training set, 15% validation and 15% test
    train_videos = videos[:int(0.7 * len(videos))]
    val_videos = videos[int(0.7 * len(videos)):int(0.85 * len(videos))]
    test_videos = videos[int(0.85 * len(videos)):]

    df_individual = df[df['label'] == individual].copy()
    valid_rows = []
    for _, row in df_individual.iterrows():
        width, height = videos_resolution[row["video"]]
        crop_width = row['w'] * width
        crop_height = row['h'] * height
        #checks that the resolution respects the minimum threshold
        if crop_width > 100 and crop_height > 100:
            valid_rows.append(row)
    df_filtered = pd.DataFrame(valid_rows)

    train_subset = df_filtered[df_filtered['video'].isin(train_videos)]
    val_subset = df_filtered[df_filtered['video'].isin(val_videos)]
    test_subset = df_filtered[df_filtered['video'].isin(test_videos)]

    #If there are not enough good quality frames in one of the sets, we complete the set with frames intended for 
    # the training set and remove these frames from the training set
    if len(val_subset) < 250: 
        extra_val_images = train_subset.sample(n=250-len(val_subset), random_state=42, replace=False)
        val_subset = pd.concat([val_subset, extra_val_images])
        train_subset = train_subset.drop(extra_val_images.index) 

    if len(test_subset) < 250: 
        extra_test_images = train_subset.sample(n=250-len(test_subset), random_state=42, replace=False)
        test_subset = pd.concat([test_subset, extra_test_images])
        train_subset = train_subset.drop(extra_test_images.index) 

    return {
        "train": train_subset.sample(n=1000, random_state=42, replace=False),
        "val": val_subset.sample(n=250, random_state=42, replace=False),
        "test": test_subset.sample(n=250, random_state=42, replace=False)
    }

In [None]:
# @input:
# data: dictionary mapping individual names to his pd.DataFrame
# split: "train", "val", or "test"
# videos_path: path to the folder containing the videos of the CCR dataset
# dataset_path: path to the folder where cropped face images will be saved
# @output:
# None (cropped face images are saved in corresponding directories)
def save_cropped_faces(data, split, videos_path, dataset_path):
    for individual, frames in data.items():
        split_path = os.path.join(dataset_path, split)

        #If you're on a train or validating a set, separate the images into individual folders
        if split in ["train", "val"]:
            individual_path = os.path.join(split_path, individual)
            os.makedirs(individual_path, exist_ok=True)
        else:
            individual_path = split_path

        id  = 0

        for idx, row in frames.iterrows():
            video_path = os.path.join(videos_path, str(row["year"]), row["video"])
            cap = cv2.VideoCapture(video_path)

            if not cap.isOpened():
                print(f"+Erreur ouverture vidéo : {row['video']}")
                continue
            
            #retrieve the frame of interest
            cap.set(cv2.CAP_PROP_POS_FRAMES, row["frame"])
            ret, frame = cap.read()

            if not ret or frame is None:
                print(f"Impossible de lire la frame {row['frame']} de {row['video']}")
                cap.release()
                continue

            cap.release()

            h, w, _ = frame.shape
            x1 = int(row["x"] * w)
            y1 = int(row["y"] * h)
            x2 = x1 + int(row["w"] * w)
            y2 = y1 + int(row["h"] * h)

            # Correction if the bbox exceed the edges of the image
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w, x2), min(h, y2)

            face_crop = frame[y1:y2, x1:x2]

            #check if crop is empty
            if face_crop is None or face_crop.size == 0:
                print(f"face_crop vide pour {row['video']} frame {row['frame']}")
                continue

            # converts images to uint8 type, which is the expected standard format for images in libraries such as PIL
            face_crop = face_crop.astype(np.uint8)  

            filename = f"{individual}_{id}.jpg"
            id += 1

            filepath = os.path.join(individual_path, filename)
            image_pil = Image.fromarray(np.uint8(face_crop))
            image_pil.save(filepath)

In [None]:
for split in ["train", "val", "test"]:
    split_path = os.path.join(dataset_path, split)
    os.makedirs(split_path, exist_ok=True)

df = pd.read_csv(annotations_path)
#Not these labels (['NEGATIVE', 'VELU', 'PAMA', 'YO']) because either not an individual or not enough data for the individual
individuals = get_individuals(df, exclude_list=['NEGATIVE', 'VELU', 'PAMA', 'YO'])
videos_resolution = compute_video_resolutions(df, videos_path)
split_datas = split_and_filter_individual_data(df, individuals, videos_resolution)

data_split = {}
for individual in individuals:
    data_split[individual] = split_and_filter_individual_data(df, individual, videos_resolution)

for split in ["train", "val", "test"]:
    os.makedirs(os.path.join(dataset_path, split), exist_ok=True)
    subset = {}
    for individual, splits in data_split.items():
        subset[individual] = splits[split]

    save_cropped_faces(subset, split, videos_path, dataset_path)