In [None]:
import os
import cv2
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm

from constants import DatasetPath

In [None]:
# Utility constants

# Root folder of the dataset, taken from the project-level `constants` module.
# The string concatenations below rely on it already ending with a path separator.
DATASET_PATH = DatasetPath.effectivePath


def DATASET_DIRS():
	"""
	Lists the entries currently present in the dataset root folder.

	Returns
	-------
		entries (list of str): the names returned by `os.listdir` for DATASET_PATH.
	"""
	entries = os.listdir(DATASET_PATH)
	return entries


# Top-level folders treated as sources of real images.
REAL_DIRS = [
	"afhq",
	"celebahq",
	"coco",
	"ffhq",
	"imagenet",
	"landscape",
	"lsun",
	"metfaces",
	"cycle_gan",
]

# Top-level folders treated as sources of fake images.
# Note that "cycle_gan" is listed here as well as in REAL_DIRS.
FAKE_DIRS = [
	"big_gan",
	"cips",
	"cycle_gan",
	"ddpm",
	"denoising_diffusion_gan",
	"diffusion_gan",
	"face_synthetics",
	"gansformer",
	"gau_gan",
	"generative_inpainting",
	"glide",
	"lama",
	"latent_diffusion",
	"mat",
	"palette",
	"pro_gan",
	"projected_gan",
	"sfhq",
	"stable_diffusion",
	"star_gan",
	"stylegan1",
	"stylegan2",
	"stylegan3",
	"taming_transformer",
	"vq_diffusion",
]

# Column layout of the generated 'metadata.csv'.
csv_columns_name = ['filename', 'image_path', 'target', 'category']


# <dataset>\fourier\ holds 'metadata.csv'; <dataset>\fourier\fourier\ holds the FFT images.
DIR_FOURIER_PATH = DATASET_PATH + "fourier\\"
FFTS_PATH = DIR_FOURIER_PATH + "fourier\\"

In [None]:
def df_to_csv(df, filename, path):
	"""
	Writes a DataFrame to a '.csv' file in chunks so that tqdm can display the progress.

	Rows are selected by position rather than by index label, so a DataFrame whose index
	contains duplicate labels is still written with exactly one line per row.

	Parameters
	----------
		df (pd.DataFrame): the DataFrame to convert.
		filename (str): the file name shown in the progress bar and in the final message.
		path (str): the full path of the '.csv' file to write (overwritten if it already exists).
	"""
	# Never use more chunks than rows (no empty chunks), but keep at least one chunk so
	# that an empty DataFrame still produces a header-only file.
	n_chunks = max(1, min(100, len(df)))
	position_chunks = np.array_split(np.arange(len(df)), n_chunks)

	for chunk_number, positions in enumerate(tqdm(position_chunks, desc="Creating \'" + filename + "\' file")):
		is_first = chunk_number == 0
		# The first chunk creates the file and writes the header; the others are appended.
		df.iloc[positions].to_csv(path, header=is_first, mode='w' if is_first else 'a', index=False)

	print("\'" + filename + "\' has been successfully created.")

In [None]:
# Make sure the folder the FFT images are written to (FFTS_PATH, i.e. the 'fourier'
# folder nested inside the dataset's 'fourier' folder) exists before anything is saved.
if os.path.isdir(FFTS_PATH):
	print("Fourier folder already exist.")
else:
	# makedirs creates both levels in one call and also completes a half-created tree
	# (outer 'fourier' present, inner one missing). The default mode is used on purpose:
	# a directory created with 0o666 lacks the search bit on POSIX and cannot be entered.
	os.makedirs(FFTS_PATH, exist_ok=True)

In [None]:
def greyscale_FFT(img_path):
	"""
	Applies Fast Fourier Transform (FFT) to a greyscale image and returns its magnitude spectrum.

	Parameters
	----------
		img_path (str): input image path.

	Returns
	-------
		fft_img (np.ndarray): a 2D uint8 array holding the log-magnitude spectrum of the FFT of the
		input image, normalized to the range [0, 255]. A completely flat spectrum yields an all-zero image.

	Raises
	------
		FileNotFoundError: if the image cannot be read from `img_path`.
	"""

	# cv2.imread does not raise on failure: it returns None for a missing or unreadable file
	bgr_img = cv2.imread(img_path)
	if bgr_img is None:
		raise FileNotFoundError("Could not read image '" + str(img_path) + "'.")

	# Convert the image from BGR to grayscale
	gray_img = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2GRAY)

	# Magnitude of the 2D FFT of the grayscale image
	magnitude = np.abs(np.fft.fft2(gray_img))

	# log(0) is -inf and would poison the normalization below, so coefficients that are
	# exactly zero are raised to the smallest non-zero magnitude. Spectra without zeros
	# are left untouched and therefore produce the same values as a plain np.log.
	positive = magnitude[magnitude > 0]
	floor = positive.min() if positive.size else 1.0
	log_magnitude = np.log(np.maximum(magnitude, floor))

	# Find the minimum and maximum values of the log-magnitude for normalization
	min_val = np.min(log_magnitude)
	max_val = np.max(log_magnitude)

	# A flat spectrum has no dynamic range: return zeros instead of dividing by zero
	if max_val == min_val:
		return np.zeros(log_magnitude.shape, dtype=np.uint8)

	# Normalize to the range [0, 255] and convert to uint8 (fractional part is truncated)
	fft_img = (log_magnitude - min_val) * (255.0 / (max_val - min_val))
	return np.uint8(fft_img)

In [None]:
def FFT_application(ds_partition_df):
	"""
	Applies Fast Fourier Transform (FFT) to the images in the given dataset.
	The function also saves the resulting images under FFTS_PATH and generates a 'metadata.csv'
	in DIR_FOURIER_PATH for utility.

	Parameters
	----------
		ds_partition_df (pd.DataFrame): the DataFrame containing dataset partition information with columns 'image_path', 'filename', 'target', and 'category'.

	Returns
	-------
		filename_matching_dict (dict): Dictionary mapping original filenames to their adjusted filenames based on the target value.
		path_matching_dict (dict): Dictionary mapping original image paths to their Fourier transformed paths (relative to DATASET_PATH).

	Raises
	------
		IOError: if a Fourier transformed image cannot be written to disk.
	"""
	# Metadata rows are collected in a list and turned into a DataFrame once at the end:
	# appending to a DataFrame one row at a time gets slower with every row.
	metadata_rows = []

	# Dictionaries to store mappings
	path_matching_dict = {}
	filename_matching_dict = {}

	# Iterate through the dataset partition DataFrame
	for _, row in tqdm(ds_partition_df.iterrows(), total=ds_partition_df.shape[0], desc="FFT application"):
		# Generate the Fourier transformed image from the full path of the source image
		fft_img = greyscale_FFT(DATASET_PATH + row["image_path"])

		# Adjust the filename based on the target value (0 means real, anything else fake)
		filename = row["filename"]
		adjusted_filename = filename.replace("img", "real" if row["target"] == 0 else "fake")

		# Save the Fourier transformed image. cv2.imwrite reports failure through its
		# return value, so it must be checked or the metadata would list missing files.
		fft_path = FFTS_PATH + adjusted_filename
		if not cv2.imwrite(fft_path, fft_img):
			raise IOError("Could not write FFT image to '" + fft_path + "'.")

		# Path of the Fourier transformed image relative to the dataset root.
		# Stripping the prefix also works when DATASET_PATH is an empty string.
		if fft_path.startswith(DATASET_PATH):
			relative_fft_path = fft_path[len(DATASET_PATH):]
		else:
			relative_fft_path = fft_path

		metadata_rows.append([adjusted_filename, relative_fft_path, row["target"], row["category"]])

		# Update the dictionaries
		filename_matching_dict[filename] = adjusted_filename
		path_matching_dict[row["image_path"]] = relative_fft_path

	# Save 'metadata.csv'
	fourier_metadata_df = pd.DataFrame(metadata_rows, columns=csv_columns_name)
	df_to_csv(fourier_metadata_df, "metadata.csv", DIR_FOURIER_PATH + "metadata.csv")

	return filename_matching_dict, path_matching_dict
	

In [None]:
def get_dirs(ds_partition_df):
	"""
	Retrieves image source directories from the dataset.

	When the partition has a 'target' column, a directory is reported as real if it holds at
	least one row with target 0 and as fake if it holds at least one row with any other target,
	so a directory that provides both kinds of images (e.g. one listed in both REAL_DIRS and
	FAKE_DIRS) appears in both results. Without a 'target' column the directory is classified
	by its presence in REAL_DIRS.

	Parameters
	----------
		ds_partition_df (pandas.DataFrame): the dataframe containing the dataset partition;
		the 'image_path' column must hold '/'-separated relative paths.
	Returns
	-------
		real_dirs (list of str): sorted unique directories that contain real images.
		fake_dirs (list of str): sorted unique directories that contain fake images.
	"""
	# Top-level directory of every image path
	top_level_dirs = np.array(
		[image_path.split('/')[0] for image_path in ds_partition_df["image_path"]],
		dtype=object,
	)

	if "target" in ds_partition_df.columns:
		# The label is the ground truth: target 0 marks a real image
		is_real = (ds_partition_df["target"] == 0).to_numpy(dtype=bool)
	else:
		# No label available: fall back to the list of known real directories
		is_real = np.array([directory in REAL_DIRS for directory in top_level_dirs], dtype=bool)

	real_dirs = sorted(set(top_level_dirs[is_real]))
	fake_dirs = sorted(set(top_level_dirs[~is_real]))

	# Return the sorted lists of real and fake directories
	return real_dirs, fake_dirs



def _pick_first_per_dir(top_level_dirs, eligible, taken, dirs, per_dir, desc):
	"""Returns the positions of the first `per_dir` eligible, not yet taken rows of each directory and marks them as taken."""
	picked = []
	for directory in tqdm(dirs, desc=desc):
		candidates = np.flatnonzero(eligible & ~taken & (top_level_dirs == directory))[:per_dir]
		taken[candidates] = True
		picked.extend(candidates.tolist())
	return picked


def custom_train_test_split(ds_partition_df, filename_matching_dict, path_matching_dict, test_fraction=0.2):
	"""
	Splits a dataset partition in training set and test set while aiming at a 1:1 ratio between Real and Fake images.
	The same number of images is taken from every directory of each kind; for each directory the first
	rows of the partition (in their original order) are moved to the test set.

	Parameters
	----------
		ds_partition_df (pandas.DataFrame): DataFrame containing the dataset partition. It is not modified.
		filename_matching_dict (dict): Dictionary mapping original filenames to their adjusted filenames based on the target value.
		path_matching_dict (dict): Dictionary mapping original image paths to their Fourier transformed paths.
		test_fraction (float): fraction of the partition to move to the test set (default 0.2).

	Returns
	-------
		train_df (pandas.DataFrame): the remaining rows, with 'filename' and 'image_path' replaced by their Fourier counterparts.
		test_df (pandas.DataFrame): columns 'real' and 'fake' holding Fourier image paths, indexed from 1.
		When the two kinds yield a different number of rows the shorter column is padded with None.
	"""

	# Initialization
	size = len(ds_partition_df)
	real_dirs, fake_dirs = get_dirs(ds_partition_df)
	print(real_dirs)
	print(fake_dirs)

	image_paths = ds_partition_df["image_path"].to_numpy()
	top_level_dirs = np.array([image_path.split('/')[0] for image_path in image_paths], dtype=object)

	if "target" in ds_partition_df.columns:
		# A directory may hold both real and fake images, so the label (0 = real) decides
		# which rows may fill which column of the test set.
		real_rows = (ds_partition_df["target"] == 0).to_numpy(dtype=bool)
		fake_rows = ~real_rows
	else:
		# Without labels every row of a directory is eligible
		real_rows = np.ones(size, dtype=bool)
		fake_rows = real_rows

	# Rows already moved to the test set (a row can be moved only once)
	taken = np.zeros(size, dtype=bool)

	# Number of tuples to take from each directory (0 when there is no directory of that kind)
	real_sample_size = round(size / 2 * test_fraction / len(real_dirs)) if real_dirs else 0
	fake_sample_size = round(size / 2 * test_fraction / len(fake_dirs)) if fake_dirs else 0

	real_positions = _pick_first_per_dir(top_level_dirs, real_rows, taken, real_dirs, real_sample_size, "Test set Real images sampling")
	fake_positions = _pick_first_per_dir(top_level_dirs, fake_rows, taken, fake_dirs, fake_sample_size, "Test set Fake images sampling")

	# Build the test set in one go; pad the shorter column so the lengths always match
	real_test_paths = [path_matching_dict[image_paths[position]] for position in real_positions]
	fake_test_paths = [path_matching_dict[image_paths[position]] for position in fake_positions]
	n_test_rows = max(len(real_test_paths), len(fake_test_paths))
	real_test_paths += [None] * (n_test_rows - len(real_test_paths))
	fake_test_paths += [None] * (n_test_rows - len(fake_test_paths))
	test_df = pd.DataFrame(
		{"real": real_test_paths, "fake": fake_test_paths},
		index=range(1, n_test_rows + 1),
		dtype=object,
	)

	# Training set: every row left, as a copy so the caller's DataFrame is never modified
	train_df = ds_partition_df[~taken].copy()
	train_df["filename"] = [filename_matching_dict[filename] for filename in train_df["filename"]]
	train_df["image_path"] = [path_matching_dict[image_path] for image_path in train_df["image_path"]]

	return train_df, test_df

In [None]:
def generate_training_test_sets(ds_partition_path):
	"""
	Runs the whole pipeline on a dataset partition file: the FFT images are generated,
	the partition is split, and both splits are saved in the dataset root folder
	as 'trainingSet.csv' and 'testSet.csv'.

	Parameters
	----------
		ds_partition_path (str): path of the '.csv' file describing the dataset partition.
	"""
	partition = pd.read_csv(ds_partition_path)

	# The FFT step returns the original -> Fourier name and path mappings needed by the split
	name_map, path_map = FFT_application(partition)
	training_split, test_split = custom_train_test_split(partition, name_map, path_map)

	outputs = ((training_split, "trainingSet.csv"), (test_split, "testSet.csv"))
	for split_df, csv_name in outputs:
		df_to_csv(split_df, csv_name, DATASET_PATH + csv_name)


In [None]:
# Run the pipeline on 'dataset_partition.csv' from the dataset root folder.
# This regenerates the FFT images and rewrites 'trainingSet.csv' and 'testSet.csv'.
generate_training_test_sets(DATASET_PATH + "dataset_partition.csv")

In [None]:
# Generate csv file that will be used to train the model

# Seed of the triplet sampling, so that re-running the cell reproduces the same 'input.csv'
TRIPLET_SEED = 42


def _sample_triplet_partners(image_paths, is_real, rng):
	"""
	Draws, for every anchor image, a positive (another image of the same class) and a
	negative (an image of the opposite class), both uniformly at random.

	Parameters
	----------
		image_paths (array-like of str): the anchor image paths.
		is_real (array-like of bool): True where the corresponding anchor is a real image.
		rng (np.random.Generator): the random generator used for the draws.

	Returns
	-------
		positive, negative (np.ndarray): object arrays aligned with `image_paths`.

	Raises
	------
		ValueError: if a class has a single image (no positive exists) or the opposite class is empty.
	"""
	image_paths = np.asarray(image_paths, dtype=object)
	is_real = np.asarray(is_real, dtype=bool)
	positive = np.empty(len(image_paths), dtype=object)
	negative = np.empty(len(image_paths), dtype=object)

	for same_class in (is_real, ~is_real):
		own = np.flatnonzero(same_class)
		other = np.flatnonzero(~same_class)
		if own.size == 0:
			continue
		if own.size < 2 or other.size == 0:
			raise ValueError("Each class needs at least two images and the opposite class at least one to build triplets.")

		# Draw one of the other (own.size - 1) members of the class: pick a slot in
		# [0, own.size - 2] and shift it by one when it reaches the anchor's own slot.
		slots = rng.integers(0, own.size - 1, size=own.size)
		slots = slots + (slots >= np.arange(own.size))
		positive[own] = image_paths[own[slots]]
		negative[own] = image_paths[other[rng.integers(0, other.size, size=own.size)]]

	return positive, negative


fourier_df = pd.read_csv(DATASET_PATH + "trainingSet.csv")

real_fourier_df = fourier_df[ fourier_df["target"] == 0 ]
fake_fourier_df = fourier_df[ fourier_df["target"] != 0 ]

# The class of each anchor comes from the 'target' column (0 = real) rather than from
# searching "real"/"fake" inside the path, and all draws are made in one vectorized pass.
positive_paths, negative_paths = _sample_triplet_partners(
	fourier_df["image_path"].to_numpy(),
	(fourier_df["target"] == 0).to_numpy(),
	np.random.default_rng(TRIPLET_SEED),
)

input_df = pd.DataFrame(
	{"anchor": fourier_df["image_path"], "positive": positive_paths, "negative": negative_paths},
	columns= ["anchor","positive","negative"],
)

input_df.to_csv(DATASET_PATH + "input.csv", index=False)