In [None]:
import os
import random
import sys
import pickle

import numpy as np
import pandas as pd
import cv2
from PIL import Image

from tqdm import tqdm

In [None]:
# Mount Google Drive at /content/gdrive; every data path used below lives under that mount point.
# NOTE(review): this import sits outside the notebook's main import cell.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# Read the label table from Drive and report how many rows it contains
full_df = pd.read_csv("/content/gdrive/MyDrive/DL_files/full_df.csv")
print(len(full_df))

In [None]:
# Edge length (pixels) every image is resized to, and the folder the image
# file names from the label table are resolved against.
image_size = 224
images_folder_path = "/content/gdrive/MyDrive/DL_files/preprocessed_images"

NECESSARY FUNCTIONS TO CREATE SMALLER BALANCED DATASETS, EACH USED IN A DIFFERENT CLASSIFICATION TASK

In [None]:
def has_disease(text, disease):
  """
  Tells whether a disease keyword occurs in a piece of text.

  Args:
      text: Container to search; the callers pass a diagnostic-keyword string.
      disease: Value looked up with the ``in`` operator (a substring for strings).

  Returns:
      int: 1 when ``disease in text`` holds, otherwise 0.
  """
  return int(disease in text)

In [None]:
def only_disease(disease):
  """
  Returns a boolean mask of rows that carry neither the normal flag nor any tracked disease flag other than `disease`.

  The mask does NOT test the flag of `disease` itself; callers combine it with
  a condition such as ``full_df.C == 1``. Only the N, C, D, G and A columns of
  ``full_df`` are consulted; any other label columns are ignored.

  Args:
      disease (str): The disease to keep. Options: "C" (Cataract), "D" (Diabetes), "G" (Glaucoma), "A" (Age-related Macular Degeneration).

  Returns:
      pandas.Series: Boolean mask aligned with ``full_df``; True where N and every tracked disease column other than `disease` equal 0.

  Raises:
      ValueError: If `disease` is not one of "C", "D", "G", "A".
  """
  tracked = ("C", "D", "G", "A")
  if disease not in tracked:
    # Previously an unknown code fell through and returned None, which only failed later at the call site.
    raise ValueError("disease must be one of {}, got {!r}".format(tracked, disease))

  mask = full_df["N"] == 0
  for code in tracked:
    if code != disease:
      mask = mask & (full_df[code] == 0)
  return mask

In [None]:
def sample_normal_eyes(sample_size):
  """
  Samples normal fundus image file names from the dataset, half from each eye.

  A row is a candidate for an eye when ``N == 1`` and that eye's diagnostic
  keywords are exactly "normal fundus". ``sample_size // 2`` file names are drawn
  per eye with a fixed seed (42), so an odd ``sample_size`` yields one name fewer
  in total. If an eye has fewer candidates than requested, all of its candidates
  are returned and a warning is issued instead of raising.

  Args:
      sample_size (int): Total number of file names requested across both eyes.

  Returns:
      tuple: A tuple containing two arrays:
          - left_normal (numpy.ndarray): Sampled values of the "Left-Fundus" column.
          - right_normal (numpy.ndarray): Sampled values of the "Right-Fundus" column.
  """
  import warnings

  per_eye = sample_size // 2

  def _draw(side):
    # One eye's candidates: normal-flagged rows whose keywords for that eye are exactly "normal fundus".
    is_normal = (full_df.N == 1) & (full_df[side + "-Diagnostic Keywords"] == "normal fundus")
    candidates = full_df.loc[is_normal, side + "-Fundus"]
    # Series.sample raises when asked for more rows than exist, so cap the request.
    count = min(per_eye, len(candidates))
    if count < per_eye:
      warnings.warn(
        "Requested {} normal {} images but only {} are available".format(per_eye, side.lower(), count)
      )
    return candidates.sample(count, random_state=42).values

  return _draw("Left"), _draw("Right")

In [None]:
def create_dataset(image_category, label):
  """
  Loads, resizes and labels every image of every category, then shuffles the result.

  Args:
      image_category (sequence): One collection of image file names per category.
          The collections may differ in length. Each name is joined onto
          ``images_folder_path``.
      label (sequence): One label per category, in the same order as `image_category`.

  Returns:
      list: Shuffled list of two-element lists ``[image, label]``, where ``image`` is the
      array read by OpenCV and resized to ``image_size`` x ``image_size`` and ``label`` is
      the category label wrapped with ``np.array``. Images that cannot be read or resized
      are skipped; the number skipped is printed when it is non-zero.

  Raises:
      ValueError: If `image_category` and `label` do not have the same length.
  """
  if len(image_category) != len(label):
    raise ValueError(
      "image_category and label must have the same length, got {} and {}".format(
        len(image_category), len(label)
      )
    )

  dataset = []
  skipped = 0
  # Pair categories with labels directly: np.size(image_category, 0) first converts the
  # input to an array, which fails on recent NumPy when categories have unequal lengths.
  for file_names, category_label in zip(image_category, label):
    for img in tqdm(file_names):
      image_path = os.path.join(images_folder_path, img)
      try:
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        # cv2.imread reports an unreadable or missing file by returning None.
        if image is None:
          skipped += 1
          continue
        image = cv2.resize(image, (image_size, image_size))
      except cv2.error:
        # Best effort: drop images OpenCV cannot process, but keep count of them.
        skipped += 1
        continue

      dataset.append([np.array(image), np.array(category_label)])

  if skipped:
    print("Skipped {} image(s) that could not be read or resized".format(skipped))

  random.shuffle(dataset)
  return dataset

In [None]:
def sample_for_balance(sample_size):
  """
  Collects image file names per class, down-sampling the normal and diabetes classes.

  Normal (N) and diabetes (D) names are drawn with a fixed seed (42),
  ``sample_size // 2`` per eye. If an eye has fewer candidates than that, all of
  them are returned and a warning is issued instead of raising. Cataract (C),
  glaucoma (G) and AMD (A) names are returned in full, without sampling.

  A disease row is used for an eye only when the disease flag is 1, ``only_disease``
  holds for that disease, and the eye's own ``left_*`` / ``right_*`` flag is 1.
  Those per-eye flag columns (``left_cataract``, ``right_cataract``, ``left_diabetes``,
  ``right_diabetes``, ``left_glaucoma``, ``right_glaucoma``, ``left_AMD``, ``right_AMD``)
  must already exist on ``full_df`` when this function is called.

  Args:
      sample_size (int): Number of names requested for each down-sampled class (N and D), split evenly across eyes.

  Returns:
      tuple: A tuple containing arrays for each disease category:
          - N (numpy.ndarray): Array of normal fundus image names (sampled).
          - C (numpy.ndarray): Array of cataract image names (all).
          - D (numpy.ndarray): Array of diabetes image names (sampled).
          - G (numpy.ndarray): Array of glaucoma image names (all).
          - A (numpy.ndarray): Array of age-related macular degeneration (AMD) image names (all).
  """
  import warnings

  per_eye = sample_size // 2

  def _downsample(names, what):
    # Series.sample raises when asked for more rows than exist, so cap the request.
    count = min(per_eye, len(names))
    if count < per_eye:
      warnings.warn(
        "Requested {} {} images but only {} are available".format(per_eye, what, count)
      )
    return names.sample(count, random_state=42).values

  def _normal_names(keyword_column, fundus_column):
    rows = (full_df.N == 1) & (full_df[keyword_column] == "normal fundus")
    return full_df.loc[rows, fundus_column]

  def _disease_names(code, eye_flag, fundus_column):
    rows = (full_df[code] == 1) & only_disease(code) & (full_df[eye_flag] == 1)
    return full_df.loc[rows, fundus_column]

  LN = _downsample(_normal_names("Left-Diagnostic Keywords", "Left-Fundus"), "left normal")
  RN = _downsample(_normal_names("Right-Diagnostic Keywords", "Right-Fundus"), "right normal")
  N = np.concatenate((LN, RN), axis=0)

  LC = _disease_names("C", "left_cataract", "Left-Fundus").values
  RC = _disease_names("C", "right_cataract", "Right-Fundus").values
  C = np.concatenate((LC, RC), axis=0)

  LD = _downsample(_disease_names("D", "left_diabetes", "Left-Fundus"), "left diabetes")
  RD = _downsample(_disease_names("D", "right_diabetes", "Right-Fundus"), "right diabetes")
  D = np.concatenate((LD, RD), axis=0)

  LG = _disease_names("G", "left_glaucoma", "Left-Fundus").values
  RG = _disease_names("G", "right_glaucoma", "Right-Fundus").values
  G = np.concatenate((LG, RG), axis=0)

  LA = _disease_names("A", "left_AMD", "Left-Fundus").values
  RA = _disease_names("A", "right_AMD", "Right-Fundus").values
  A = np.concatenate((LA, RA), axis=0)

  return N, C, D, G, A

CREATION OF DATASET FOR CATARACT BINARY CLASSIFICATION

In [None]:
# FOR CATARACT (BINARY)

# Per-eye flag columns: 1 when that eye's diagnostic keywords contain "cataract", else 0
full_df["left_cataract"] = full_df["Left-Diagnostic Keywords"].apply(has_disease, disease="cataract")
full_df["right_cataract"] = full_df["Right-Diagnostic Keywords"].apply(has_disease, disease="cataract")

# File names of the flagged eyes, restricted to rows whose C flag is 1
left_cataract = full_df.loc[(full_df.C == 1) & (full_df.left_cataract == 1), "Left-Fundus"].values
right_cataract = full_df.loc[(full_df.C == 1) & (full_df.right_cataract == 1), "Right-Fundus"].values

print(f"Number of images in left cataract: {len(left_cataract)}")
print(f"Number of images in right cataract: {len(right_cataract)}")

# Left-eye names first, then right-eye names
cataract = np.concatenate([left_cataract, right_cataract])
print("Lenght=", len(cataract))

In [None]:
# Normal images to pair with the cataract set; len(cataract) names are requested
left_normal, right_normal = sample_normal_eyes(sample_size=len(cataract))
normal = np.concatenate([left_normal, right_normal])
print(len(cataract), len(normal))

In [None]:
# Build the normal-vs-cataract dataset: normal -> label 0, cataract -> label 1

dataset_nc = create_dataset(image_category=(normal, cataract), label=(0, 1))  # NC -> normal-cataract
len(dataset_nc)

In [None]:
# Persist the normal-vs-cataract dataset to Drive as a pickle

with open(
  "/content/gdrive/MyDrive/DL_files/my_new_datasets/cataract_dataset.pkl", "wb"
) as file:
  pickle.dump(dataset_nc, file)

CREATION OF DATASET FOR DIABETES BINARY CLASSIFICATION

In [None]:
# FOR DIABETIC RETINOPATHY (BINARY)

# Per-eye flag columns: 1 when that eye's diagnostic keywords contain "retinopathy", else 0
# NOTE(review): the bare substring "retinopathy" would also match any non-diabetic
# retinopathy keyword, if the label table contains one — confirm against the data.
full_df["left_diabetes"] = full_df["Left-Diagnostic Keywords"].apply(has_disease, disease="retinopathy")
full_df["right_diabetes"] = full_df["Right-Diagnostic Keywords"].apply(has_disease, disease="retinopathy")

# File names of the flagged eyes, restricted to rows whose D flag is 1
left_diabetes = full_df.loc[(full_df.D == 1) & (full_df.left_diabetes == 1), "Left-Fundus"].values
right_diabetes = full_df.loc[(full_df.D == 1) & (full_df.right_diabetes == 1), "Right-Fundus"].values

print(f"Number of images in left diabetes: {len(left_diabetes)}")
print(f"Number of images in right diabetes: {len(right_diabetes)}")

# Left-eye names first, then right-eye names
diabetes = np.concatenate([left_diabetes, right_diabetes])
print("Lenght=", len(diabetes))

In [None]:
# Normal images to pair with the diabetes set; len(diabetes) names are requested
left_normal, right_normal = sample_normal_eyes(sample_size=len(diabetes))
normal = np.concatenate([left_normal, right_normal])
print(len(diabetes), len(normal))

In [None]:
# Build the normal-vs-diabetes dataset: normal -> label 0, diabetes -> label 1

dataset_nd = create_dataset(image_category=(normal, diabetes), label=(0, 1))  # ND -> normal-diabetes
len(dataset_nd)

In [None]:
# Persist the normal-vs-diabetes dataset to Drive as a pickle

with open(
  "/content/gdrive/MyDrive/DL_files/my_new_datasets/diabetes_dataset.pkl", "wb"
) as file:
  pickle.dump(dataset_nd, file)

CREATION OF DATASET FOR GLAUCOMA BINARY CLASSIFICATION

In [None]:
# FOR GLAUCOMA (BINARY)

# Per-eye flag columns: 1 when that eye's diagnostic keywords contain "glaucoma", else 0
full_df["left_glaucoma"] = full_df["Left-Diagnostic Keywords"].apply(has_disease, disease="glaucoma")
full_df["right_glaucoma"] = full_df["Right-Diagnostic Keywords"].apply(has_disease, disease="glaucoma")

# File names of the flagged eyes, restricted to rows whose G flag is 1
left_glaucoma = full_df.loc[(full_df.G == 1) & (full_df.left_glaucoma == 1), "Left-Fundus"].values
right_glaucoma = full_df.loc[(full_df.G == 1) & (full_df.right_glaucoma == 1), "Right-Fundus"].values

print(f"Number of images in left glaucoma: {len(left_glaucoma)}")
print(f"Number of images in right glaucoma: {len(right_glaucoma)}")

# Left-eye names first, then right-eye names
glaucoma = np.concatenate([left_glaucoma, right_glaucoma])
print("Lenght=", len(glaucoma))

In [None]:
# Normal images to pair with the glaucoma set; len(glaucoma) names are requested
left_normal, right_normal = sample_normal_eyes(sample_size=len(glaucoma))
normal = np.concatenate([left_normal, right_normal])
print(len(glaucoma), len(normal))

In [None]:
# Build the normal-vs-glaucoma dataset: normal -> label 0, glaucoma -> label 1

dataset_ng = create_dataset(image_category=(normal, glaucoma), label=(0, 1))  # NG -> normal-glaucoma
len(dataset_ng)

In [None]:
# Persist the normal-vs-glaucoma dataset to Drive as a pickle

with open(
  "/content/gdrive/MyDrive/DL_files/my_new_datasets/glaucoma_dataset.pkl", "wb"
) as file:
  pickle.dump(dataset_ng, file)

CREATION OF DATASET FOR AMD BINARY CLASSIFICATION

In [None]:
# FOR AMD (BINARY)

# Per-eye flag columns: 1 when that eye's diagnostic keywords contain "macular degeneration", else 0
full_df["left_AMD"] = full_df["Left-Diagnostic Keywords"].apply(lambda x: has_disease(x, "macular degeneration"))
full_df["right_AMD"] = full_df["Right-Diagnostic Keywords"].apply(lambda x: has_disease(x, "macular degeneration"))

# Create np.array with the names of the files we will use (rows whose A flag is 1)
left_AMD = full_df.loc[(full_df.A ==1) & (full_df.left_AMD == 1)]["Left-Fundus"].values
right_AMD = full_df.loc[(full_df.A ==1) & (full_df.right_AMD == 1)]["Right-Fundus"].values

print("Number of images in left AMD: {}".format(len(left_AMD)))
print("Number of images in right AMD: {}".format(len(right_AMD)))

# Concatenate left and right eye images.
# The left half must be left_AMD: using left_cataract here would label cataract
# images as AMD and drop every left-eye AMD image.
AMD = np.concatenate((left_AMD,right_AMD),axis=0)
print("Length",len(AMD))

In [None]:
# Normal images to pair with the AMD set; len(AMD) names are requested
left_normal, right_normal = sample_normal_eyes(sample_size=len(AMD))
normal = np.concatenate([left_normal, right_normal])
print(len(AMD), len(normal))

In [None]:
# Build the normal-vs-AMD dataset: normal -> label 0, AMD -> label 1

dataset_na = create_dataset(image_category=(normal, AMD), label=(0, 1))  # NA -> normal-AMD
len(dataset_na)

In [None]:
# Persist the normal-vs-AMD dataset to Drive as a pickle

with open(
  "/content/gdrive/MyDrive/DL_files/my_new_datasets/AMD_dataset.pkl", "wb"
) as file:
  pickle.dump(dataset_na, file)

CREATION OF DATASET FOR MULTI-CLASS CLASSIFICATION

In [None]:
# Gather per-class file names, requesting 500 for the classes that get down-sampled,
# so the classes end up with roughly equal sizes
N, C, D, G, A = sample_for_balance(sample_size=500)
print(*(names.shape for names in (N, C, D, G, A)))

In [None]:
# Class labels, in the order the categories are passed below:
#   0 = Normal, 1 = Cataract, 2 = Diabetes, 3 = Glaucoma, 4 = AMD

multi_dataset = create_dataset(image_category=(N, C, D, G, A), label=tuple(range(5)))

In [None]:
# Persist the five-class dataset to Drive as a pickle

with open(
  "/content/gdrive/MyDrive/DL_files/my_new_datasets/multi_dataset.pkl", "wb"
) as file:
  pickle.dump(multi_dataset, file)