## Import libraries

In [None]:
import numpy as np
import os
import seaborn as sn
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import cv2
import glob
from tqdm import tqdm
import pandas as pd
from google.colab import drive

Hardware specification

In [None]:
import tensorflow as tf

def check_gpu():
    """Print TensorFlow's CUDA build flag and the GPUs it can see.

    Each probe is shown as its source expression followed by the value it
    returns, with a blank line separating consecutive probes.
    """
    probes = (
        ("tf.test.is_built_with_cuda()",
         lambda: tf.test.is_built_with_cuda()),
        ("tf.config.list_physical_devices('GPU')",
         lambda: tf.config.list_physical_devices('GPU')),
        ("tf.config.experimental.list_physical_devices('GPU')",
         lambda: tf.config.experimental.list_physical_devices('GPU')),
    )
    for position, (expression, probe) in enumerate(probes):
        if position > 0:
            print()
        print(expression)
        print(probe())

check_gpu() # Google Colab doesn't have a GPU?

In [None]:
import tensorflow as tf

# Use CPU for training
#tf.config.set_visible_devices([], 'GPU')

# Physical GPUs TensorFlow can see (an empty list on a CPU-only runtime).
gpus = tf.config.list_physical_devices('GPU')

try:
  # Use GPU for training (if available)
  tf.config.set_visible_devices(gpus, 'GPU')
  # Enable memory growth for each GPU. This has to happen BEFORE anything
  # initializes the runtime (tf.config.list_logical_devices does), otherwise
  # TensorFlow raises RuntimeError and the setting is never applied.
  for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
  # The devices were already initialized earlier in this kernel (for example
  # when this cell is re-run); report it instead of aborting the notebook.
  print(e)

# Check the device placement. Listing logical devices initializes the runtime,
# which is why it is done only after the configuration above.
print("Device:", tf.config.list_logical_devices())

if gpus:
  logical_gpus = tf.config.list_logical_devices('GPU')
  print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")


#### Google Colab Google Drive mounting

In [None]:
# Mount Google Drive so the dataset stored there is reachable from Colab.
drive.mount('/content/drive')

# Set the path to the dataset folder
data_folder = '/content/drive/MyDrive/data'
#data_folder = 'data'

# Print the top-level entries of the dataset folder as a quick sanity check.
# (A stray `glob.glob('..data/*')` call was removed here: its pattern was
# malformed and its result was discarded.)
files = os.listdir(data_folder)
for file in files:
    print(file)

## Data loading and data preparation

In [None]:
import cv2
import os
import matplotlib.pyplot as plt

def get_image_info(image_path):
    """Print basic facts about an image file and display it.

    The file is read with ``cv2.imread``. On success the function prints the
    array shape, the on-disk size in bytes, the array dtype and a colour-space
    label chosen from the channel count, then shows the picture with
    matplotlib. When OpenCV cannot read the file a failure message is printed
    instead.
    """
    image = cv2.imread(image_path)

    # Nothing to report when OpenCV could not decode the file.
    if image is None:
        print("Failed to read the image:", image_path)
        return

    height, width, channels = image.shape
    print("Image shape:", height, "x", width, "x", channels)
    print("Image size:", os.path.getsize(image_path), "bytes")
    print("Image data type:", image.dtype)

    # Label derived purely from the number of channels.
    color_space = {1: "Grayscale", 3: "RGB"}.get(channels, "Unknown")
    print("Image color space:", color_space)

    # OpenCV stores pixels as BGR; convert so matplotlib shows true colours.
    rgb_view = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    plt.imshow(rgb_view)
    plt.axis('off')
    plt.show()


# Inspect one sample image; the path is built from data_folder so it follows
# the dataset location configured above instead of repeating it.
get_image_info(os.path.join(data_folder, "train", "buildings", "10006.jpg"))


In [None]:
from sklearn.utils import shuffle
from tqdm import tqdm
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import os
import numpy as np
import tensorflow as tf

# Scene categories; a category's position in this list is its integer label.
class_names = ["buildings", "forest", "glacier", "mountain", "sea", "street"]
class_names_label = dict(zip(class_names, range(len(class_names))))
nb_classes = len(class_names)

# One directory per class under <data_folder>/train and <data_folder>/test,
# listed in the same order as class_names.
train_folders, test_folders = (
    [os.path.join(data_folder, split, name) for name in class_names]
    for split in ('train', 'test')
)

def count_images(folder_path):
    """Return the number of files found under ``folder_path``, recursively.

    Every file is counted whatever its extension. ``os.walk`` yields nothing
    for a path that does not exist, so such a path counts as 0.
    """
    return sum(len(filenames) for _root, _dirs, filenames in os.walk(folder_path))

# Total number of files across the per-class folders of each split.
train_counts = sum(count_images(folder) for folder in train_folders)
test_counts = sum(count_images(folder) for folder in test_folders)

print("Total Train Images:", train_counts)
print("Total Test Images:", test_counts)

def prepare_dataset(path, label, target_size=(150, 150)):
    """Load every ``.jpg`` directly inside ``path`` as a scaled image array.

    Args:
        path: Directory holding the images of a single class.
        label: Value stored once per loaded image in the returned label array.
        target_size: Size passed to ``load_img`` so every image is resized to
            the same dimensions; defaults to ``(150, 150)``.

    Returns:
        A ``(x_data, y_data)`` tuple of NumPy arrays: the images divided by
        255.0, and ``label`` repeated once per image. Both are empty when the
        directory contains no ``.jpg`` file.
    """
    # glob gives no ordering guarantee; sorting keeps the sample order (and
    # therefore any seeded split performed on it later) reproducible.
    image_paths = sorted(glob.glob(os.path.join(path, '*.jpg')))

    x_data = [
        img_to_array(load_img(image_path, target_size=target_size)) / 255.0
        for image_path in image_paths
    ]
    y_data = [label] * len(image_paths)
    return np.array(x_data), np.array(y_data)

# Per-image accumulators, filled one class folder at a time.
x_train, y_train = [], []
x_test, y_test = [], []
test_labels = []

# A folder's position in train_folders / test_folders is its integer label.
for label, folder in enumerate(train_folders):
    trainX, trainY = prepare_dataset(folder, label)
    x_train.extend(trainX)
    y_train.extend(trainY)

for label, folder in enumerate(test_folders):
    testX, testY = prepare_dataset(folder, label)
    x_test.extend(testX)
    y_test.extend(testY)
    # Integer copy of the test labels, kept separately from y_test.
    test_labels.extend([label] * len(testY))

# Stack the accumulated lists into NumPy arrays.
x_train, y_train = np.array(x_train), np.array(y_train)
x_test, y_test = np.array(x_test), np.array(y_test)
test_labels = np.array(test_labels)

from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Convert labels to one-hot encoded vectors
y_train = to_categorical(y_train, num_classes=nb_classes)
y_test = to_categorical(y_test, num_classes=nb_classes)

# Split the training data into training and validation sets.
# y_train is already one-hot at this point, so y_val comes out of the split
# one-hot as well. It must NOT go through to_categorical a second time: that
# would treat the 0/1 entries as class indices and produce an
# (n, nb_classes, nb_classes) array.
x_train, x_val, y_train, y_val = train_test_split(
    x_train, y_train, test_size=0.2, random_state=42)

# Share of all image files on disk that belong to the test split.
test_percentage = (test_counts / (train_counts + test_counts)) * 100

print("x_train shape:", x_train.shape)
print("y_train shape:", y_train.shape)
print("x_val shape:", x_val.shape)
print("y_val shape:", y_val.shape)
print("x_test shape:", x_test.shape)
print("y_test shape:", y_test.shape)
print("Test Percentage:", test_percentage)

## Load Images and Prepare Dataset

In [None]:
# NOTE(review): the triple-quoted string below is a disabled draft of a
# `load_images` method. It is a bare string literal, so none of it executes.
# It reads `self.folder_path` and `self.max_images`, so it was presumably meant
# to live on My_Custom_Generator (defined in a later cell) - confirm before
# re-enabling. It also calls `imread` and `resize`, which are only imported in
# that later cell.
'''
def load_images(self):
        print("Loading " + self.folder_path)
        images = []
        labels = []

        num_images_loaded = 0  # Counter for total images loaded

        # Calculate images per folder based on the total number of folders in the dataset
        images_per_folder = int(self.max_images / len(os.listdir(self.folder_path)))

        for folder in tqdm(os.listdir(self.folder_path)):
            num_images_loaded_this_folder = 0
            if folder not in class_names_label:
                continue
            label = class_names_label[folder]
            # Iterate through each image in this folder
            for file in os.listdir(os.path.join(self.folder_path, folder)):
                if num_images_loaded_this_folder >= images_per_folder:
                    break
                # Get the path name of the image
                img_path = os.path.join(os.path.join(self.folder_path, folder), file)
                # Open the image
                image = imread(img_path)
                image = resize(image, (150, 150, 3))
                # Append the image and its corresponding label to the output
                images.append(image)
                labels.append(label)
                num_images_loaded_this_folder += 1
                num_images_loaded += 1

        print("Loaded " + self.folder_path)
        images, labels = shuffle(images, labels, random_state=3)
        images = np.array(images, dtype='float16') / 255.0
        labels = np.array(labels, dtype='int8')
        labels = tf.keras.utils.to_categorical(labels, num_classes=6)  # Convert labels to one-hot encoding
        return images, labels
'''

### Load images in batches into RAM

In [None]:
from skimage.io import imread
from skimage.transform import resize
from tensorflow.keras.utils import Sequence
from sklearn.utils import shuffle
from tensorflow.keras.utils import to_categorical

class My_Custom_Generator(Sequence):
    """Keras ``Sequence`` that serves batches from images held in RAM.

    The images under ``folder_path`` are read the first time ``images``,
    ``labels``, ``len()`` or indexing is used, so constructing the generator
    stays cheap. Previously ``images`` and ``labels`` were never assigned and
    every one of those accesses raised ``AttributeError``.

    Args:
        folder_path: Directory whose sub-folders are named after the classes
            in the module-level ``class_names_label`` mapping.
        max_images: Upper bound on the number of images to load; it is split
            evenly across the entries of ``folder_path``.
        batch_size: Number of samples per batch returned by ``__getitem__``.
    """

    def __init__(self, folder_path, max_images, batch_size):
        super().__init__()
        self.folder_path = folder_path
        self.max_images = max_images
        self.batch_size = batch_size
        # Populated by load_images() on first access.
        self._images = None
        self._labels = None

    @property
    def images(self):
        """Array of loaded images (loaded on first access)."""
        self._ensure_loaded()
        return self._images

    @property
    def labels(self):
        """One-hot label array matching ``images`` (loaded on first access)."""
        self._ensure_loaded()
        return self._labels

    def _ensure_loaded(self):
        """Load the data once; later calls are no-ops."""
        if self._images is None:
            self._images, self._labels = self.load_images()

    def load_images(self):
        """Read, resize and shuffle the images found under ``folder_path``.

        Sub-folders whose name is not a key of ``class_names_label`` are
        skipped. At most ``max_images / number_of_entries`` files are taken
        from each class folder.

        Returns:
            ``(images, labels)``: a float32 array of 150x150x3 images and the
            matching one-hot label array.
        """
        n_classes = len(class_names_label)
        entries = os.listdir(self.folder_path)
        images_per_folder = int(self.max_images / max(len(entries), 1))

        images, labels = [], []
        for folder in entries:
            if folder not in class_names_label:
                continue
            label = class_names_label[folder]
            class_dir = os.path.join(self.folder_path, folder)
            for file_name in os.listdir(class_dir)[:images_per_folder]:
                image = imread(os.path.join(class_dir, file_name))
                # skimage's resize converts to float and rescales to [0, 1]
                # (preserve_range defaults to False), so no further division
                # by 255 is needed.
                images.append(resize(image, (150, 150, 3)))
                labels.append(label)

        if not images:
            # Nothing matched: return correctly shaped empty arrays.
            return (np.zeros((0, 150, 150, 3), dtype='float32'),
                    np.zeros((0, n_classes), dtype='float32'))

        images, labels = shuffle(images, labels, random_state=3)
        images = np.array(images, dtype='float32')
        labels = to_categorical(np.array(labels, dtype='int64'), num_classes=n_classes)
        return images, labels

    def __len__(self):
        """Number of batches needed to cover every loaded image."""
        return int(np.ceil(len(self.images) / float(self.batch_size)))

    def __getitem__(self, idx):
        """Return the ``idx``-th ``(batch_images, batch_labels)`` pair."""
        start = idx * self.batch_size
        stop = (idx + 1) * self.batch_size
        return self.images[start:stop], self.labels[start:stop]

# Batch generators over the train and test folders of the dataset.
train_generator = My_Custom_Generator(
    folder_path=os.path.join(data_folder, 'train'),
    max_images=7000,
    batch_size=64,
)
test_generator = My_Custom_Generator(
    folder_path=os.path.join(data_folder, 'test'),
    max_images=1750,
    batch_size=64,
)

## Multi-class Classification using Convolutional Neural Networks (CNNs)

A pre-trained ResNet50 model is used for multi-class image classification. The ResNet50 model is loaded without its top layer and its layers are frozen to prevent them from being trained. A new sequential model is created on top of the base model, consisting of a flatten layer, a dense layer with ReLU activation, and a dense layer with softmax activation for the output. The model is compiled with the Adam optimizer and categorical cross-entropy loss. The training and test images are preprocessed by scaling their values between 0 and 1. The model is then trained using the training images and labels, with a batch size of 32 and for 10 epochs. The validation data is provided using the test images and labels. After training, the model is saved for future use.

In [None]:
import tensorflow as tf
from tensorflow.keras.applications import ResNet50, DenseNet121,  VGG16, InceptionV3
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, GlobalAveragePooling2D
from sklearn.utils import shuffle
from tensorflow.keras.models import Model,load_model, Sequential
from tensorflow.keras.layers import  GlobalAveragePooling2D, Dropout, Dense, Flatten
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import  Adam

def train_model(architecture, batch_size, learning_rate=0.001, epochs=8):
    """Build, train, save and evaluate a transfer-learning scene classifier.

    A pre-trained ImageNet base network (without its top) is loaded, all but
    its last few layers are frozen, and a small classification head with six
    softmax outputs is stacked on top. Training uses the module-level
    ``x_train`` / ``y_train`` arrays and validates on ``x_test`` / ``y_test``.

    Args:
        architecture: One of ``'resnet50'``, ``'densenet121'``, ``'vgg16'`` or
            ``'inceptionv3'``.
        batch_size: Mini-batch size used for training and evaluation.
        learning_rate: Learning rate of the Adam optimizer.
        epochs: Number of training epochs (defaults to the previous fixed 8).

    Returns:
        ``(model, history)``: the trained Keras model and the ``History``
        object returned by ``model.fit``.

    Raises:
        ValueError: If ``architecture`` is not one of the supported names.
    """
    # architecture -> (banner, output file, number of trailing base layers
    # that stay trainable)
    specs = {
        'resnet50': ("Resnet-50 CNN Model", 'scene_classification_resnet.h5', 15),
        'densenet121': ("Densenet-121 CNN Model", 'scene_classification_densenet.h5', 10),
        'vgg16': ("VGG16 CNN Model", 'scene_classification_vgg16.h5', 6),
        'inceptionv3': ("InceptionV3 CNN Model", 'scene_classification_inceptionv3.h5', 4),
    }
    # Validate before any weights are downloaded.
    if architecture not in specs:
        raise ValueError(
            "Invalid architecture specified. Supported architectures: "
            "'resnet50', 'densenet121', 'vgg16', 'inceptionv3'.")

    banner, model_name, trainable_tail = specs[architecture]
    print(banner)
    print("batch_size={0}".format(batch_size))

    base_classes = {
        'resnet50': ResNet50,
        'densenet121': DenseNet121,
        'vgg16': VGG16,
        'inceptionv3': InceptionV3,
    }
    num_classes = 6  # six scene classes

    # Load the pre-trained base and freeze everything except its last layers.
    base_model = base_classes[architecture](
        weights='imagenet', include_top=False, input_shape=(150, 150, 3))
    for layer in base_model.layers[:-trainable_tail]:
        layer.trainable = False

    # Classification head: ResNet-50 uses dropout straight into the softmax,
    # the other architectures use an intermediate 256-unit dense layer.
    if architecture == 'resnet50':
        head = [GlobalAveragePooling2D(),
                Dropout(0.25),
                Dense(num_classes, activation='softmax')]
    else:
        head = [GlobalAveragePooling2D(),
                Dense(256, activation='relu'),
                Dense(num_classes, activation='softmax')]
    model = Sequential([base_model] + head)

    # The requested learning rate is applied to every architecture.
    model.compile(optimizer=Adam(learning_rate=learning_rate),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    model.summary()

    # Train the model. With NumPy inputs Keras derives the number of steps
    # from the sample count and batch_size; forcing steps_per_epoch to the
    # number of samples made the data run out after the first epoch.
    history = model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(x_test, y_test),
    )

    # Save the model
    model.save(model_name)

    # Evaluate on the test set; the labels are required to compute the
    # loss and accuracy.
    test_loss, test_accuracy = model.evaluate(x_test, y_test, batch_size=batch_size)

    print("Test Loss:", test_loss)
    print("Test Accuracy:", test_accuracy)

    return model, history


## Learning Curve and Confusion Matrix

In [None]:
import matplotlib.pyplot as plt
import seaborn as sn
from sklearn.metrics import confusion_matrix

def plot_loss_accuracy(history):
    """Draw the accuracy and loss learning curves, one figure each.

    ``history`` may be a plain metrics dict or any object exposing such a
    dict as ``.history``. The dict must provide the ``accuracy``,
    ``val_accuracy``, ``loss`` and ``val_loss`` series.
    """
    curves = history if isinstance(history, dict) else history.history

    num_epochs = len(curves["accuracy"])
    epochs = range(1, num_epochs + 1)  # epochs are numbered from 1
    ticks = range(0, num_epochs + 1, 2)

    # (metric key, train legend, validation legend, y-axis label)
    panels = (
        ("accuracy", "Train acc", "Validation acc", "Accuracy"),
        ("loss", "Train loss", "Validation loss", "Loss"),
    )
    for key, train_label, val_label, y_label in panels:
        plt.plot(epochs, curves[key], label=train_label)
        plt.plot(epochs, curves['val_' + key], label=val_label)
        plt.title("Learning curve")
        plt.ylabel(y_label)
        plt.xlabel("Epochs")
        plt.legend()
        plt.xticks(ticks, ticks)
        plt.show()

def plot_confusion_matrix(model, verbose=1):
    """Plot the confusion matrix of ``model`` on the test set as a heatmap.

    Predictions are computed on the module-level ``x_test`` array and
    compared with the integer labels in ``test_labels``.

    Args:
        model: Object whose ``predict`` returns one score per class per image.
        verbose: Verbosity forwarded to ``model.predict``.
    """
    class_names = ["buildings", "forest", "glacier", "mountain", "sea", "street"]
    # `workers` only applies to generator / Sequence input and is not accepted
    # by newer Keras releases; x_test is a plain array, so it is not passed.
    preds = model.predict(x_test, verbose=verbose)
    preds_labels = np.argmax(preds, axis=1)
    # Fix the label set so the matrix always has one row/column per class,
    # even when a class is missing from both the labels and the predictions;
    # otherwise the tick labels below would not line up.
    cm = confusion_matrix(test_labels, preds_labels,
                          labels=list(range(len(class_names))))

    fig, ax = plt.subplots()
    sn.heatmap(cm, annot=True, fmt="d",
               annot_kws={"size": 10},
               xticklabels=class_names,
               yticklabels=class_names, ax=ax)
    ax.set_title('Confusion matrix')
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    plt.show()

## Load or Save the Model

In [None]:
from tensorflow.keras.models import load_model
import pickle

def save_model_and_history(model, model_file, history, history_file):
    """Save ``model`` to ``model_file`` and pickle its training history.

    Args:
        model: Object exposing ``save(path)``.
        model_file: Destination path passed to ``model.save``.
        history: Either an object with a ``.history`` dict (what
            ``model.fit`` returns) or the metrics dict itself (what
            ``load_model_and_history`` returns).
        history_file: Destination path of the pickled metrics dict.
    """
    print("Saving " + model_file + " and history")
    model.save(model_file)
    # Accept both shapes of history so a reloaded history can be saved again.
    metrics = history if isinstance(history, dict) else history.history
    with open(history_file, 'wb') as hist:
        pickle.dump(metrics, hist)
    print("Saved " + model_file + " and history")

def load_model_and_history(model_file, model_history_file):
    """Load a saved model together with its pickled training history.

    Returns ``(None, None)`` unless both files exist; otherwise returns the
    model produced by ``load_model`` and the object unpickled from
    ``model_history_file``.
    """
    both_present = os.path.exists(model_file) and os.path.exists(model_history_file)
    if not both_present:
        return None, None

    print("Loading model " + model_file + " and history")
    loaded_model = load_model(model_file)
    # NOTE: pickle executes code on load; only point this at files written by
    # this notebook.
    with open(model_history_file, "rb") as hist_file:
        loaded_history = pickle.load(hist_file)
    print("Loaded model " + model_file + " and history")

    return loaded_model, loaded_history

def get_test_acc(model, verbose=1):
    """Return the overall accuracy of ``model`` on the test set.

    Predictions on the module-level ``x_test`` array are compared with the
    integer labels in ``test_labels``; the accuracy is the confusion-matrix
    diagonal divided by the total number of samples.
    """
    # `workers` only applies to generator / Sequence input and is not accepted
    # by newer Keras releases; x_test is a plain array, so it is not passed.
    preds = model.predict(x_test, verbose=verbose)
    preds_labels = np.argmax(preds, axis=1)
    cm = confusion_matrix(test_labels, preds_labels)

    # Correct predictions sit on the diagonal of the confusion matrix.
    return np.trace(cm) / np.sum(cm)

def get_test_accs(model, verbose=1):
    """Return the one-vs-rest accuracy of ``model`` for every class.

    For each class, accuracy is ``(TP + TN) / total`` computed from the
    confusion matrix of predictions on the module-level ``x_test`` array
    against ``test_labels``.

    Returns:
        A list with one accuracy per row of the confusion matrix.
    """
    # `workers` only applies to generator / Sequence input and is not accepted
    # by newer Keras releases; x_test is a plain array, so it is not passed.
    preds = model.predict(x_test, verbose=verbose)
    preds_labels = np.argmax(preds, axis=1)
    cm = confusion_matrix(test_labels, preds_labels)

    total = np.sum(cm)
    true_pos = np.diag(cm)
    predicted_as_class = np.sum(cm, axis=0)  # column sums: TP + FP
    actually_class = np.sum(cm, axis=1)      # row sums: TP + FN
    # Everything outside the class's row and column; the diagonal cell is
    # subtracted twice above, so it is added back once.
    true_neg = total - predicted_as_class - actually_class + true_pos

    return list((true_pos + true_neg) / total)

def evaluate_model(model):
    """Print the overall and the per-class test-set accuracy of ``model``."""
    print("Evaluating on test set...")
    overall = get_test_acc(model, verbose=0)
    print("Test set accuracy: {:.2f}%".format(overall*100))

    per_class = get_test_accs(model, verbose=0)
    classes = ["buildings", "forest", "glacier", "mountain", "sea", "street"]
    for position, class_name in enumerate(classes):
        print("Test set accuracy for {}: {:.2f}%".format(class_name, per_class[position]*100))


def train_model_load_save_and_analysis(batch_size, model_file, architecture):
    """Load a cached model or train a new one, then plot its diagnostics.

    If both ``model_file`` and its history file exist they are loaded and the
    model is evaluated on the test set. Otherwise a model is trained with
    ``train_model`` and saved together with its history. In both cases the
    confusion matrix and the learning curves are plotted.

    Args:
        batch_size: Batch size forwarded to ``train_model`` when training.
        model_file: Path of the saved model; the history file is that path
            without its extension plus ``"_history"``.
        architecture: Architecture name forwarded to ``train_model``.

    Returns:
        ``(model, history)``. ``history`` is the unpickled metrics dict when
        the model was loaded and the ``History`` object when it was trained.
    """
    # Strip the extension properly instead of assuming it is 3 characters;
    # for ".h5" files this yields the same history path as before.
    history_file = os.path.splitext(model_file)[0] + "_history"
    model, history = load_model_and_history(model_file, history_file)
    was_model_loaded = model is not None and history is not None

    if was_model_loaded:
        evaluate_model(model)
    else:
        # Couldn't load model, so train it now and save it. The caller's
        # batch_size is honoured instead of a hard-coded 32.
        model, history = train_model(architecture, batch_size=batch_size, learning_rate=0.001)
        save_model_and_history(model, model_file, history, history_file)

    plot_confusion_matrix(model)
    plot_loss_accuracy(history)
    return model, history

# Make sure the folder that receives the trained models is present.
if not os.path.exists("otherarchitectures"):
    os.makedirs("otherarchitectures", exist_ok=True)

# Candidate batch sizes; only the first one is used for this run.
batch_sizes = [32, 64, 128]
model_file = "otherarchitectures/resnetmodel_v3_{0}.h5".format(batch_sizes[0])
model, history = train_model_load_save_and_analysis(
    batch_size=batch_sizes[0],
    model_file=model_file,
    architecture='resnet50',
)
# model, history = train_model_load_save_and_analysis(batch_sizes[0], model_file, architecture='densenet121')
print()


Resnet-50 CNN Model
batch_size=32
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resnet50 (Functional)       (None, 5, 5, 2048)        23587712  
                                                                 
 global_average_pooling2d (G  (None, 2048)             0         
 lobalAveragePooling2D)                                          
                                                                 
 dropout (Dropout)           (None, 2048)              0         
                                                                 
 dense (Dense)               (None, 6)                 12294     
                                                                 
Total params: 23,600,006
Trainable params: 12,294
Non-trainable params: 23,58

## Code to test if I can get the visual output of Feature Extraction

In [None]:
import tensorflow.keras.backend as K
import matplotlib.pyplot as plt

# Use the pre-trained base network (first layer of the Sequential model) as
# the feature extractor. It is called directly rather than wrapped in
# tf.keras.Model(inputs=model.input, outputs=model.layers[0].output): for a
# nested model, `.output` belongs to the base network's own graph, which Keras
# may reject as disconnected from the outer model's input.
feature_extractor = model.layers[0]

# Choose an image from the training set. x_train is the array the model was
# trained on; train_generator.images is not used because the generator never
# had its images populated.
image = x_train[0]

# x_train is already scaled to [0, 1], so no further preprocessing is applied.
preprocessed_image = image

# Add a leading batch dimension to match the input shape of the model
reshaped_image = np.expand_dims(preprocessed_image, axis=0)

# Extract the feature maps produced by the base network
feature_maps = feature_extractor.predict(reshaped_image)

fig, (ax_image, ax_map) = plt.subplots(1, 2)

# Plot the original image
ax_image.imshow(image)
ax_image.set_title('Original Image')

# Plot the first channel of the extracted feature maps in grayscale
ax_map.imshow(feature_maps[0, :, :, 0], cmap='gray')
ax_map.set_title('Input Feature Map')

plt.show()
