### Import dependencies

In [None]:
# %pip install --upgrade pip --quiet && pip install -r ../../requirements.txt --quiet

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from dotenv import load_dotenv
import matplotlib.pyplot as plt
from IPython.display import clear_output
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D, Dropout, BatchNormalization

### Load CSV data

In [None]:
# Presumably loads variables from a local .env file into the environment
# (python-dotenv) so DATA_DIR can be configured per machine — TODO confirm.
load_dotenv()

# Root directory of the dataset; every raw/processed path below is built from it.
# NOTE(review): os.getenv returns None when DATA_DIR is unset, which would make
# the f-string paths start with "None/" — confirm the variable is always set.
DATA_DIR = os.getenv('DATA_DIR')

In [None]:
def load_train_df() -> pd.DataFrame:
    """Load the training annotations from ``{DATA_DIR}/raw/train.csv``.

    The first CSV column becomes the DataFrame index, the listed columns are
    parsed as integers, and the terse CSV headers are renamed to descriptive
    names.

    Returns:
        The annotation table with the renamed columns ``image_id``, ``x1``,
        ``y1``, ``x2``, ``y2`` and ``is_rotten``.
    """
    return pd.read_csv(f"{DATA_DIR}/raw/train.csv", dtype={
        "img": int,  # Image ID
        "x": int,  # X1 coordinate of the top-left corner of the bounding box
        "y": int,  # Y1 coordinate of the top-left corner of the bounding box
        "w": int,  # X2 coordinate of the bounding box (renamed to "x2" below)
        "h": int,  # Y2 coordinate of the bounding box (renamed to "y2" below)
        "rot": int,  # Whether the image shows a rotten pear (1) or not (0)
    }, index_col=0).rename(columns={
        "img": "image_id",
        "x": "x1",
        "y": "y1",
        "w": "x2",
        "h": "y2",
        "rot": "is_rotten",
    })

In [None]:
def load_test_df() -> pd.DataFrame:
    """Load the test annotations from ``{DATA_DIR}/raw/test.csv``.

    Same layout as the training table but without the ``rot`` label column.
    The first CSV column becomes the DataFrame index, the listed columns are
    parsed as integers, and the CSV headers are renamed to descriptive names.

    Returns:
        The annotation table with the renamed columns ``image_id``, ``x1``,
        ``y1``, ``x2`` and ``y2``.
    """
    return pd.read_csv(f"{DATA_DIR}/raw/test.csv", dtype={
        "img": int,  # Image ID
        "x": int,  # X1 coordinate of the top-left corner of the bounding box
        "y": int,  # Y1 coordinate of the top-left corner of the bounding box
        "w": int,  # X2 coordinate of the bounding box (renamed to "x2" below)
        "h": int,  # Y2 coordinate of the bounding box (renamed to "y2" below)
    }, index_col=0).rename(columns={
        "img": "image_id",
        "x": "x1",
        "y": "y1",
        "w": "x2",
        "h": "y2",
    })

In [None]:
# Read both annotation tables into memory; the train table additionally
# carries the "is_rotten" label column.
train_df = load_train_df()
test_df = load_test_df()

### Add image dimensions

In [None]:
def add_dimensions(df: pd.DataFrame) -> None:
    """Add ``width``, ``height`` and ``area`` columns to ``df`` in place.

    The values are derived from the bounding-box corner columns ``x1``,
    ``y1``, ``x2`` and ``y2``. Nothing is returned; ``df`` itself is modified.
    """
    box_width = df["x2"] - df["x1"]
    box_height = df["y2"] - df["y1"]

    # Assign in this order so the new columns appear as width, height, area
    df["width"] = box_width
    df["height"] = box_height
    df["area"] = box_width * box_height

In [None]:
# Add the width/height/area columns to both tables (modified in place)
add_dimensions(train_df)
add_dimensions(test_df)

### Remove outliers

In [None]:
def find_outliers(df: pd.DataFrame, column: str) -> pd.Series:
    """Return a boolean mask of the rows that are NOT outliers in ``column``.

    Despite the function name, the mask is True for values inside the
    inclusive range ``[Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]`` and False for the
    outliers, so it can be used directly to select the rows to keep.

    Args:
        df: Table containing ``column``.
        column: Name of the numeric column to inspect.

    Returns:
        A boolean Series aligned with ``df``'s index.
    """
    # Interquartile range (IQR) of the column
    first_quartile = df[column].quantile(0.25)
    third_quartile = df[column].quantile(0.75)
    iqr = third_quartile - first_quartile

    # Fences beyond which a value counts as an outlier
    lower_fence = first_quartile - 1.5 * iqr
    upper_fence = third_quartile + 1.5 * iqr

    return (df[column] >= lower_fence) & (df[column] <= upper_fence)

In [None]:
def remove_outliers(df: pd.DataFrame) -> pd.DataFrame:
    """Return the rows of ``df`` that pass the IQR check on every size column.

    A row is kept only when its ``width``, ``height`` and ``area`` are all
    accepted by ``find_outliers`` (whose mask is True for rows to keep).
    """
    # Start from the width mask and narrow it with the remaining columns
    keep_mask = find_outliers(df, "width")
    for size_column in ("height", "area"):
        keep_mask = keep_mask & find_outliers(df, size_column)

    return df[keep_mask]

In [None]:
# Keep only the training rows whose width, height and area all fall inside
# the IQR fences; train_df itself is left untouched.
filtered_train_df = remove_outliers(train_df)

# Print the shapes of the original and filtered train_df
print(f"Original train_df shape: {train_df.shape}")
print(f"Filtered train_df shape: {filtered_train_df.shape}")

### Find the optimal dimensions for resizing without losing pixels

In [None]:
# Find the largest bounding-box width and height in the filtered train set
max_width = filtered_train_df["width"].max()
max_height = filtered_train_df["height"].max()
# Side length of the square output images; resize_image and pad_image read
# this module-level value.
max_dimension = max(max_width, max_height)

### Load & process images

In [None]:
def safe_create_directory(directory: str) -> None:
    """Create ``directory`` (including parents) without raising on failure.

    An already existing directory is not an error. Any ``OSError`` raised
    while creating it is reported on stdout and otherwise ignored.
    """
    try:
        os.makedirs(directory, exist_ok=True)
    except OSError as exc:
        # Best effort: report the problem and let the caller carry on
        message = f"Error creating directory {directory}: {exc}"
        print(message)

In [None]:
def is_directory_empty(directory: str) -> bool:
    """Return True when ``directory`` has no entries.

    The directory is created first (via ``safe_create_directory``) so that
    listing it does not fail merely because it does not exist yet.
    """
    safe_create_directory(directory)

    entries = os.listdir(directory)
    return not entries

In [None]:
def load_image(image_path: str) -> np.ndarray:
    """Read the image stored at ``image_path`` with ``cv2.imread``.

    NOTE(review): ``cv2.imread`` is documented to return ``None`` rather than
    raise when the file is missing or unreadable, and nothing here checks for
    that — confirm callers can cope.
    """
    image = cv2.imread(image_path)
    return image

In [None]:
def get_raw_image_path(image_id: int) -> str:
    """Build the path of the raw PNG belonging to ``image_id``.

    The file name is the image ID zero-padded to eight digits.
    """
    raw_path = f"{DATA_DIR}/raw/images/{image_id:08d}.png"
    return raw_path

In [None]:
def load_raw_image(image_id: int) -> np.ndarray:
    """Load the raw image identified by ``image_id``.

    Resolves the file location with ``get_raw_image_path`` and reads it with
    ``load_image``.
    """
    raw_path = get_raw_image_path(image_id)
    return load_image(raw_path)

In [None]:
def get_processed_image_path(row: pd.Series) -> str:
    """Build the output path for the processed crop described by ``row``.

    Rows that carry an ``is_rotten`` entry are treated as training rows and go
    to ``train/rotten/`` or ``train/not_rotten/`` depending on the label; all
    other rows go to ``test/``. The file is named after the row's index label.
    """
    if "is_rotten" not in row:
        # No label available: this is a test row
        subdirectory, rotten_subdirectory = "test", ""
    elif row["is_rotten"] == 1:
        subdirectory, rotten_subdirectory = "train", "rotten/"
    else:
        subdirectory, rotten_subdirectory = "train", "not_rotten/"

    return f"{DATA_DIR}/processed/images/{subdirectory}/{rotten_subdirectory}{row.name}.png"

In [None]:
def crop_image(image: np.ndarray, row: pd.Series) -> np.ndarray:
    """Cut the bounding box described by ``row`` out of ``image``.

    ``row`` supplies the corner coordinates ``x1``, ``y1``, ``x2`` and ``y2``;
    rows ``y1..y2`` and columns ``x1..x2`` (end exclusive) are returned.
    """
    vertical_span = slice(row["y1"], row["y2"])
    horizontal_span = slice(row["x1"], row["x2"])

    # Image arrays are indexed as [row (y), column (x)]
    return image[vertical_span, horizontal_span]

In [None]:
def resize_image(image: np.ndarray) -> np.ndarray:
    """Scale ``image`` so that its longer side equals ``max_dimension``.

    The aspect ratio is preserved (the shorter side is scaled by the same
    factor and truncated to an integer). ``max_dimension`` is read from module
    scope.

    Args:
        image: Image array whose first two axes are height and width.

    Returns:
        The resized image as produced by ``cv2.resize``.
    """
    # Only the first two axes matter, so 2-D (single-channel) arrays work too
    current_height, current_width = image.shape[:2]

    if current_width > current_height:
        new_width = max_dimension
        scaled_height = int(current_height * (max_dimension / current_width))
        # Very elongated crops could truncate to 0, which is not a valid
        # target size for cv2.resize; keep at least one pixel.
        new_height = max(1, scaled_height)
    else:
        new_height = max_dimension
        scaled_width = int(current_width * (max_dimension / current_height))
        new_width = max(1, scaled_width)

    # cv2.resize expects the target size as (width, height)
    return cv2.resize(image, (new_width, new_height))

In [None]:
def pad_image(image: np.ndarray) -> np.ndarray:
    """Pad ``image`` with a solid border to ``max_dimension`` x ``max_dimension``.

    The image is centred; when the missing amount along an axis is odd, the
    extra pixel goes to the bottom/right so the result is always exactly
    ``max_dimension`` pixels on both sides. ``max_dimension`` is read from
    module scope.

    Args:
        image: Image array no larger than ``max_dimension`` in height or width.

    Returns:
        The padded square image.
    """
    # Magenta in both BGR and RGB channel order
    PADDING_COLOR = [255, 0, 255]

    width_diff = max_dimension - image.shape[1]
    height_diff = max_dimension - image.shape[0]

    # Split each difference into two parts that sum to the full difference.
    # Using diff // 2 on both sides would lose one pixel for odd differences.
    top = height_diff // 2
    bottom = height_diff - top
    left = width_diff // 2
    right = width_diff - left

    return cv2.copyMakeBorder(image, top, bottom, left, right, cv2.BORDER_CONSTANT, None, value=PADDING_COLOR)

In [None]:
def extract_pear(image: np.ndarray, row: pd.Series) -> np.ndarray:
    """Produce the processed crop for the bounding box described by ``row``.

    The pipeline is: crop the box out of ``image``, resize the crop, then pad
    the resized crop.
    """
    return pad_image(resize_image(crop_image(image, row)))

In [None]:
def extract_pears_from_image(image_id: int, df: pd.DataFrame) -> None:
    """Extract and save every annotated pear of one raw image.

    Args:
        image_id: ID of the raw image to load.
        df: Annotation table; all rows whose ``image_id`` matches are processed.

    Raises:
        FileNotFoundError: If the raw image could not be read.
    """
    image = load_raw_image(image_id)
    if image is None:
        # The loader yields None for a missing/unreadable file; fail here with
        # a clear message instead of an opaque TypeError while cropping.
        raise FileNotFoundError(
            f"Could not read raw image for image_id {image_id}: {get_raw_image_path(image_id)}"
        )

    pears_in_image = df[df["image_id"] == image_id]

    # One output file per annotated bounding box
    for _, row in pears_in_image.iterrows():
        # Get the current pear from the loaded image
        processed_image = extract_pear(image, row)
        # Get the path to save the processed image
        processed_path = get_processed_image_path(row)
        # Save the processed image
        cv2.imwrite(processed_path, processed_image)

In [None]:
def create_processed_directories() -> None:
    """Create the output folders for the processed train and test images.

    Training images are split into ``rotten`` and ``not_rotten`` subfolders;
    test images share a single folder.
    """
    output_directories = (
        f"{DATA_DIR}/processed/images/train/rotten",
        f"{DATA_DIR}/processed/images/train/not_rotten",
        f"{DATA_DIR}/processed/images/test",
    )
    for output_directory in output_directories:
        safe_create_directory(output_directory)

In [None]:
def process_all_images(df: pd.DataFrame) -> None:
    """Extract and save the pears of every image referenced in ``df``.

    A table with an ``is_rotten`` column is written to the ``train`` output
    folder, any other table to ``test``. Processing is skipped when the target
    folder already contains files.

    Args:
        df: Annotation table with an ``image_id`` column.
    """
    subdirectory = "train" if "is_rotten" in df else "test"
    full_directory = f"{DATA_DIR}/processed/images/{subdirectory}"

    # Look for actual files anywhere below the target folder. A plain listing
    # of the top level would treat the empty "rotten"/"not_rotten" subfolders
    # (created below, also when the test set is processed first) as proof that
    # the train set was already processed. os.walk yields nothing for a
    # folder that does not exist yet.
    already_processed = any(files for _, _, files in os.walk(full_directory))
    if already_processed:
        print(f"Images have already been processed in {full_directory}")
        return

    create_processed_directories()

    # Each raw image is loaded once and all of its pears are extracted from it
    for image_id in df["image_id"].unique():
        extract_pears_from_image(image_id, df)

In [None]:
# Crop, resize, pad and save every annotated pear: first the outlier-filtered
# training set, then the (unfiltered) test set.
process_all_images(filtered_train_df)
process_all_images(test_df)