In [None]:
import os
import os.path as op
import json
import shutil
import logging
import numpy as np
import warnings
import random
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd

from PIL import Image
from skimage import io
from tqdm import tqdm
from pathlib import Path
from sklearn.metrics import precision_recall_fscore_support
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, BatchNormalization, Dropout
from tensorflow.keras.optimizers.legacy import Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras import regularizers
from tensorflow.keras.applications import EfficientNetB0, ResNet50, VGG16
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ModelCheckpoint

warnings.filterwarnings("ignore")

In [None]:
# Mount Google Drive only when the notebook runs inside Google Colab.
# On a local kernel the `google.colab` package is not installed, and an
# unconditional import would abort "Restart & Run All" at this cell.
try:
    from google.colab import drive
except ImportError:
    # print (not logging) so the logging configuration cell below still takes effect
    print('google.colab not available; skipping Google Drive mount.')
else:
    drive.mount('/content/drive')

In [None]:
logging.basicConfig(level=logging.INFO,
                    datefmt='%H:%M:%S',
                    format='%(asctime)s | %(levelname)-5s | %(module)-15s | %(message)s')

IMAGE_SIZE = (299, 299)  # All images contained in this dataset are 299x299 (originally, to match Inception v3 input size)
SEED = 17

# Seed every random number generator used below (Python shuffling, NumPy,
# TensorFlow dataset shuffling / augmentation / weight init) so that a fresh
# "Restart & Run All" is repeatable.
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Head directory containing all image subframes. Update with the relative path of your data directory
data_head_dir = Path('./data')

# Find all subframe directories.
# - `.name` (not `.stem`) keeps the full directory name even if it contains a '.',
#   so `data_head_dir / subdir` always points at an existing directory.
# - sorted() removes the dependence on the arbitrary order returned by iterdir().
subdirs = sorted(Path(subdir.name) for subdir in data_head_dir.iterdir() if subdir.is_dir())

# Source-image ID = the first three '_'-separated fields of the directory name
src_image_ids = ['_'.join(a_path.name.split('_')[:3]) for a_path in subdirs]

In [None]:
# Load train/val/test subframe IDs
def load_text_ids(file_path):
    """Read a text file and return its lines with surrounding whitespace removed.

    Args:
        file_path: Path of the text file to read.

    Returns:
        list[str]: One entry per line of the file, in file order. Blank lines
        are kept as empty strings.
    """
    with open(file_path, 'r') as handle:
        # Iterating the handle yields the same lines as readlines()
        return [raw_line.strip() for raw_line in handle]

# Load the subframe names for the three data subsets
train_ids = load_text_ids('./train_source_images.txt')
validate_ids = load_text_ids('./val_source_images.txt')
test_ids = load_text_ids('./test_source_images.txt')

# Membership is tested once per subdirectory, so use sets (O(1) lookups) instead
# of scanning the ID lists each time. The tuple order fixes the precedence
# train -> validate -> test for an ID that appears in more than one file.
_split_lookup = (
    ('train', set(train_ids)),
    ('validate', set(validate_ids)),
    ('test', set(test_ids)),
)

# Generate a list containing the dataset split for the matching subdirectory names.
# Entries are 'train', 'validate', 'test', or None when the ID is in no split file.
subdir_splits = []
for src_id in src_image_ids:
    split_name = next((name for name, ids in _split_lookup if src_id in ids), None)
    if split_name is None:
        logging.warning(f'{src_id}: Did not find designated split in train/validate/test list.')
    subdir_splits.append(split_name)

# Loading and preprocessing the data
### Note: there are multiple ways to preprocess and load your data in order to train your model in TensorFlow. We have provided one way to do it in the following cell. Feel free to use your own method to get better results.

In [None]:
def load_and_preprocess(img_loc, label):
    """Map one (image path, class-name) pair to an (image, integer label) pair.

    The file is read eagerly through `tf.py_function`, because PIL cannot be
    called on symbolic tensors.

    Args:
        img_loc: Scalar string tensor holding the path of an image file.
        label: Scalar string tensor holding the class name.

    Returns:
        X: float32 tensor with static shape (299, 299, 3); RGB pixels divided by 255.
        y: int64 scalar tensor, 1 when the class name is 'frost', otherwise 0.
    """
    def _read_tile(path_tensor, label_tensor):
        # Inside py_function the arguments are eager tensors, so .numpy() is available
        path = path_tensor.numpy().decode('utf-8')
        pixels = np.array(Image.open(path).convert('RGB'))
        scaled = tf.image.resize(pixels, [299, 299]) / 255.0
        is_frost = label_tensor.numpy().decode('utf-8') == 'frost'
        return scaled, (1 if is_frost else 0)

    X, y = tf.py_function(_read_tile, [img_loc, label], [tf.float32, tf.int64])

    # py_function drops static shape information; restore it for downstream layers
    X.set_shape([299, 299, 3])
    y.set_shape([])  # Scalar label

    return X, y


def load_subdir_data(dir_path, image_size, seed=None):
    """List every PNG tile of one subframe directory together with its class folder.

    Expects the layout `<dir_path>/tiles/<class_folder>/<tile>.png`. Entries
    directly under `tiles` that are not directories are ignored, as are files
    whose name does not end in '.png'.

    Args:
        dir_path: Subframe directory (str or Path) containing a `tiles` folder.
        image_size: Unused; kept so existing callers keep working.
        seed: Unused; kept so existing callers keep working.

    Returns:
        list[tuple[str, str]]: (tile path, class folder name) pairs, ordered by
        class folder name and then by file name so the result does not depend
        on the file-system listing order.

    Raises:
        FileNotFoundError: If `<dir_path>/tiles` does not exist.
    """
    tile_dir = Path(dir_path) / 'tiles'

    loc_list = []
    for class_name in sorted(os.listdir(tile_dir)):
        class_dir = os.path.join(tile_dir, class_name)
        if not os.path.isdir(class_dir):
            continue
        loc_list.extend(
            (os.path.join(class_dir, file_name), class_name)
            for file_name in sorted(os.listdir(class_dir))
            if file_name.endswith('.png')
        )

    return loc_list

def create_dataset(data):
    """Build a shuffled, batched tf.data.Dataset from (image path, class name) pairs.

    The pairs are shuffled in Python first (over the whole list), mapped through
    `load_and_preprocess`, then shuffled again by TensorFlow with a buffer of
    `buffer_size` elements and grouped into batches of `batch_size`. Both sizes
    are read from the module-level variables of the same name at call time.

    Args:
        data: Sequence of (image_path, class_name) string pairs. It is not
            modified; shuffling happens on a copy.

    Returns:
        tf.data.Dataset yielding (image batch, label batch) pairs.

    Raises:
        ValueError: If `data` is empty.
    """
    records = list(data)  # copy so the caller's list keeps its order
    if not records:
        raise ValueError('create_dataset received an empty list of (image_path, label) pairs.')

    random.shuffle(records)
    img_list, label_list = zip(*records)
    img_list_t = tf.convert_to_tensor(img_list)
    lb_list_t = tf.convert_to_tensor(label_list)

    tf_data = tf.data.Dataset.from_tensor_slices((img_list_t, lb_list_t))
    tf_data = tf_data.map(load_and_preprocess, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    tf_data = tf_data.shuffle(buffer_size=buffer_size).batch(batch_size)

    return tf_data

# Per-split accumulators of (image path, class folder) pairs
tf_data_train, tf_data_test, tf_data_val = [], [], []
# Placeholders; each name is rebound to a tf.data.Dataset at the end of this cell
tf_dataset_train, tf_dataset_test, tf_dataset_val = [], [], []

# Update the batch and buffer size as per your model requirements
buffer_size = 64
batch_size = 32

# Route every subframe directory into the accumulator of its split.
# Directories whose split is None (not listed in any split file) are skipped.
_pairs_by_split = {'validate': tf_data_val, 'train': tf_data_train, 'test': tf_data_test}
for subdir, split in zip(subdirs, subdir_splits):
    full_path = data_head_dir / subdir
    if split in _pairs_by_split:
        _pairs_by_split[split].extend(load_subdir_data(full_path, IMAGE_SIZE, SEED))

# Turn each list of pairs into a shuffled, batched dataset
tf_dataset_train = create_dataset(tf_data_train)
tf_dataset_val = create_dataset(tf_data_val)
tf_dataset_test = create_dataset(tf_data_test)

2023-12-10 22:30:33.947248: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M2
2023-12-10 22:30:33.947274: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 8.00 GB
2023-12-10 22:30:33.947284: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 2.67 GB
2023-12-10 22:30:33.947323: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:306] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2023-12-10 22:30:33.947341: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:272] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


### (c) Training CNN + MLP

In [None]:
def image_augmentation(img, label):
    """Apply random flips and colour jitter to an image tensor; the label passes through.

    Operations, in order: random horizontal flip, random vertical flip, random
    contrast with a factor drawn from [0.8, 1.2], random hue shift of at most 0.1.

    Args:
        img: Image tensor to augment.
        label: Label belonging to `img`; returned unchanged.

    Returns:
        tuple: (augmented image tensor, label).
    """
    flipped = tf.image.random_flip_up_down(tf.image.random_flip_left_right(img))
    jittered = tf.image.random_hue(
        tf.image.random_contrast(flipped, lower=0.8, upper=1.2),
        max_delta=0.1,
    )
    return jittered, label

def build_model():
    """Create and compile the CNN + MLP classifier for 299x299 RGB tiles.

    Architecture: three Conv2D(3x3, ReLU, L2=0.01) -> BatchNormalization ->
    MaxPooling2D(2x2) stages with 32, 64 and 128 filters, then Flatten, a
    Dense(256, ReLU, L2=0.01) layer with BatchNormalization and Dropout(0.3),
    and a 2-unit softmax output.

    Returns:
        A compiled Sequential model (Adam, learning rate 0.001, sparse
        categorical cross-entropy loss, accuracy metric).
    """
    model = models.Sequential()

    # Convolutional feature extractor; only the first layer declares the input shape
    for stage, n_filters in enumerate((32, 64, 128)):
        first_layer_kwargs = {'input_shape': (299, 299, 3)} if stage == 0 else {}
        model.add(layers.Conv2D(n_filters, (3, 3), activation='relu',
                                kernel_regularizer=regularizers.l2(0.01),
                                **first_layer_kwargs))
        model.add(layers.BatchNormalization())
        model.add(layers.MaxPooling2D((2, 2)))

    # Classifier head: flatten -> regularised dense layer -> dropout -> 2-way softmax
    for head_layer in (
        layers.Flatten(),
        layers.Dense(256, activation='relu', kernel_regularizer=regularizers.l2(0.01)),
        layers.BatchNormalization(),
        layers.Dropout(0.3),
        layers.Dense(2, activation='softmax'),
    ):
        model.add(head_layer)

    model.compile(optimizer=Adam(learning_rate=0.001),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    return model

def plot(history):
    """Draw training and validation loss against the epoch index and show the figure.

    Args:
        history: Object whose `.history` dict holds per-epoch 'loss' and
            'val_loss' sequences (as returned by `model.fit`).
    """
    curves = (('loss', 'Training Loss'), ('val_loss', 'Validation Loss'))
    for history_key, curve_label in curves:
        plt.plot(history.history[history_key], label=curve_label)

    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    # The explicit list below is what the legend displays
    plt.legend(['Train', 'Val'], loc='upper right')
    plt.show()

def get_predictions(dataset, net=None):
    """Return aligned true labels and predicted classes for a batched dataset.

    The dataset is traversed exactly once and each batch's labels are collected
    together with the predictions for that same batch. This matters because the
    datasets built by `create_dataset` end in `shuffle(...)`, which reorders the
    elements on every traversal: predicting in one pass and reading labels in a
    second pass would pair predictions with the wrong labels.

    Args:
        dataset: Iterable of (image batch, label batch) pairs.
        net: Model used for prediction; must provide `predict_on_batch`.
            Defaults to the module-level `model` when omitted.

    Returns:
        tuple[np.ndarray, np.ndarray]: (y_true, y_pred_classes), both 1-D and
        in the same sample order. Both are empty int64 arrays when the dataset
        yields no batches.
    """
    net = model if net is None else net

    true_batches, pred_batches = [], []
    for batch_images, batch_labels in dataset:
        class_scores = net.predict_on_batch(batch_images)
        pred_batches.append(np.argmax(class_scores, axis=1))
        true_batches.append(np.asarray(batch_labels))

    if not true_batches:
        empty = np.array([], dtype=np.int64)
        return empty, empty.copy()

    y_true = np.concatenate(true_batches, axis=0)
    y_pred_classes = np.concatenate(pred_batches, axis=0)
    return y_true, y_pred_classes

In [None]:
# Training pipeline: augment the batches produced by tf_dataset_train, then
# prefetch so input preparation can overlap with training.
augmented_dataset_train = (
    tf_dataset_train
    .map(image_augmentation, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

In [None]:
checkpoint_filepath = 'cnn.h5'

# Stop once val_loss has gone 5 epochs without improving and roll back to the best weights
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)

# Write the full model (not just the weights) to disk whenever val_loss reaches a new minimum
model_checkpoint = ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
)

model = build_model()
history = model.fit(
    augmented_dataset_train,
    epochs=20,
    validation_data=tf_dataset_val,
    callbacks=[early_stopping, model_checkpoint],
)

Epoch 1/20


2023-12-10 22:30:34.522031: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_1' with dtype string and shape [29679]
	 [[{{node Placeholder/_1}}]]
2023-12-10 22:30:34.522233: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [29679]
	 [[{{node Placeholder/_0}}]]
2023-12-10 22:30:34.556017: W tensorflow/tsl/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
2023-12-10 22:30:34.940126: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.




2023-12-10 22:43:02.513835: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_1' with dtype string and shape [11286]
	 [[{{node Placeholder/_1}}]]
2023-12-10 22:43:02.514261: I tensorflow/core/common_runtime/executor.cc:1197] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype string and shape [11286]
	 [[{{node Placeholder/_0}}]]
2023-12-10 22:43:02.937867: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.



Epoch 1: val_loss improved from inf to 0.78752, saving model to cnn.h5
Epoch 2/20

In [None]:
# Training vs. validation loss per epoch for the CNN + MLP model
plot(history)

In [None]:
# True labels and predicted classes for each split; get_predictions scores
# the network currently bound to `model`, which here is the CNN trained above.
(y_true_train, y_pred_train), (y_true_val, y_pred_val), (y_true_test, y_pred_test) = (
    get_predictions(tf_dataset_train),
    get_predictions(tf_dataset_val),
    get_predictions(tf_dataset_test),
)

# Support-weighted precision / recall / F1 per split (the 4th return value is unused)
precision_train_cnn, recall_train_cnn, f1_train_cnn, _ = precision_recall_fscore_support(
    y_true_train, y_pred_train, average='weighted')
precision_val_cnn, recall_val_cnn, f1_val_cnn, _ = precision_recall_fscore_support(
    y_true_val, y_pred_val, average='weighted')
precision_test_cnn, recall_test_cnn, f1_test_cnn, _ = precision_recall_fscore_support(
    y_true_test, y_pred_test, average='weighted')

# One column per metric, one row per split
data = dict([
    ('Precision', [precision_train_cnn, precision_val_cnn, precision_test_cnn]),
    ('Recall', [recall_train_cnn, recall_val_cnn, recall_test_cnn]),
    ('F1-Score', [f1_train_cnn, f1_val_cnn, f1_test_cnn]),
])

df_cnn = pd.DataFrame(data, index=['Train', 'Validation', 'Test'])
df_cnn

### (d) Transfer Learning

In [None]:
def transfer_learning(base_model):
    """Wrap a frozen backbone with a small trainable classification head.

    The head is Flatten -> Dense(128, ReLU) -> BatchNormalization ->
    Dropout(0.3) -> Dense(2, softmax).

    NOTE(review): the input pipeline feeds pixels scaled to [0, 1]; the
    pretrained backbones may expect a different input scaling (see each
    application's `preprocess_input`) - TODO confirm.

    Args:
        base_model: Backbone model. Its `trainable` flag is set to False in
            place, so the passed object itself is modified.

    Returns:
        A compiled Sequential model (Adam, learning rate 0.001, sparse
        categorical cross-entropy loss, accuracy metric).
    """
    # Only the new head is trained; the backbone weights stay fixed
    base_model.trainable = False

    model = models.Sequential()
    for stage in (
        base_model,
        layers.Flatten(),
        layers.Dense(128, activation='relu'),
        layers.BatchNormalization(),
        layers.Dropout(0.3),
        layers.Dense(2, activation='softmax'),
    ):
        model.add(stage)

    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    return model

# Early stopping shared by the transfer-learning runs below: monitors val_loss
# with a patience of 5 epochs and restores the best weights when it triggers.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

## EfficientNetB0

In [None]:
# EfficientNetB0 backbone with ImageNet weights and no classification top
efficient_net = EfficientNetB0(input_shape=(299, 299, 3), include_top=False, weights='imagenet')

# Save the full model to 'enb0.h5' whenever val_loss reaches a new minimum.
# (The file name has no placeholders, so no .format() call is needed; the
# previous call referenced an undefined `model_name`.)
model_checkpoint = ModelCheckpoint(filepath='enb0.h5', save_best_only=True, monitor='val_loss',
      mode='min', save_weights_only=False, verbose=1)
efficient_net_model = transfer_learning(efficient_net)

In [None]:
# The datasets are already batched in create_dataset, so `batch_size` must not
# be passed to fit() here (Keras rejects it for tf.data.Dataset inputs).
efficient_net_history = efficient_net_model.fit(
    augmented_dataset_train,
    epochs=20,
    validation_data=tf_dataset_val,
    callbacks=[early_stopping, model_checkpoint]
)

In [None]:
# Training vs. validation loss per epoch for the EfficientNetB0 transfer model
plot(efficient_net_history)

In [None]:
# get_predictions scores whichever network the global name `model` refers to.
# Point it at the EfficientNetB0 model first; otherwise this cell would report
# the CNN's metrics a second time.
model = efficient_net_model

# Get predictions and true classes for train, validation, and test datasets
y_true_train, y_pred_train = get_predictions(tf_dataset_train)
y_true_val, y_pred_val = get_predictions(tf_dataset_val)
y_true_test, y_pred_test = get_predictions(tf_dataset_test)

# Support-weighted precision / recall / F1 per split
precision_train_en, recall_train_en, f1_train_en, _ = precision_recall_fscore_support(y_true_train, y_pred_train, average='weighted')
precision_val_en, recall_val_en, f1_val_en, _ = precision_recall_fscore_support(y_true_val, y_pred_val, average='weighted')
precision_test_en, recall_test_en, f1_test_en, _ = precision_recall_fscore_support(y_true_test, y_pred_test, average='weighted')

data = {
    'Precision' : [precision_train_en, precision_val_en, precision_test_en],
    'Recall' : [recall_train_en, recall_val_en, recall_test_en],
    'F1-Score' : [f1_train_en, f1_val_en, f1_test_en],
}

df_en = pd.DataFrame(data, index=['Train', 'Validation', 'Test'])
df_en

## ResNet50

In [None]:
# ResNet50 backbone with ImageNet weights and no classification top
resnet50 = ResNet50(input_shape=(299, 299, 3), include_top=False, weights='imagenet')

# Save the full model to 'rn50.h5' whenever val_loss reaches a new minimum.
# (The file name has no placeholders, so no .format() call is needed; the
# previous call referenced an undefined `model_name`.)
model_checkpoint = ModelCheckpoint(filepath='rn50.h5', save_best_only=True, monitor='val_loss',
      mode='min', save_weights_only=False, verbose=1)
resnet50_model = transfer_learning(resnet50)

In [None]:
# Train on the augmented training pipeline (`augmented_dataset_train` is the
# name defined earlier in the notebook). The datasets are already batched in
# create_dataset, so `batch_size` must not be passed to fit().
resnet50_history = resnet50_model.fit(
    augmented_dataset_train,
    epochs=20,
    validation_data=tf_dataset_val,
    callbacks=[early_stopping, model_checkpoint]
)

In [None]:
# Training vs. validation loss per epoch for the ResNet50 transfer model
plot(resnet50_history)

In [None]:
# get_predictions scores whichever network the global name `model` refers to.
# Point it at the ResNet50 model first; otherwise this cell would report the
# metrics of a previously trained model.
model = resnet50_model

# Get predictions and true classes for train, validation, and test datasets
y_true_train, y_pred_train = get_predictions(tf_dataset_train)
y_true_val, y_pred_val = get_predictions(tf_dataset_val)
y_true_test, y_pred_test = get_predictions(tf_dataset_test)

# Support-weighted precision / recall / F1 per split
precision_train_rn, recall_train_rn, f1_train_rn, _ = precision_recall_fscore_support(y_true_train, y_pred_train, average='weighted')
precision_val_rn, recall_val_rn, f1_val_rn, _ = precision_recall_fscore_support(y_true_val, y_pred_val, average='weighted')
precision_test_rn, recall_test_rn, f1_test_rn, _ = precision_recall_fscore_support(y_true_test, y_pred_test, average='weighted')

data = {
    'Precision' : [precision_train_rn, precision_val_rn, precision_test_rn],
    'Recall' : [recall_train_rn, recall_val_rn, recall_test_rn],
    'F1-Score' : [f1_train_rn, f1_val_rn, f1_test_rn]
}

df_rn = pd.DataFrame(data, index=['Train', 'Validation', 'Test'])
df_rn

## VGG16

In [None]:
# VGG16 backbone with ImageNet weights and no classification top
vgg16 = VGG16(input_shape=(299, 299, 3), include_top=False, weights='imagenet')

# Save the full model to 'vgg16.h5' whenever val_loss reaches a new minimum.
# (The file name has no placeholders, so no .format() call is needed; the
# previous call referenced an undefined `model_name`.)
model_checkpoint = ModelCheckpoint(filepath='vgg16.h5', save_best_only=True, monitor='val_loss',
      mode='min', save_weights_only=False, verbose=1)
vgg16_model = transfer_learning(vgg16)

In [None]:
# The datasets are already batched in create_dataset, so `batch_size` must not
# be passed to fit() here (Keras rejects it for tf.data.Dataset inputs).
vgg16_history = vgg16_model.fit(
    augmented_dataset_train,
    epochs=20,
    validation_data=tf_dataset_val,
    callbacks=[early_stopping, model_checkpoint]
)

In [None]:
# Training vs. validation loss per epoch for the VGG16 transfer model
# (the plotting helper defined in this notebook is named `plot`)
plot(vgg16_history)

In [None]:
# get_predictions scores whichever network the global name `model` refers to.
# Point it at the VGG16 model first; otherwise this cell would report the
# metrics of a previously trained model.
model = vgg16_model

# Get predictions and true classes for train, validation, and test datasets
y_true_train, y_pred_train = get_predictions(tf_dataset_train)
y_true_val, y_pred_val = get_predictions(tf_dataset_val)
y_true_test, y_pred_test = get_predictions(tf_dataset_test)

# Support-weighted precision / recall / F1 per split
precision_train_vgg, recall_train_vgg, f1_train_vgg, _ = precision_recall_fscore_support(y_true_train, y_pred_train, average='weighted')
precision_val_vgg, recall_val_vgg, f1_val_vgg, _ = precision_recall_fscore_support(y_true_val, y_pred_val, average='weighted')
precision_test_vgg, recall_test_vgg, f1_test_vgg, _ = precision_recall_fscore_support(y_true_test, y_pred_test, average='weighted')

data = {
    'Precision' : [precision_train_vgg, precision_val_vgg, precision_test_vgg],
    'Recall' : [recall_train_vgg, recall_val_vgg, recall_test_vgg],
    'F1-Score' : [f1_train_vgg, f1_val_vgg, f1_test_vgg]
}

df_vgg = pd.DataFrame(data, index=['Train', 'Validation', 'Test'])
df_vgg

## Compare

In [None]:
# Side-by-side comparison of all four models. Every list is ordered like
# 'Model Name': CNN + MLP, EfficientNetB0, ResNet50, VGG16.
data = {
    'Model Name' : ['CNN + MLP', 'EfficientNetB0', 'ResNet50', 'VGG16'],
    # Train row uses the *train* precision of every model
    'Precision Train' : [precision_train_cnn, precision_train_en, precision_train_rn, precision_train_vgg],
    'Precision Val' : [precision_val_cnn, precision_val_en, precision_val_rn, precision_val_vgg],
    'Precision Test': [precision_test_cnn, precision_test_en, precision_test_rn, precision_test_vgg],
    'Recall Train' : [recall_train_cnn, recall_train_en, recall_train_rn, recall_train_vgg],
    'Recall Val' : [recall_val_cnn, recall_val_en, recall_val_rn, recall_val_vgg],
    'Recall Test' : [recall_test_cnn, recall_test_en, recall_test_rn, recall_test_vgg],
    'F1-Score Train' : [f1_train_cnn, f1_train_en, f1_train_rn, f1_train_vgg],
    'F1-Score Val' : [f1_val_cnn, f1_val_en, f1_val_rn, f1_val_vgg],
    'F1-Score Test' : [f1_test_cnn, f1_test_en, f1_test_rn, f1_test_vgg]
}

compare_df = pd.DataFrame(data)
compare_df