In [None]:
!pip install keras_cv



In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers as tfkl
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import os
import gc
import shutil
import keras_cv
import cv2

# Default shape of one input image; it is overwritten with the shape of the
# loaded image array once the dataset has been read.
input_shape = (96, 96, 3)

# Mounting, Reading clean dataset, and Label creation

In [None]:
# Colab only: mount Google Drive and change into the challenge folder so that
# the relative 'Data/...' paths used by the following cells resolve.
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive/My Drive/[2023-2024] AN2DL/Challenge 1/

Mounted at /gdrive
/gdrive/My Drive/[2023-2024] AN2DL/Challenge 1


In [None]:
# Load the cleaned dataset: the "data" entry holds the images and the "labels"
# entry the class names as strings.
# NOTE(review): allow_pickle=True unpickles arbitrary objects - only load
# archives from a trusted source.
data = np.load('Data/clean_data.npz', allow_pickle=True)
imgs, labels_str = data["data"], data["labels"]
print(imgs.shape, labels_str.shape)
# Shape of a single image = the array shape without the leading sample axis.
input_shape = imgs.shape[1:]
input_shape

(5004, 96, 96, 3) (5004,)


(96, 96, 3)

# Augmentation Helper Functions

In [None]:
def random_perspective_transform(image, intensity=0.23):
    """Warp ``image`` with a randomly jittered perspective transform.

    Every image corner is displaced independently by a uniform random fraction
    in ``[-intensity, intensity]`` of the image width (x direction) and height
    (y direction). The image is then warped so that its corners land on the
    displaced positions; the output keeps the input's width and height.

    Args:
        image: Array whose first two axes are (height, width).
        intensity: Maximum corner displacement as a fraction of the image size.

    Returns:
        The warped image produced by ``cv2.warpPerspective``.
    """
    rows, cols = image.shape[:2]

    # Source corners as (x, y) pairs: (0, 0), (w, 0), (0, h), (w, h).
    anchors = [(0, 0), (cols, 0), (0, rows), (cols, rows)]

    jittered = []
    for anchor_x, anchor_y in anchors:
        # x is drawn before y so the random stream is consumed in a fixed order.
        shift_x = np.random.uniform(-intensity, intensity) * cols
        shift_y = np.random.uniform(-intensity, intensity) * rows
        jittered.append([anchor_x + shift_x, anchor_y + shift_y])

    src_corners = np.float32(anchors)
    dst_corners = np.float32(jittered)

    # Homography mapping the original corners onto the jittered ones.
    homography = cv2.getPerspectiveTransform(src_corners, dst_corners)
    return cv2.warpPerspective(image, homography, (cols, rows))

def random_perspective_transform_tf(image, intensity=0.2):
    """NumPy-side adapter that applies the random perspective warp to a batch.

    Meant to be wrapped in ``tf.numpy_function``. Each image of the batch gets
    its own independent random warp (previously only the first image was
    processed and the rest of the batch was silently dropped).

    Args:
        image: Batch of images with the sample axis first, e.g. (N, H, W, C).
        intensity: Maximum corner displacement as a fraction of the image size.

    Returns:
        A float32 array with the same shape as ``image``. The cast guarantees
        the result matches the ``tf.float32`` output type declared where this
        function is wrapped, whatever dtype the input has.
    """
    batch = np.asarray(image)

    # Nothing to warp: return an empty float32 batch instead of failing.
    if batch.shape[0] == 0:
        return batch.astype(np.float32)

    warped = [random_perspective_transform(sample, intensity) for sample in batch]
    return np.stack(warped, axis=0).astype(np.float32, copy=False)

# Keras layer for the perspective transformation: tf.numpy_function lets the
# NumPy/OpenCV implementation run on tensors and declares a float32 result.
def _apply_perspective(batch):
    return tf.numpy_function(random_perspective_transform_tf, [batch], tf.float32)


perspective_layer = tf.keras.layers.Lambda(_apply_perspective)

# Augmentation Models

In [None]:
# Side length of the square images; not referenced by the pipelines below.
height = width = 96

# Pipeline 1: horizontal mirroring followed by a random shift.
augment_model_1 = tf.keras.Sequential()
augment_model_1.add(tfkl.RandomFlip("horizontal"))
augment_model_1.add(tfkl.RandomTranslation(0.12, 0.12))

# Pipeline 2: random rotation followed by a slight zoom.
augment_model_2 = tf.keras.Sequential()
augment_model_2.add(tfkl.RandomRotation(0.14))
augment_model_2.add(tfkl.RandomZoom(0.07))

# Pipeline 4: random shift followed by vertical mirroring
# (defined but not part of the active list at the end of this cell).
augment_model_4 = tf.keras.Sequential()
augment_model_4.add(tfkl.RandomTranslation(0.08, 0.08))
augment_model_4.add(tfkl.RandomFlip("vertical"))

# Pipeline 4a: random rotation followed by vertical mirroring.
augment_model_4a = tf.keras.Sequential()
augment_model_4a.add(tfkl.RandomRotation(0.15))
augment_model_4a.add(tfkl.RandomFlip("vertical"))

# Pipeline 5: random perspective warp only.
augment_model_5 = tf.keras.Sequential()
augment_model_5.add(perspective_layer)

# Pipelines the augmentation functions pick from at random.
augment_models = [
    augment_model_1,
    augment_model_2,
    augment_model_4a,
    augment_model_5,
]

# Augmentation Functions

In [None]:
def augment_unhealthy(images, augment_models):
    """Return one augmented copy of every image in ``images``.

    For each image a pipeline is drawn at random from ``augment_models`` and
    applied in training mode to a single-image batch.

    Args:
        images: Array of images with the sample axis first.
        augment_models: Callables invoked as ``model(batch, training=True)``
            whose result exposes ``.numpy()``.

    Returns:
        An array with the same shape and dtype as ``images`` (it is allocated
        with ``np.zeros_like``) holding the augmented images in input order.
    """
    result = np.zeros_like(images)

    for position in range(len(images)):
        # Pick the pipeline for this image.
        pipeline = np.random.choice(augment_models)
        # The pipelines work on batches, so wrap the image in a batch of one.
        single_batch = images[position][np.newaxis, ...]
        result[position] = pipeline(single_batch, training=True).numpy()

    return result

In [None]:
def create_augmented_dataset(images, labels, target_size, augment_models):
    """Grow a dataset with augmented copies until all classes are equally large.

    Every class present in ``labels`` is topped up to
    ``target_size // n_classes`` samples (``target_size // 2`` for the binary
    case). Only images of classes that are still short are augmented, so no
    class overshoots the target and none is left short. Classes that already
    have at least that many samples are kept unchanged (never reduced).

    Args:
        images: Array of images with the sample axis first.
        labels: 1-D array of class labels aligned with ``images``.
        target_size: Desired total number of samples after augmentation.
        augment_models: Callables invoked as ``model(batch, training=True)``
            whose result exposes ``.numpy()``; one is drawn at random per copy.

    Returns:
        ``(final_images, final_labels)``: the originals plus the augmented
        copies, jointly shuffled with one permutation.
    """
    images = np.asarray(images)
    labels = np.asarray(labels)

    class_ids, class_sizes = np.unique(labels, return_counts=True)

    # How many augmented copies each class still needs.
    missing = {}
    if len(class_ids) > 0:
        per_class_target = target_size // len(class_ids)
        for class_id, size in zip(class_ids, class_sizes):
            missing[class_id] = max(per_class_target - int(size), 0)
    remaining = sum(missing.values())

    # Containers for the augmented images and labels
    augmented_images = []
    augmented_labels = []

    # Keep cycling over the originals until every class reached its target.
    # Each class in `missing` has at least one image, so this terminates.
    while remaining > 0:
        for img, label in zip(images, labels):
            if missing[label] <= 0:
                continue  # this class is already complete

            # Randomly select an augmentation model
            model = np.random.choice(augment_models)

            # The models work on batches: wrap the image in a batch of one
            # and take the single result back out.
            aug_img = model(img[np.newaxis, ...], training=True).numpy()[0]

            augmented_images.append(aug_img)
            augmented_labels.append(label)

            missing[label] -= 1
            remaining -= 1
            if remaining == 0:
                break

    # Combine original and augmented data. Skip the concatenation when nothing
    # was generated: an empty list becomes a 1-D array that cannot be joined
    # with the image array.
    if augmented_images:
        final_images = np.concatenate((images, np.array(augmented_images)), axis=0)
        final_labels = np.concatenate((labels, np.array(augmented_labels)), axis=0)
    else:
        final_images, final_labels = images, labels

    # Shuffle images and labels with the same permutation so pairs stay aligned.
    indices = np.random.permutation(final_images.shape[0])
    return final_images[indices], final_labels[indices]

In [None]:
def _save_augmented_batch(output_dir, batch_index, batch_images, batch_labels):
    """Write one batch to ``augmented_batch_<index>.npz`` and return its path."""
    batch_path = os.path.join(output_dir, f'augmented_batch_{batch_index}.npz')
    np.savez(batch_path, data=np.array(batch_images), labels=np.array(batch_labels))
    print(f"Batch {batch_index} saved to {batch_path}")  # Debug print
    return batch_path


# Optimize image augmentation to stay beneath colab ram limits
def batch_augment_images_to_target(images, labels, augment_models, target_size, batch_size, output_dir):
    """Generate augmented images in batches and write them to disk.

    ``target_size - len(images)`` augmented images are produced in total and
    split evenly across the classes present in ``labels``; when the amount is
    not divisible by the number of classes, the first classes (in
    ``np.unique`` order) receive one extra image each so the target is met
    exactly. Only the augmented images are saved; the originals are not.

    Args:
        images: Array of images with the sample axis first.
        labels: 1-D array of class labels aligned with ``images``.
        augment_models: Callables invoked as ``model(batch, training=True)``
            whose result exposes ``.numpy()``; one is drawn at random per copy.
        target_size: Desired size of originals plus augmented images. Nothing
            is generated when it does not exceed ``len(images)``.
        batch_size: Number of augmented images stored per ``.npz`` file.
        output_dir: Folder receiving ``augmented_batch_<k>.npz`` files with
            the entries ``data`` and ``labels``; created if missing.

    Returns:
        None. The results are the files written to ``output_dir``.
    """
    images = np.asarray(images)
    labels = np.asarray(labels)
    unique = np.unique(labels)

    # Split the number of extra images evenly across the classes and hand the
    # remainder out one by one so the total is not short of the target.
    additional_images_needed = max(target_size - len(images), 0)
    additional_per_class = {}
    if len(unique) > 0:
        share, leftover = divmod(additional_images_needed, len(unique))
        for rank, label in enumerate(unique):
            additional_per_class[label] = share + (1 if rank < leftover else 0)

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_dir, exist_ok=True)

    # Containers for the batch currently being filled
    augmented_images = []
    augmented_labels = []

    batch_counter = 0
    while sum(additional_per_class.values()) > 0:
        for img, label in zip(images, labels):
            if additional_per_class[label] <= 0:
                continue  # this class already has all its extra images

            # Randomly select an augmentation model and apply it to a batch of one
            model = np.random.choice(augment_models)
            augmented_img = model(img[np.newaxis, ...], training=True).numpy()[0]

            augmented_images.append(augmented_img)
            augmented_labels.append(label)
            additional_per_class[label] -= 1

            # Flush a full batch to disk and release it to keep RAM usage flat.
            if len(augmented_images) == batch_size:
                _save_augmented_batch(output_dir, batch_counter, augmented_images, augmented_labels)
                augmented_images = []
                augmented_labels = []
                batch_counter += 1
                gc.collect()  # Collect garbage to free memory

        # Visit the originals in a different order on the next pass.
        indices = np.random.permutation(len(images))
        images = images[indices]
        labels = labels[indices]

    # Save any remaining images that didn't make up a full batch
    if augmented_images:
        _save_augmented_batch(output_dir, batch_counter, augmented_images, augmented_labels)
        gc.collect()  # Collect garbage to free memory

# Dataset balancing (Optional)

In [None]:
# Encode the string class names as integers: 1 where the name is "unhealthy",
# 0 for every other value.
labels = (labels_str == "unhealthy").astype("int")

In [None]:
# Shuffle original dataset before any operation.
# The same permutation is applied to the images and to BOTH label arrays:
# if `labels_str` kept its original order, recomputing `labels` from it later
# would silently pair the shuffled images with the wrong labels.
indices = np.random.permutation(len(labels))
imgs = imgs[indices]
labels = labels[indices]
labels_str = labels_str[indices]

In [None]:
# Hold out 20% of the samples for validation. Stratifying on the labels keeps
# the healthy/unhealthy ratio the same in both splits instead of leaving it to
# chance.
X_train, X_val, y_train, y_val = train_test_split(
    imgs, labels, test_size=.2, stratify=labels
)
X_train.shape, y_train.shape

((4003, 96, 96, 3), (4003,))

In [None]:
# Bundle the two splits as (images, labels) pairs: index 0 is the training
# split, index 1 the validation split.
dataset = ((X_train, y_train), (X_val, y_val))

In [None]:
# Balance every split by appending augmented copies of unhealthy images
# (label 1) until there are as many of them as healthy images (label 0).
# A split that is already balanced, has more unhealthy than healthy images, or
# has no unhealthy image to augment is stored unchanged.
balanced = []
for b_data, b_labels in dataset:
    healthy = int(np.sum(b_labels == 0))
    unhealthy = int(np.sum(b_labels == 1))
    delta = healthy - unhealthy
    unhealthy_imgs = b_data[b_labels == 1]

    if delta > 0 and unhealthy > 0:
        # Pick the source images for the copies; reuse images only when more
        # copies are needed than there are unhealthy images.
        picks = np.random.choice(unhealthy, delta, replace=delta > unhealthy)
        unhealthy_imgs = unhealthy_imgs[picks]
        aug_balance_unhealthy = augment_unhealthy(unhealthy_imgs, augment_models)

        b_data = np.append(b_data, aug_balance_unhealthy, axis=0)
        # Keep the label dtype: appending float ones would turn the whole
        # integer label array into floats.
        b_labels = np.append(b_labels, np.ones(delta, dtype=b_labels.dtype))

    balanced.append((b_data, b_labels))


In [None]:
# Unpack the balanced splits, then shuffle the training split with a single
# permutation so every image stays paired with its label.
train, val = balanced[0], balanced[1]
indices = np.random.permutation(len(train[1]))
train_imgs, train_labels = train[0][indices], train[1][indices]

In [None]:
# Shuffle the validation split with a single permutation so every image stays
# paired with its label.
indices = np.random.permutation(len(val[1]))
val_imgs, val_labels = val[0][indices], val[1][indices]

In [None]:
# Persist the balanced training split. The folder is created first because
# np.savez does not create missing directories.
os.makedirs("Data/Augmented", exist_ok=True)
np.savez("Data/Augmented/train_balanced.npz", data=np.array(train_imgs), labels=np.array(train_labels))

In [None]:
# Persist the balanced validation split. The folder is created first because
# np.savez does not create missing directories.
os.makedirs("Data/Augmented", exist_ok=True)
np.savez("Data/Augmented/val_balanced.npz", data=np.array(val_imgs), labels=np.array(val_labels))

In [None]:
# Sanity check on the balanced training split: np.unique returns the counts in
# sorted label order, so label 0 (healthy) comes first, label 1 (unhealthy) second.
_, (n_healthy, n_unhealthy) = np.unique(balanced[0][1], return_counts=True)
n_healthy, n_unhealthy

(2478, 2478)

In [None]:
# Drop the names of intermediates that are no longer needed so their memory
# can be reclaimed.
del labels_str, unhealthy_imgs, data, dataset
del X_train, X_val, y_train, y_val

In [None]:
# Total number of samples in the balanced training and validation splits.
data_size = len(balanced[0][1]) + len(balanced[1][1])

In [None]:
# Validation / training proportions.
# NOTE(review): presumably mirrors the 80/20 split used by train_test_split;
# neither value is referenced again in the visible cells - confirm they are needed.
val_prop = 0.2
train_prop = 0.8
(val_prop, train_prop)

(0.2, 0.8)

# Dataset splitting

In [None]:
# Start this section from the clean file so it does not depend on which cells
# ran before: the optional balancing section deletes `labels_str` (NameError
# here) and reorders `imgs`, and recomputing the labels from an unshuffled
# `labels_str` would pair shuffled images with the wrong labels.
# NOTE(review): allow_pickle=True unpickles arbitrary objects - trusted files only.
with np.load('Data/clean_data.npz', allow_pickle=True) as clean_data:
    imgs, labels_str = clean_data["data"], clean_data["labels"]
labels = (labels_str == "unhealthy").astype("int")

# Shuffle original dataset before any operation, using one permutation for the
# images and both label arrays so they stay aligned (re-running is then safe).
indices = np.random.permutation(len(labels))
imgs = imgs[indices]
labels = labels[indices]
labels_str = labels_str[indices]

In [None]:
# Hold out 20% of the samples for validation. Stratifying on the labels keeps
# the healthy/unhealthy ratio the same in both splits instead of leaving it to
# chance.
X_train, X_val, y_train, y_val = train_test_split(
    imgs, labels, test_size=.2, stratify=labels
)
X_train.shape, y_train.shape

((4003, 96, 96, 3), (4003,))

## Distribution analysis

In [None]:
# Class distribution of the training split: absolute counts (label 0 first,
# label 1 second) followed by the corresponding percentages.
_, per_class = np.unique(y_train, return_counts=True)
n_healthy, n_unhealthy = per_class
(
    n_healthy,
    n_unhealthy,
    (n_healthy / len(y_train)) * 100,
    (n_unhealthy / len(y_train)) * 100,
)

(2501, 1502, 62.47814139395453, 37.52185860604547)

In [None]:
# Class distribution of the validation split: absolute counts (label 0 first,
# label 1 second) followed by the corresponding percentages.
_, per_class = np.unique(y_val, return_counts=True)
n_healthy, n_unhealthy = per_class
(
    n_healthy,
    n_unhealthy,
    (n_healthy / len(y_val)) * 100,
    (n_unhealthy / len(y_val)) * 100,
)

(600, 401, 59.94005994005994, 40.05994005994006)

In [None]:
# Name the training split the way the augmentation cells expect it.
train_imgs, train_labels = X_train, y_train

# Dataset augmentation (Batched)

In [None]:
# Desired size of the final dataset; passed as `target_size` to the
# augmentation functions.
target_size = 13000

# Call the function to generate the augmented dataset (SAVED TO YOUR DRIVE FOLDER!) - Check if you have enough space first

In [None]:
# Train augmentation: the augmented images are written as .npz batches into
# the output folder instead of being kept in memory.
batch_augment_images_to_target(
    train_imgs,
    train_labels,
    augment_models,
    target_size=target_size,
    batch_size=12801,
    output_dir='Data/Augmented_Experimental/Train/',
)

# "Manual" mode (Be wary of RAM usage)

In [None]:
# Build the augmented dataset entirely in memory from the full `imgs`/`labels`
# arrays; the result holds the originals plus the augmented copies.
augmented_imgs, augmented_labels = create_augmented_dataset(imgs, labels, target_size, augment_models)

In [None]:
# Report the shapes of the augmented arrays, then the per-class sample counts
# (label 0 first, label 1 second).
print(augmented_imgs.shape, augmented_labels.shape)
_, per_class = np.unique(augmented_labels, return_counts=True)
n_healthy, n_unhealthy = per_class
(n_healthy, n_unhealthy)

(26322, 96, 96, 3) (26322,)


(13322, 13000)

In [None]:
# Save the in-memory augmented dataset to the current working directory.
np.savez("augmented_data_v2.npz", data=augmented_imgs, labels=augmented_labels)

# Check Augmentation Results

In [None]:
def plot_random_images(images, labels, num_images=100):
    """Show a random sample of images in a square grid, titled with their labels.

    Args:
        images: Array of images with the sample axis first. Each image is cast
            to ``uint8`` for display.
        labels: Array of labels aligned with ``images``.
        num_images: Number of images to draw without replacement. It is
            capped at ``len(images)``; nothing is plotted when that leaves no
            image to show.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    # Sampling without replacement cannot return more items than exist.
    num_images = min(num_images, len(images))
    if num_images <= 0:
        return

    # Select a random subset of images and labels
    indices = np.random.choice(len(images), num_images, replace=False)
    selected_images = images[indices]
    selected_labels = labels[indices]

    # Smallest square grid that fits all selected images
    grid_size = int(np.ceil(np.sqrt(num_images)))

    # squeeze=False always returns a 2-D array of axes; without it a 1x1 grid
    # yields a single Axes object that has no flatten().
    fig, axes = plt.subplots(grid_size, grid_size, figsize=(15, 15), squeeze=False)
    axes = axes.flatten()

    for img, label, ax in zip(selected_images, selected_labels, axes):
        ax.imshow(img.astype('uint8'))
        ax.set_title(f"Label: {label}")
        ax.axis('off')

    # Hide the grid cells that received no image
    for ax in axes[num_images:]:
        ax.axis('off')

    plt.tight_layout()
    plt.show()

In [None]:
# Load one saved augmented batch (selected by its index) for visual inspection.
# NOTE(review): this reads from 'Data/Augmented_Contrast/Validation/', while the
# batched augmentation call in this notebook writes to
# 'Data/Augmented_Experimental/Train/' - confirm the folder is the intended one.
aug_batch = 0
data = np.load(f'Data/Augmented_Contrast/Validation/augmented_batch_{aug_batch}.npz', allow_pickle=True)
aug_imgs, aug_labels = data["data"], data["labels"]

In [None]:
# Alternative source for the plot: the in-memory dataset.
# NOTE: running this cell replaces any batch previously bound to
# `aug_imgs` / `aug_labels`.
aug_imgs = imgs
aug_labels = labels

In [None]:
# Visual spot check: show 25 randomly chosen images with their labels.
plot_random_images(aug_imgs, aug_labels, num_images=25)

Output hidden; open in https://colab.research.google.com to view.