<a href="https://colab.research.google.com/github/RiyanMak/AIM25/blob/preprocessing-pipeline/Data_Processing_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [25]:
import cv2
import numpy as np
import os
import glob
from os import listdir
!pip install mediapipe
import mediapipe as mp



I made 3 different resizing methods, not sure which one we should use. Maybe resizing while maintaining aspect ratio
is better in case the images aren't perfectly squares and we want target size to be a square. The resize function also
uses the same interpolation methods as the other two functions.
All functions take image as numpy array.

In [26]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [29]:
class ImageProcessor():
    # Initializes standard image sizes: width and height = 64. Changing these values changes values in resize functions
    def __init__(self, image_size=(64, 64)):
        self.target_size = image_size

    # open and read images from specified file path
    def load_image(self, image_path):
        # Loads an image from the given path and returns it as a numpy array
        image = cv2.imread(image_path)
        if image is None:
            print(f"ERROR: Unable to read image at {image_path}")
        return image

     # ensures that the input is not an empty array
    def check_if_valid(self, image):
        if not isinstance(image, np.ndarray):
            return False  # return False if not valid
        return True  # return True if valid

     # checks to see if image is grayscale
    def convert_to_grayscale(self, image):
        # RGB colored images usually have 3 dimensions (width, height, color)
        # if the thrid dimesion of the tuple (represented by index of 2) has the color channel of RGB, then convert to grayscale
        if len(image.shape) == 3 and image.shape[2] == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return image

    #scales pixels to [0,1]
    def normalize_image(self, image):
      #converts to float32 in case data is integers
      normalized_image = image.astype(np.float32) / 255.0
      return normalized_image

    # used to sharpen images using cv2 kernels (if data requires image sharpening)
    def sharpen_image (self, image):
      kerenel_sharpening = np.array([[-1,-1,-1],
                                      [-1,9,-1],
                                      [-1,-1,-1]])
      sharpened_image = cv2.filter2D(image, -1, kerenel_sharpening)
      return sharpened_image

    # (1) resizes the image to target size using INTER_AREA interpolation, recommended when downscaling original image
    def downscale_image(self, image):
      return cv2.resize(image, self.target_size, interpolation=cv2.INTER_AREA)

    # (2) resizes the image to target size using INTER_CUBIC interpolation, recommended when upscaling original image
    def upscale_image(self, image):
      return cv2.resize(image, self.target_size, interpolation=cv2.INTER_CUBIC)

    # (3) resizes an image to target size while maintaining ratios (uses black padding to cover open space), prevents distortion.
    # might need to change this function a little if image is not grayscaled
    def resize_with_aspect_ratio(self, image, pad_color):
      h, w = image.shape[:2]  #gets original dimensions of image
      target_h, target_w = self.target_size  #gets target dimensions

      aspect_original = w / h  #original aspect ratio
      aspect_target = target_w / target_h  #target aspect ratio

      # If the image already fits within the target size, no padding is required.
      if h == target_h and w == target_w:
          return image  # Return the image without changes if no padding is needed

      # if our model does not need all images to be exact same size, padding not necessary
      padded_image = np.zeros((target_h, target_w), dtype=np.uint8)  #creates blank image with target size (padding)
      padded_image[:] = pad_color  #sets padded image color to black

      #if original image is wider, recalculates height and sets width to max width (target width)
      if aspect_original > aspect_target:
          new_w = target_w
          new_h = int(new_w / aspect_original)
          #resizes image and uses either INTER_AREA or INTER_CUBIC depending on downscale or upscale
          if new_h <= h:
            resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
          else:
            resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_CUBIC)

          y_offset = (target_h - new_h) // 2  #calculates offset to center image on padded image (idk if we need this tbh)
          padded_image[y_offset:y_offset + new_h, :] = resized  #places image on padded image

      #if original image is taller, recalculates width and sets height to max height (target height)
      else:
          new_h = target_h
          new_w = int(new_h * aspect_original)
          if new_w <= w:
            resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
          else:
            resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_CUBIC)

          x_offset = (target_w - new_w) // 2
          padded_image[:, x_offset:x_offset + new_w] = resized

      return padded_image  #returns original image


    def clean_landmarks(self, landmarks_array):
      if landmarks_array is None or len(landmarks_array) == 0:
        return np.array([])
      cleaned_landmarks = landmarks_array.tolist()
      num_landmarks = len(cleaned_landmarks[0])
      all_values = [landmark[i] for landmark in cleaned_landmarks for i in range(num_landmarks) if landmark[i] is not None]
      if all_values:
        mean_val = np.mean(all_values)
        for j in range(len(cleaned_landmarks)):
          for i in range(num_landmarks):
            if cleaned_landmarks[j][i] is None:
              cleaned_landmarks[j][i] = mean_val
      else:
        print("Warning: No valid data for mean calculation, skipping mean replacement")
      all_values = [landmark[i] for landmark in cleaned_landmarks for i in range(num_landmarks) if landmark[i] is not None]
      if all_values:
        q1 = np.quantile(all_values, 0.25)
        q3 = np.quantile(all_values, 0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        cleaned_landmarks_filtered = []
        for landmark in cleaned_landmarks:
          if all(lower_bound <= landmark[i] <= upper_bound for i in range(num_landmarks)):
            cleaned_landmarks_filtered.append(landmark)
        return np.array(cleaned_landmarks_filtered)
      else:
        print("Warning: No valid data for outlier detection, returning unfiltered landmarks")
        return np.array(cleaned_landmarks)


        def process_image(self, image_path): #added this needed function.
          image = self.load_image(image_path)
          if image is None:
              return (None, image_path, None)

          image = self.convert_to_grayscale(image)
          image = self.resize_with_aspect_ratio(image, 0)
          image = self.normalize_image(image)

          return (image, image_path, None)


    #function to process images in a file
    #takes landmarks as a dictionary where keys are image filenames and values are corresponding landmark data
    #returns a list of tuples, where each tuple contains (processed_image, image_path, cleaned_landmarks(if applicable))
    def process_multiple_image(self, folder_path, landmarks_dict=None):
      results = []
      for filename in os.listdir(folder_path):
          image_path = os.path.join(folder_path, filename)
          processed_result = self.process_image(image_path)

          if processed_result[0] is not None:
              results.append(processed_result)

      if landmarks_dict is not None:
          cleaned_landmarks_dict = self.process_landmark_dictionary_combined(landmarks_dict)
          for i, result in enumerate(results):
              image_name = os.path.basename(result[1])
              if cleaned_landmarks_dict.get(image_name) is not None:
                  results[i] = (result[0], result[1], cleaned_landmarks_dict[image_name])
      return results

    mp_face_mesh = mp.solutions.face_mesh
    mp_face_detection = mp.solutions.face_detection
    mp_drawing = mp.solutions.drawing_utils
    mp_drawing_styles = mp.solutions.drawing_styles

    def process_images_with_landmarks_in_folder(self, folder_path):
        # Used to store the landmarks
        all_landmarks = {}

        # Get all image paths (both .jpg and .png)
        image_paths = glob.glob(os.path.join(folder_path, "*.jpg")) + glob.glob(os.path.join(folder_path, "*.png"))

        # Loop through each image in the folder
        for image_path in image_paths:
            processor = ImageProcessor(image_size=(128, 128))

            # Get each image in the folder
            image = processor.load_image(image_path)
            if image is None:
                print(f"Error: Could not load image {image_path}")
                all_landmarks[image_path] = None
                continue

            # Add mesh to the faces
            face_mesh = mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, refine_landmarks=True)
            face_detection = mp_face_detection.FaceDetection(min_detection_confidence=0.5)

            # Detect the face so we can get the landmarks
            faces = detect_faces(image, face_detection)
            if not faces:
                print(f"No faces detected in image: {image_path}")
                all_landmarks[image_path] = None
                continue

            # Extract landmarks
            landmarks_dict = extract_facial_landmarks(image, face_mesh)
            if not landmarks_dict:
                print(f"No facial landmarks detected in image: {image_path}")
                all_landmarks[image_path] = None
                continue

            # Process landmarks
            processed_landmarks_dict = {}
            for face_idx, landmarks in landmarks_dict.items():
                processed_landmarks = processor.process_landmarks(landmarks)
                processed_landmarks_dict[face_idx] = processed_landmarks

            # Store the processed landmarks in the dictionary
            all_landmarks[image_path] = processed_landmarks_dict

        return all_landmarks



if __name__ == "__main__":
    path = '/content/drive/MyDrive/RAFD Test Image Folder'
    #stored the return landmarks
    processor = ImageProcessor()
    landmarks_by_image = processor.process_images_with_landmarks_in_folder(path)

    processor.process_multiple_image(path, landmarks_by_image)




NameError: name 'detect_faces' is not defined