In [None]:
!pip install -U albumentations

In [None]:
import tensorflow as tf
import keras

print("GPU Available: ", tf.test.is_gpu_available())
tf.__version__, keras.__version__

In [None]:
import os
import random
import matplotlib.pyplot as plt
import numpy as np
from keras import layers, saving
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from tensorflow import data as tf_data
import albumentations as A
from sklearn.utils.class_weight import compute_class_weight


data_path = "/kaggle/input/filtered-clouds/CCSN_filtered_224/"
test_path = data_path+"test"
name_suffix = ""


In [None]:
keras.utils.set_random_seed(43)

In [None]:
BATCH_SIZE = 32
IMAGE_SIZE = (224, 224)
IMAGE_SIZE_299 = (299, 299)

train_dataset, val_dataset = keras.utils.image_dataset_from_directory(
    data_path+"train",
    label_mode="categorical",
    batch_size=BATCH_SIZE,
    validation_split=0.1,
    subset='both',
    seed=42,
    image_size=IMAGE_SIZE,
)

train_dataset299, val_dataset299 = keras.utils.image_dataset_from_directory(
    data_path+"train",
    label_mode="categorical",
    batch_size=BATCH_SIZE,
    validation_split=0.1,
    subset='both',
    seed=42,
    image_size=IMAGE_SIZE_299,
)

In [None]:
test_dataset = keras.utils.image_dataset_from_directory(
    test_path,
    label_mode="categorical",
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
)

test_dataset299 = keras.utils.image_dataset_from_directory(
    data_path+"test",
    label_mode="categorical",
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE_299,
)

In [None]:
labels_text = sorted(os.listdir(test_path))
labels_dict = {i: name for i, name in zip(range(len(labels_text)), labels_text)}
labels_text_to_num = {name: i for i, name in zip(range(len(labels_text)), labels_text)}
labels_dict

In [None]:
def show_imgs_ds(ds):
    fig, (ax1, ax2) = plt.subplots(2, 8)
    axes = *ax1, *ax2
    fig.set_size_inches(20, 5)
    for i, (imgs, labels) in enumerate(ds):
        if i > 0:
            break
        
        for ax, img, label in zip(axes, imgs, labels):
            ax.set_title(labels_dict.get(int(np.argmax(label, axis=-1)), "Error"))
            ax.axis("off")
            img = np.array(img)
            if np.max(img) > 1:
                ax.imshow(img.astype(np.uint8))
            elif np.min(img) < 0:
                ax.imshow(((img+1)*127.5).astype(np.uint8))
            else:
                ax.imshow((img*255).astype(np.uint8))

sample = train_dataset.take(1)
show_imgs_ds(sample)

In [None]:
@saving.register_keras_serializable("Custom")
class PreprocessingLayerResnet(layers.Layer):
    def __init__(self, **kwargs):
        super(PreprocessingLayerResnet, self).__init__(**kwargs)

    def get_config(self):
        config = super(PreprocessingLayerResnet, self).get_config()
        return config

    def call(self, inputs, training=None):
        outputs = keras.applications.resnet.preprocess_input(inputs)
        outputs.set_shape(inputs.shape)
        return outputs

@saving.register_keras_serializable("Custom")
class PreprocessingLayerXception(layers.Layer):
    def __init__(self, **kwargs):
        super(PreprocessingLayerXception, self).__init__(**kwargs)

    def get_config(self):
        config = super(PreprocessingLayerXception, self).get_config()
        return config

    def call(self, inputs, training=None):
        outputs = keras.applications.xception.preprocess_input(inputs)
        outputs.set_shape(inputs.shape)
        return outputs

@saving.register_keras_serializable("Custom")
class AugmentationLayer(layers.Layer):
    def __init__(self, transforms=None, **kwargs):
        super(AugmentationLayer, self).__init__(**kwargs)
        if transforms is None:
            self.transforms = A.Compose([])
        else:
            self.transforms = transforms

    def get_config(self):
        config = super(AugmentationLayer, self).get_config()
        return config

    @tf.function
    def augment_image(self, image):
        def _augment(image_np):
            augmented = self.transforms(image=image_np/255)
            return augmented["image"].astype(np.float32)*255

        image_aug = tf.numpy_function(func=_augment, inp=[image], Tout=tf.float32)
        return image_aug

    def call(self, inputs, training=None):
        if training:
            outputs = tf.map_fn(self.augment_image, inputs, dtype=tf.float32)
            outputs.set_shape(inputs.shape)
            return outputs

        return inputs

aug = A.Compose([
    A.Perspective(p=0.5, scale=(0, 0.25)),
    A.GridDistortion(p=0.3),
    A.ElasticTransform(p=0.3),
    A.UnsharpMask(p=0.3),
    A.GaussianBlur(p=0.15, blur_limit=[3,5]),
    A.ColorJitter(p=1.0, brightness=[0.7,1.2], contrast=[0.8,1.3], saturation=[0.8,1.2], hue=[-0.06,0.06]),
    A.RandomGamma(p=0.2),
    A.RandomShadow(p=0.25, shadow_roi=[0, 0.6, 1, 1], num_shadows_limit=[1,1], shadow_dimension=4, shadow_intensity_range=[1.0,1.0]),
    A.GaussNoise(p=0.15, var_limit=[5e-4,1e-3], per_channel=False),
])

augmentation_layer = AugmentationLayer(transforms=aug)
full_augmentation_layer = keras.models.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.1),
    layers.RandomZoom((-0.25, 0.15)),
    augmentation_layer,
], name="data_augmentation")

augmentation_layer299 = AugmentationLayer(transforms=aug)
full_augmentation_layer299 = keras.models.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.1),
    layers.RandomZoom((-0.25, 0.15)),
    augmentation_layer299,
], name="data_augmentation299")

min_augmentation_layer = keras.models.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.1),
    layers.RandomZoom((-0.25, 0.15)),
], name="data_augmentation")

In [None]:
def augment(augmentation_layer):
    return lambda image, label: (augmentation_layer(image, training=True), label)


sample = train_dataset.take(1)
aug_sample = sample.map(augment(full_augmentation_layer))
show_imgs_ds(aug_sample)

sample = train_dataset.take(1)
aug_sample = sample.map(augment(full_augmentation_layer))
show_imgs_ds(aug_sample)

In [None]:
for image, _ in sample:
    plt.figure(figsize=(10, 10))
    first_image = image[random.randint(0, BATCH_SIZE)]
    for i in range(9):
        ax = plt.subplot(3, 3, i + 1)
        augmented_image = full_augmentation_layer(tf.expand_dims(first_image, 0), training=True)
        plt.imshow(augmented_image[0] / 255)
        plt.axis('off')

In [None]:
def plot_loss_acc(history, out_path_loss, out_path_acc):
    epochs = range(len(history.history["loss"]))
    plt.figure()
    plt.title("Loss history")
    plt.plot(epochs, history.history["loss"], label="Training loss")
    plt.plot(epochs, history.history["val_loss"], label="Validation loss")
    plt.legend()
    plt.grid()
    plt.savefig(out_path_loss)
    plt.show()
    
    plt.figure()
    plt.title("Accuracy history")
    plt.plot(epochs, history.history["acc"], label="Training accuracy")
    plt.plot(epochs, history.history["val_acc"], label="Validation accuracy")
    plt.legend()
    plt.grid()
    plt.savefig(out_path_acc)
    plt.show()

def print_layers(conv_base, freeze_point):
    print("=============================")
    for layer in conv_base.layers[:freeze_point]:
        print(layer.name)

    print("-----------------------------")
    for layer in conv_base.layers[freeze_point:]:
        print(layer.name)

    return conv_base

In [None]:
def get_y(ds):
    y = []
    for _, labels in ds:
        for label in labels:
            y.append(np.argmax(label, axis=-1))
    return y

y_train = get_y(train_dataset)
class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
class_weights_dict = {i: class_weights[i] for i in range(len(class_weights))}
print(class_weights_dict)

In [None]:
IMG_SHAPE = IMAGE_SIZE + (3,)
IMG_SHAPE_299 = IMAGE_SIZE_299 + (3,)

def create_model_cn_tiny(dropout, augmentation_layer):
    lr_schedule = keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.00005,
        decay_steps=5000,
        decay_rate=0.2)
    optimizer = keras.optimizers.Adam(learning_rate=lr_schedule)
    
    input_tensor = keras.Input(shape=IMG_SHAPE)
    x = augmentation_layer(input_tensor)
    
    conv_base = keras.applications.convnext.ConvNeXtTiny(weights='imagenet', include_top=False, input_shape=IMG_SHAPE)
    conv_base.name = "base_model"
    conv_base.trainable = True
    for layer in conv_base.layers[:len(conv_base.layers) // 3 + 3]:
        layer.trainable =  False

    x = conv_base(x, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Dense(128, activation='gelu', kernel_regularizer=keras.regularizers.l2(0.006))(x)
    x = layers.Dropout(dropout)(x)
    predictions = layers.Dense(11, activation='softmax')(x)
    model = keras.models.Model(inputs=input_tensor, outputs=predictions)
    model.compile(loss=keras.losses.CategoricalCrossentropy(), optimizer=optimizer, metrics=['acc'])
    return model

def create_model_xception(dropout, augmentation_layer):
    lr_schedule = keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.0001,
        decay_steps=1000,
        decay_rate=0.1)
    optimizer = keras.optimizers.Adam(learning_rate=lr_schedule)
    
    input_tensor = keras.Input(shape=IMG_SHAPE_299)
    x = augmentation_layer(input_tensor)
    x = PreprocessingLayerXception()(x)
    
    conv_base = keras.applications.Xception(weights='imagenet', include_top=False, input_shape=IMG_SHAPE_299)
    conv_base.trainable = True
    for layer in conv_base.layers[:len(conv_base.layers) // 3 + 1]:
        layer.trainable =  False

    x = conv_base(x, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Dense(128, activation='elu', kernel_regularizer=keras.regularizers.L2(0.008))(x)
    x = layers.Dropout(dropout)(x)
    predictions = layers.Dense(11, activation='softmax')(x)
    model = keras.models.Model(inputs=input_tensor, outputs=predictions)
    model.compile(loss=keras.losses.CategoricalCrossentropy(), optimizer=optimizer, metrics=['acc'])
    return model

def create_model_resnet(dropout, augmentation_layer):
    lr_schedule = keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=0.0001,
        decay_steps=10000,
        decay_rate=0.1)
    optimizer = keras.optimizers.Nadam(learning_rate=lr_schedule)
    
    input_tensor = keras.Input(shape=IMG_SHAPE)
    x = augmentation_layer(input_tensor)
    x = PreprocessingLayerResnet()(x)
    
    conv_base = keras.applications.ResNet50V2(weights='imagenet', include_top=False, input_shape=IMG_SHAPE)
    conv_base.trainable = True
    for layer in conv_base.layers[:len(conv_base.layers) // 4]:
        layer.trainable =  False
    conv_base.name = "base_model"
    keras.utils.plot_model(conv_base, to_file='blocks_resnet.png')

    x = conv_base(x, training=False)
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(dropout)(x)
    predictions = layers.Dense(11, activation='softmax')(x)
    model = keras.models.Model(inputs=input_tensor, outputs=predictions)
    model.compile(loss=keras.losses.CategoricalCrossentropy(), optimizer=optimizer, metrics=['acc'])
    return model


model_cn = create_model_cn_tiny(0.2, full_augmentation_layer)
model_xc = create_model_xception(0.6, full_augmentation_layer299)
model_rn = create_model_resnet(0.4, full_augmentation_layer)

In [None]:
def train(model, train_dataset, val_dataset, epochs, patience):
    train_dataset = train_dataset.prefetch(tf_data.AUTOTUNE)
    val_dataset = val_dataset.prefetch(tf_data.AUTOTUNE)
    return model.fit(
        train_dataset,
        epochs=epochs,
        validation_data=val_dataset,
        class_weight=class_weights_dict,
        callbacks=[keras.callbacks.EarlyStopping(monitor='val_loss', patience=patience, restore_best_weights=True)]), model

history, model_cn = train(model_cn, train_dataset, val_dataset, 200, patience=20)
plot_loss_acc(history, "/kaggle/working/loss_"+name_suffix+"_cn_tiny"+".png", "/kaggle/working/acc_"+name_suffix+"_cn_tiny"+".png")

history, model_xc = train(model_xc, train_dataset299, val_dataset299, 200, patience=20)
plot_loss_acc(history, "/kaggle/working/loss_"+name_suffix+"_xception"+".png", "/kaggle/working/acc_"+name_suffix+"_xception"+".png")


history, model_rn = train(model_rn, train_dataset, val_dataset, 200, patience=20)
plot_loss_acc(history, "/kaggle/working/loss_"+name_suffix+"_resnet"+".png", "/kaggle/working/acc_"+name_suffix+"_resnet"+".png")


In [None]:
model_cn.save("/kaggle/working/classifier_cn.keras")
model_xc.save("/kaggle/working/classifier_xc.keras")
model_rn.save("/kaggle/working/classifier_rn.keras")

In [None]:
def show_imgs_list(imgs, labels, wrong_labels):
    fig, (ax1, ax2, ax3, ax4) = plt.subplots(4, 8)
    axes = *ax1, *ax2, *ax3, *ax4
    fig.set_size_inches(20, 10)
    for ax, img, label, wrong_label in zip(axes, imgs, labels, wrong_labels):
        ax.set_title(labels_dict.get(int(np.argmax(label, axis=-1)), "Error") + " vs pred: " + labels_dict.get(int(np.argmax(wrong_label, axis=-1)), "Error"))
        ax.axis("off")
        img = np.array(img)
        ax.imshow(img.astype(np.uint8))

test_imgs = []
test_labels = []
for imgs, labels in test_dataset:
    for img, label in zip(imgs, labels):
        test_imgs.append(img)
        test_labels.append(label)

x_test, y_test = np.array(test_imgs), np.array(test_labels)

test_imgs299 = []
test_labels299 = []
for imgs, labels in test_dataset299:
    for img, label in zip(imgs, labels):
        test_imgs299.append(img)
        test_labels299.append(label)

x_test299, y_test299 = np.array(test_imgs299), np.array(test_labels299)

score1 = model_cn.evaluate(x_test, y_test)
print("Accuracy on test dataset ConvNeXt:", score1[1])
score2 = model_xc.evaluate(x_test299, y_test299)
print("Accuracy on test dataset Xception:", score2[1])
score3 = model_rn.evaluate(x_test, y_test)
print("Accuracy on test dataset ResNet:", score3[1])


In [None]:
predicted_labels = model_cn.predict(x_test)
misclassified_indices = np.argmax(predicted_labels, axis=-1) != np.argmax(y_test, axis=-1)
show_imgs_list(x_test[misclassified_indices], y_test[misclassified_indices], predicted_labels[misclassified_indices])

In [None]:
def show_cm(model, x, y, out_name):
    y_test_pred = model.predict(x)
    y_test_pred_cls = [np.argmax(sample) for sample in y_test_pred]
    cm = confusion_matrix(y, y_test_pred_cls, labels=list(labels_dict.keys()))
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=list(labels_dict.values()))
    disp.plot()
    plt.savefig("/kaggle/working/test_confusion_matrix_"+out_name+"_"+name_suffix+".png")
    plt.show()

show_cm(model_cn, x_test, [np.argmax(label) for label in y_test], "convnext")
show_cm(model_xc, x_test299, [np.argmax(label) for label in y_test299], "xception")
show_cm(model_rn, x_test, [np.argmax(label) for label in y_test], "resnet")

In [None]:
def preprocess_image(image_path):
    img = tf.keras.utils.load_img(image_path, target_size=IMAGE_SIZE)
    img_array = tf.keras.utils.img_to_array(img)
    return img_array

def make_gradcam_heatmap(img_array, model, last_conv_layer_name, pred_index=None):
    grad_model = keras.models.Model(
        model.input, [model.get_layer(last_conv_layer_name).output, model.output]
    )
    with tf.GradientTape() as tape:
        last_conv_layer_output, preds = grad_model(img_array)
        if pred_index is None:
            pred_index = tf.argmax(preds[0])
        class_channel = preds[:, pred_index]

    grads = tape.gradient(class_channel, last_conv_layer_output)
    pooled_grads = tf.reduce_mean(grads, axis=(0, 1, 2))
    last_conv_layer_output = last_conv_layer_output[0]
    heatmap = last_conv_layer_output @ pooled_grads[..., tf.newaxis]
    heatmap = tf.squeeze(heatmap)
    heatmap = tf.maximum(heatmap, 0) / tf.math.reduce_max(heatmap)
    return heatmap.numpy()

def save_gradcam(img, heatmap, cam_path, alpha=0.3):
    heatmap = np.uint8(255 * heatmap)
    jet = plt.colormaps["jet"]
    jet_colors = jet(np.arange(256))[:, :3]
    jet_heatmap = jet_colors[heatmap]
    jet_heatmap = keras.utils.array_to_img(jet_heatmap)
    jet_heatmap = jet_heatmap.resize((img.shape[1], img.shape[0]))
    jet_heatmap = keras.utils.img_to_array(jet_heatmap)
    superimposed_img = jet_heatmap * alpha + img
    superimposed_img = keras.utils.array_to_img(superimposed_img)
    superimposed_img.save(cam_path)
    return jet_colors[heatmap], superimposed_img


image_paths = [
    test_path+"/Ac/" + os.listdir(test_path+"/Ac/")[0],
    test_path+"/As/" + os.listdir(test_path+"/As/")[0],
    test_path+"/Cb/" + os.listdir(test_path+"/Cb/")[0],
    test_path+"/Cc/" + os.listdir(test_path+"/Cc/")[0],
    test_path+"/Ci/" + os.listdir(test_path+"/Ci/")[0],
    test_path+"/Cs/" + os.listdir(test_path+"/Cs/")[0],
    test_path+"/Ct/" + os.listdir(test_path+"/Ct/")[0],
    test_path+"/Cu/" + os.listdir(test_path+"/Cu/")[0],
    test_path+"/Ns/" + os.listdir(test_path+"/Ns/")[0],
    test_path+"/Sc/" + os.listdir(test_path+"/Sc/")[0],
    test_path+"/St/" + os.listdir(test_path+"/St/")[0],
]
# imgs = []
# for path in image_paths:
#     imgs.append(preprocess_image(path))

# img_array = np.array(imgs)
# last_conv_layer_name = "convnext_tiny_stage_3_block_2_identity"
# model_cn.layers[-1].activation = None

# preds = model_cn.predict(img_array)
# for i, pred in enumerate(preds):
#     print("Predicted:", labels_dict.get(np.argmax(pred)))
#     gradcam_heatmap = make_gradcam_heatmap(np.array([img_array[i]]), model_cn, last_conv_layer_name)
#     heatmap, superimposed_img = save_gradcam(img_array[i], gradcam_heatmap, "/kaggle/working/"+"cam"+str(i)+".jpg")
#     fig, (ax1, ax2, ax3) = plt.subplots(1, 3)
#     fig.set_size_inches(10, 4)
#     ax1.imshow(superimposed_img)
#     ax1.axis('off')
#     ax2.imshow(img_array[i]/255)
#     ax2.axis('off')
#     ax3.imshow(heatmap)
#     ax3.axis('off')

# model_cn.layers[-1].activation = 'softmax'