# Dataset preparation: adding new data to the main data

In [None]:
import os
import pandas as pd
from utils import *
from sklearn.model_selection import train_test_split

In [None]:
def train_test_split_by_dirs(df, group_column='dir_name', train_ratio=0.8, val_ratio=0.1, seed=None):
    """Split ``df`` into train/validation/test frames without splitting a group.

    All rows sharing a value in ``group_column`` end up in the same split.

    Args:
        df: DataFrame to split; must contain ``group_column``.
        group_column: Column whose distinct values define the groups.
        train_ratio: Fraction of the groups (not rows) assigned to train.
        val_ratio: Fraction of the groups assigned to validation.
        seed: Optional seed for a reproducible shuffle. When ``None`` the
            global NumPy random state is used, as before.

    Returns:
        A ``(train_df, val_df, test_df)`` tuple of independent copies. The
        test split receives every group left over after train and validation.
    """
    # Imported locally: the notebook's import cell does not import numpy itself.
    import numpy as np

    # Distinct values of the grouping column.
    groups = list(df.groupby(group_column).groups.keys())

    # Shuffle the groups, optionally with a dedicated seeded generator.
    rng = np.random if seed is None else np.random.RandomState(seed)
    rng.shuffle(groups)

    # Counts are truncated, so rounding leftovers fall into the test split.
    n_total = len(groups)
    n_train = int(n_total * train_ratio)
    n_val = int(n_total * val_ratio)

    train_groups = groups[:n_train]
    val_groups = groups[n_train:n_train + n_val]
    test_groups = groups[n_train + n_val:]

    # Copies let callers modify the results in place without touching
    # (or warning about) the source frame.
    train_df = df[df[group_column].isin(train_groups)].copy()
    val_df = df[df[group_column].isin(val_groups)].copy()
    test_df = df[df[group_column].isin(test_groups)].copy()

    return train_df, val_df, test_df

In [None]:
# Load the mask list and keep only the rows labelled 0.
df = pd.read_csv('./data/test_list_mask.csv')
# .copy() detaches the filtered rows, so the column assignments below do not
# write into a slice of the original frame (SettingWithCopyWarning).
df = df[df['label'] == 0].copy()
# Re-root the absolute host paths onto /app/data_dir/mask_test.
df['fname'] = df['fname'].apply(lambda x: x.replace('/media/user/685b3289-4051-4530-9827-ef770d2e3f28/ml_projects_yeldar/cropped_youtube_insta_tiktok', '/app/data_dir/mask_test'))
# For a re-rooted path, element 4 of the '/'-split is the directory directly
# below /app/data_dir/mask_test; it is used as the grouping key for the split.
df['dir_name'] = df['fname'].apply(lambda x: x.split('/')[4])
train_df, val_df, test_df = train_test_split_by_dirs(df, group_column='dir_name', train_ratio=0.8, val_ratio=0.1)

In [None]:
# The grouping column was only needed for the split. Reassign instead of
# dropping in place, so the operation never mutates a view of another frame.
train_df = train_df.drop(columns=['dir_name'])
val_df = val_df.drop(columns=['dir_name'])
test_df = test_df.drop(columns=['dir_name'])
train_df

In [None]:
# Load the existing version-11 train/val/test lists.
# NOTE(review): the train list is read from ./data/ while val and test are read
# from ./data/tra/ — confirm that this difference is intentional.
dftr = pd.read_csv('./data/train_list11.csv')
dfv = pd.read_csv('./data/tra/val_list11.csv')
dfts = pd.read_csv('./data/tra/test_list11.csv')

In [None]:
# Append the new train rows to the version-11 train list and save it as version 13.
df = pd.concat([dftr, train_df], axis=0)
df.to_csv('./data/train_list13.csv', index=False)
df

In [None]:
# Append the new validation rows to the version-11 val list and save it as version 13.
df = pd.concat([dfv, val_df], axis=0)
df.to_csv('./data/val_list13.csv', index=False)

In [None]:
# Append the new test rows to the version-11 test list and save it as version 13.
df = pd.concat([dfts, test_df], axis=0)
df.to_csv('./data/test_list13.csv', index=False)

In [None]:
def get_folder_nameM4(fpath):
    """Map an image path to the name of its category folder.

    Matching is done on substrings of ``fpath``:

    * ``'Live'`` -> ``'live'``
    * ``'Makeup'``, ``'Partial_Funnyeye'``, ``'Partial_Eye'``, ``'Replay'`` or
      a stand-alone ``'Paper'`` -> ``'Exclude'``
    * one of the specific mask/partial markers -> its folder name

    Args:
        fpath: Path of the image as a string.

    Returns:
        The folder name, or ``None`` when no marker matches.
    """
    if 'Live' in fpath:
        return 'live'

    # 'Paper' is a substring of 'Mask_Paper' and 'Partial_Paperglass'. Blank
    # those markers out first, otherwise the exclude test swallows both
    # categories and their branches below can never be reached.
    without_specific = fpath.replace('Mask_Paper', '-').replace('Partial_Paperglass', '-')
    bare_paper = 'Paper' in without_specific

    excluded_markers = ('Makeup', 'Partial_Funnyeye', 'Partial_Eye', 'Replay')
    if bare_paper or any(marker in fpath for marker in excluded_markers):
        return 'Exclude'

    # Checked in order; the first marker found in the path wins.
    folder_by_marker = (
        ('Mask_Mann', 'Mannequin'),
        ('Mask_Half', 'Mask_HalfMask'),
        ('Mask_Paper', 'Mask_PaperMask'),
        ('Mask_Trans', 'Mask_TransparentMask'),
        ('Partial_Mouth', 'Partial_Mouth'),
        ('Partial_Paperglass', 'Partial_PaperGlasses'),
        ('Mask_Silicone', 'Silicone'),
    )
    for marker, foldername in folder_by_marker:
        if marker in fpath:
            return foldername
    return None

# Tag every row with the category folder derived from its file path.
df['foldername'] = df['fname'].apply(get_folder_nameM4)

## Processing M2 (Swiss dataset)

In [None]:
# Build `dirk`: name of every second-level directory under the XCSMAD
# face-station tree -> its full path. A name seen again overwrites the earlier entry.
xcsmad_path = "/mnt/22TB/Yeldar/XCSMAD_decoded/gray/color/videos/face-station"
xcsmad_prefix = 'xcsmad'
print("Starting processing dataset xcsmad")
dirk = {}
for sub_dir in os.listdir(xcsmad_path):
    s_path = os.path.join(xcsmad_path, sub_dir)
    dirk.update({sdir: os.path.join(s_path, sdir) for sdir in os.listdir(s_path)})

In [None]:
def generate_labels_csv(dirr):
    """Build a ``label``/``fname`` frame for every entry of directory ``dirr``.

    The label is derived from the second-to-last ``'_'``-separated token of
    the full ``dirr`` path: ``'2'`` gives label 0 and ``'0'`` gives label 1.
    For any other token the path and token are printed and the label column
    is left unset.

    Args:
        dirr: Path of the directory whose entries are listed.

    Returns:
        DataFrame with columns ``label`` and ``fname`` (full entry paths).
    """
    entries = os.listdir(dirr)
    frame = pd.DataFrame(columns=["label", "fname"])
    frame["fname"] = [os.path.join(dirr, entry) for entry in entries]

    code = dirr.split('_')[-2]
    label_by_code = {'2': 0, '0': 1}
    if code in label_by_code:
        frame["label"] = label_by_code[code]
    else:
        # Unknown code: report it and keep the rows unlabelled.
        print(dirr, code, type(code))
    return frame

In [None]:
# Label every second-level directory of the face-station tree and write the
# combined table to xscmad.csv.
dir_p = "/mnt/22TB/Yeldar/XCSMAD_decoded/gray/color/videos/face-station"
dirs = os.listdir(dir_p)
# Collect the per-directory frames and concatenate once at the end; growing a
# frame inside the loop re-copies all accumulated rows on every iteration.
frames = [pd.DataFrame(columns=["label", "fname"])]
for top_dir in dirs:  # renamed from `dir`, which shadowed the builtin
    dp = os.path.join(dir_p, top_dir)
    for leaf_dir in os.listdir(dp):
        frames.append(generate_labels_csv(os.path.join(dp, leaf_dir)))
df = pd.concat(frames, ignore_index=True)
df.to_csv("xscmad.csv", index=False)

## Area distribution

In [None]:
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import seaborn as sns

def get_area(ip):
    """Return the ``shape`` of the image at ``ip``, or ``None`` if it cannot be read.

    Despite the name, the result is the array shape, not an area. Every
    occurrence of ``"data_dir"`` in ``ip`` is replaced with the absolute
    patchnet directory before the image is loaded.

    Args:
        ip: Image path as a string.

    Returns:
        The shape tuple of the loaded image, or ``None`` when loading failed
        (the resolved path is printed in that case).
    """
    ip = ip.replace("data_dir", "/media/user/685b3289-4051-4530-9827-ef770d2e3f28/ml_projects_yeldar/patchnet")
    try:
        img = cv2.imread(ip)
    except cv2.error:
        img = None
    # cv2.imread signals an unreadable file by returning None, not by raising.
    if img is None:
        print(ip)
        return None
    return img.shape
    
def get_min_shape(shp):
    """Return the smaller of the first two entries of ``shp``.

    Args:
        shp: A shape sequence, or ``None`` for an image that failed to load.

    Returns:
        ``min(shp[:2])``, or ``None`` when ``shp`` cannot be sliced or is
        empty (the offending value is printed in that case).
    """
    try:
        return min(shp[:2])
    except (TypeError, ValueError):
        # TypeError: shp is None or not sliceable; ValueError: shp is empty.
        print(shp)
        return None

# Compute, for every image of the chosen list, its shape and the minimum of
# the first two shape entries.
tp = 'train'
train_csv = './data/{}_list11.csv'.format(tp)
df = pd.read_csv(train_csv)
df['shape'] = df['fname'].apply(get_area)
df['shape_min'] = df['shape'].apply(get_min_shape)
# Optional filter, currently disabled:
# df = df[df['shape_min'] >= 112]
# df.to_csv(train_csv, index=False)

In [None]:
# Quick 10-bin histogram of the 'shape_min' column via the pandas plotting helper.
hist = df['shape_min'].hist(bins=10)

In [None]:
# Same distribution drawn on an explicit figure/axes pair, with labelled axes.
fig, ax = plt.subplots(figsize=(8, 6))
ax.hist(df['shape_min'], bins=10, color='blue', edgecolor='black')
ax.set_title('Distribution of Minimum Image Shape')
ax.set_xlabel('Minimum Side Length (pixels)')
ax.set_ylabel('Frequency')
ax.grid(True)
plt.show()

In [None]:
plt.figure(figsize=(10, 6))

# Images that failed to load leave missing values in 'shape_min', which turns
# the column into floats. Drop them and convert the maximum to int so that
# range() below always receives an integer bound.
shape_min = df['shape_min'].dropna()
max_side = int(shape_min.max())

# Histogram with 50-pixel-wide bins and a KDE overlay
bins = range(0, max_side + 50, 50)
sns.histplot(shape_min, bins=bins, kde=True, color='blue')

# Title and axis labels
plt.title('Distribution of M1, M3, M4 - train data min img.shape, Total train data - 140k images')
plt.xlabel('Minimum Side Length (pixels)')
plt.ylabel('Frequency')

# One x tick every 100 pixels
plt.xticks(range(0, max_side + 100, 100))

# Display the grid and show the plot
plt.grid(True)
plt.show()

## Adding D3 live to train, val datasets

In [2]:
from tqdm import tqdm
import os
import pandas as pd

In [3]:
# Load the CSV of new D3 live training rows from the SSAN data directory and display it.
dpath = "/home/user/ml_projects/Yeldar/data/SSAN_data/data/tra"
df1 = pd.read_csv(os.path.join(dpath, "new_live_D3_train_data.csv"))
df1

Unnamed: 0,label,fname,dataset
0,1.0,/mnt/22tb/Nurmukhammed/CROPPED_FAS_DATASETS/RE...,D3
1,1.0,/mnt/22tb/Nurmukhammed/CROPPED_FAS_DATASETS/RE...,D3
2,1.0,/mnt/22tb/Nurmukhammed/CROPPED_FAS_DATASETS/RE...,D3
3,1.0,/mnt/22tb/Nurmukhammed/CROPPED_FAS_DATASETS/RE...,D3
4,1.0,/mnt/22tb/Nurmukhammed/CROPPED_FAS_DATASETS/RE...,D3
...,...,...,...
57955,1.0,/mnt/22TB/Nurmukhammed/CROPPED_FAS_DATASETS/RE...,D3
57956,1.0,/mnt/22TB/Nurmukhammed/CROPPED_FAS_DATASETS/RE...,D3
57957,1.0,/mnt/22TB/Nurmukhammed/CROPPED_FAS_DATASETS/RE...,D3
57958,1.0,/mnt/22TB/Nurmukhammed/CROPPED_FAS_DATASETS/RE...,D3


In [None]:
def get_new_imgs(dpath, train_csv='data/full_mask_detection_dataset.csv', size=3000):
    """Collect image paths from subfolders of ``dpath`` not yet present in ``train_csv``.

    A subfolder counts as already present when its name equals the parent
    directory name (second-to-last ``'/'`` component) of any ``fname`` in
    ``train_csv``.

    Args:
        dpath: Directory whose entries are subfolders containing images.
        train_csv: CSV with an ``fname`` column of already-used image paths.
        size: Upper bound applied both to the number of new subfolders
            considered and to the number of image paths returned.

    Returns:
        List of at most ``size`` full image paths, in listing order.
    """
    df = pd.read_csv(train_csv)
    # Build the lookup once; a set gives O(1) membership tests in the loop.
    known_subjects = set(df['fname'].apply(lambda x: x.split('/')[-2]))

    new_folders = []
    for folder in tqdm(os.listdir(dpath)):
        if len(new_folders) >= size:
            break
        if folder not in known_subjects:
            new_folders.append(folder)

    images = []
    for folder in tqdm(new_folders):
        folder_path = os.path.join(dpath, folder)
        for name in os.listdir(folder_path):
            path = os.path.join(folder_path, name)
            # The '.db' test is applied to the full path, so a '.db' anywhere
            # in the directory part also excludes the file.
            if '.db' not in path:
                images.append(path)
        # Stop listing further folders once enough images were gathered.
        if len(images) >= size:
            break
    return images[:size]

# When dpath has no subfolders, but only images
def get_new_imgs2(dpath, train_csv='data/full_mask_detection_dataset.csv', size=3000):
    """Collect image paths from a flat directory that are not yet in ``train_csv``.

    Args:
        dpath: Directory that directly contains the image files.
        train_csv: CSV of already-used images. A ``subject`` column is used
            as the set of known names when present; otherwise the names are
            derived from the ``fname`` column.
        size: Maximum number of image paths to return.

    Returns:
        List of at most ``size`` full image paths, in listing order. Entries
        whose name contains ``'.db'`` are skipped.
    """
    df = pd.read_csv(train_csv)
    if 'subject' in df.columns:
        known_names = set(df['subject'])
    else:
        # The original code read df['subject'] without ever creating it.
        # NOTE(review): for a flat directory the file name (last path
        # component) is assumed to be the right key — confirm.
        known_names = set(df['fname'].apply(lambda x: x.split('/')[-1]))

    new_imgs = []
    for name in tqdm(os.listdir(dpath)):
        if len(new_imgs) >= size:
            break
        if name not in known_names and '.db' not in name:
            new_imgs.append(os.path.join(dpath, name))
    return new_imgs[:size]

def add_to_df(new_img_paths, dataset_name, df_name='new_train_data.csv', label=1.0):
    """Append image paths to the CSV ``df_name``, creating the file if needed.

    Args:
        new_img_paths: List of image paths to append.
        dataset_name: Value written to the ``dataset`` column of every new row.
        df_name: Path of the CSV file to extend.
        label: Value written to the ``label`` column of every new row.

    Returns:
        None. The combined table is written back to ``df_name``.
    """
    new_df = pd.DataFrame({
        "label": [label] * len(new_img_paths),
        "fname": new_img_paths,
        "dataset": [dataset_name] * len(new_img_paths),
    })

    # Start fresh only when the file is missing or empty. Any other read
    # failure propagates, so a damaged CSV is not silently overwritten.
    try:
        existing = pd.read_csv(df_name)
    except (FileNotFoundError, pd.errors.EmptyDataError):
        existing = None

    if existing is None:
        combined = new_df
    else:
        combined = pd.concat([existing, new_df], ignore_index=True)
    combined.to_csv(df_name, index=False)

In [None]:
# Pick up to `size` D3 live images whose folder is not referenced by
# train_list10.csv and append them to new_val_data.csv.
dpath = "/mnt/22TB/Nurmukhammed/CROPPED_FAS_DATASETS/RETINA_FACE_DETECTIONS/D3_updated/live/"
dataset_name = 'D3'
size = 7000
# NOTE(review): `label` is not passed to add_to_df below, so that function's
# default label is what gets written — confirm this is intended.
label = 1
new_img_paths = get_new_imgs(dpath, train_csv='/mnt/8TB/ml_projects_yeldar/patchnet/train/train_list10.csv', size=size)
# NOTE(review): this writes 'new_val_data.csv' relative to the working
# directory, while a later cell reads 'data/new_val_data.csv' — verify the paths match.
add_to_df(new_img_paths, dataset_name, df_name='new_val_data.csv')

In [None]:
# Load the newly collected validation rows and display them.
# NOTE(review): the collecting cell writes 'new_val_data.csv' without the
# 'data/' prefix — confirm this reads the file that was just produced.
ndf = pd.read_csv('data/new_val_data.csv')
ndf

In [None]:
# Append the new rows to val_list9.csv, drop Thumbs.db entries and save the
# result as val_list10.csv. (`train_csv` holds a validation list path here.)
train_csv = '/mnt/8TB/ml_projects_yeldar/patchnet/val/val_list9.csv'
alldf = pd.read_csv(train_csv)
df = pd.concat([alldf, ndf], ignore_index=True)
# regex=False: match the literal file name; as a regex the '.' would match any character.
df = df[~df['fname'].str.contains("Thumbs.db", regex=False)]
df.to_csv('/mnt/8TB/ml_projects_yeldar/patchnet/val/val_list10.csv', index=False)

## Preprocessing Synthez data

In [None]:

# Input and output locations used by the split cell below.
data_dir = '/mnt/8TB/ml_projects_yeldar/MaskSynthez/cropped_M4'  # Folder whose subfolders hold the images (one label per subfolder)
output_dir = "/home/user/ml_projects/Yeldar/SSAN"  # Directory that receives the generated CSV files

In [None]:
import os
import random
import csv
from pathlib import Path


# Distribution for train/val/test. The test split is whatever remains after
# train and val, so test_ratio is informational only.
train_ratio = 0.8
val_ratio = 0.1
test_ratio = 0.1

# Dedicated, seeded generator so that re-running the cell gives the same split.
split_seed = 42
rng = random.Random(split_seed)

# Lists of (image path, subfolder name) pairs for each split
train_data = []
val_data = []
test_data = []

# Process each subfolder
for subfolder in sorted(Path(data_dir).glob("*/")):
    # Skip plain files in case the glob pattern returns them as well.
    if not subfolder.is_dir():
        continue
    # Sort before shuffling: glob order depends on the filesystem, and the
    # seeded shuffle is only reproducible from a fixed starting order.
    images = sorted(subfolder.glob("*.jpg"))  # Adjust extension if needed (e.g., .png)
    rng.shuffle(images)

    # Split sizes are truncated; leftovers go to the test split.
    train_count = int(len(images) * train_ratio)
    val_count = int(len(images) * val_ratio)

    # Split data
    train_data.extend([(img, subfolder.name) for img in images[:train_count]])
    val_data.extend([(img, subfolder.name) for img in images[train_count:train_count + val_count]])
    test_data.extend([(img, subfolder.name) for img in images[train_count + val_count:]])

def save_to_csv(data, filename):
    """Write ``(image_path, label)`` pairs to ``filename`` as a CSV file.

    The file starts with the header row ``image_path,label``; each path is
    converted with ``str()`` before being written.

    Args:
        data: Iterable of ``(image_path, label)`` pairs.
        filename: Destination path; an existing file is overwritten.
    """
    with open(filename, mode="w", newline="") as file:
        out = csv.writer(file)
        out.writerow(["image_path", "label"])
        out.writerows([str(path), tag] for path, tag in data)

# Write one CSV per split into output_dir.
split_files = (
    (train_data, "m4_synthez_train.csv"),
    (val_data, "m4_synthez_val.csv"),
    (test_data, "m4_synthez_test.csv"),
)
for split_rows, csv_file in split_files:
    save_to_csv(split_rows, os.path.join(output_dir, csv_file))

print("CSV files created successfully!")


In [25]:
# Reduce the synthesized-mask table to fname/label rows: the generated image
# path becomes `fname`, the source/reference/gen_label columns are discarded
# and every row gets label 0.
csv_name = "./data/m4_synthesized_masks_clean.csv"
df = (
    pd.read_csv(csv_name)
    .rename(columns={"gen_path": "fname"})
    .drop(columns=["src_path", "ref_path", "gen_label"])
)
df['label'] = 0
# Point the paths at the cropped copy of the images.
df['fname'] = df['fname'].apply(lambda x: x.replace("M4_clean", "M4_clean_cropped"))
df

Unnamed: 0,fname,label
0,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
1,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
2,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
3,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
4,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
...,...,...
34975,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
34976,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
34977,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
34978,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0


In [26]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the rows; random_state fixes the split so it is reproducible.
train_df, test_df = train_test_split(df, test_size=0.1, random_state=42)
train_df

Unnamed: 0,fname,label
19685,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
30034,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
29416,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
22040,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
10030,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
...,...,...
16850,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
6265,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
11284,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0
860,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0


In [29]:
# Load the version-13 list of the split selected by dType and display it.
dType = 'val'
csv_name2 = f"data/{dType}_list13.csv"
df1 = pd.read_csv(csv_name2)
df1

Unnamed: 0,fname,label
0,/mnt/8TB/ml_projects_yeldar/patchnet/M1_3_4_va...,1.0
1,/mnt/8TB/ml_projects_yeldar/patchnet/M1_3_4_va...,1.0
2,/mnt/8TB/ml_projects_yeldar/patchnet/M1_3_4_va...,1.0
3,/mnt/8TB/ml_projects_yeldar/patchnet/M1_3_4_va...,1.0
4,/mnt/8TB/ml_projects_yeldar/patchnet/M1_3_4_va...,1.0
...,...,...
18210,/mnt/8TB/ml_projects_yeldar/cropped_youtube_in...,0.0
18211,/mnt/8TB/ml_projects_yeldar/cropped_youtube_in...,0.0
18212,/mnt/8TB/ml_projects_yeldar/cropped_youtube_in...,0.0
18213,/mnt/8TB/ml_projects_yeldar/cropped_youtube_in...,0.0


In [32]:
# Append the held-out rows (test_df) to the loaded list, normalise the old
# /media/user/... mount prefix to /mnt/8TB and save the result as version 15.
# NOTE(review): test_df is added to the list chosen by dType — confirm that
# this is the split it is meant for.
csv_name3 = f"data/{dType}_list15.csv"
df1 = pd.concat([df1, test_df], ignore_index=True)
df1['fname'] = df1['fname'].apply(lambda x: x.replace("/media/user/685b3289-4051-4530-9827-ef770d2e3f28", "/mnt/8TB"))
df1.to_csv(csv_name3, index=False)
df1

Unnamed: 0,fname,label
0,/mnt/8TB/ml_projects_yeldar/patchnet/M1_3_4_va...,1.0
1,/mnt/8TB/ml_projects_yeldar/patchnet/M1_3_4_va...,1.0
2,/mnt/8TB/ml_projects_yeldar/patchnet/M1_3_4_va...,1.0
3,/mnt/8TB/ml_projects_yeldar/patchnet/M1_3_4_va...,1.0
4,/mnt/8TB/ml_projects_yeldar/patchnet/M1_3_4_va...,1.0
...,...,...
21708,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0.0
21709,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0.0
21710,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0.0
21711,/mnt/8TB/ml_projects_yeldar/MaskSynthez/M4_cle...,0.0


In [None]:
# Load the version-14 list of the split selected by dType and display it.
dType = 'train'
csv_name2 = f"data/{dType}_list14.csv"
df1 = pd.read_csv(csv_name2)
df1

In [None]:
# Re-read the list without a header (the header line becomes row 0 and is
# dropped), then rewrite the host path prefixes in column 0 to their
# /app/data_dir locations, one prefix after the other.
dff = pd.read_csv(csv_name2, delimiter=",", header=None).drop([0], axis=0)
path_rewrites = (
    ("/mnt/8TB/ml_projects_yeldar/patchnet", "/app/data_dir"),
    ("/mnt/8TB/ml_projects_yeldar/MaskSynthez", "/app/data_dir"),
    ("/mnt/8TB/ml_projects_yeldar/cropped_youtube_insta_tiktok", "/app/data_dir/mask_test"),
)
for host_prefix, container_prefix in path_rewrites:
    dff[0] = [p.replace(host_prefix, container_prefix) for p in dff[0]]

dff

In [None]:
# NOTE(review): in the cells above df1 is read from data/{dType}_list14.csv
# with dType = 'train', and the re-rooted frame `dff` is not what is saved
# here — confirm that writing df1 to test_list14.csv is intended.
df1.to_csv("data/test_list14.csv", index=False)