In [None]:
# Class-imbalance control: number of "Unidentified" (label 8) images that
# StreetViewImageDataset._load_data skips while scanning the image folders.
UNIDENTIFIED_CLASS_TO_REMOVE = 1500

In [None]:
# Mount Google Drive in the Colab runtime; the dataset root and the checkpoint
# directory used later in this notebook both live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!nvidia-smi

In [None]:
# Importing necessary libraries and modules for deep learning using PyTorch, data manipulation, and visualization.
import copy
import os
import time
from collections import Counter
from tempfile import TemporaryDirectory

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.optim as optim
import torchvision
from PIL import Image
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, SubsetRandomSampler
from torchvision import datasets, models, transforms
from tqdm import tqdm

# Enable cuDNN's benchmark mode (algorithm auto-selection for convolutions).
cudnn.benchmark = True
# Switch matplotlib to interactive mode so figures update as cells run.
plt.ion()


In [None]:
# Facade-material class names, listed in label order: the position of each
# name in this list is its integer class index.
tl_class_dict = dict(enumerate([
    "Face Brick",
    "Timber",
    "Steel Sheet",
    "Plastered and Painted",
    "Concrete and Glass",
    "Glass",
    "Fiber Cement Sheet",
    "Concrete Panels",
    "Unidentified",
]))
# Reverse lookup: class name -> integer class index.
tl_class_inv_map = {name: index for index, name in tl_class_dict.items()}

In [None]:
# Implementation of a custom dataset class for processing and managing street view image data.
class StreetViewImageDataset(torch.utils.data.Dataset):
    """Street-view images paired with integer facade-material labels.

    On construction the dataset scans a fixed set of zone folders under
    ``root_dir``, builds a list of ``(image_path, label)`` pairs, prints the
    class distribution and draws a seeded random train/test split.

    Args:
        root_dir: Directory containing the zone folders.
        split_ratio: Fraction of samples assigned to the training split.
        transform: Optional callable applied to each RGB ``PIL.Image``.

    Attributes:
        data: List of ``(image_path, label)`` tuples.
        train_indices / test_indices: 1-D index tensors into ``data``.
    """

    # Zone folders (directly under ``root_dir``) that are scanned for label CSVs.
    ZONE_FOLDERS = ('Commercial and business Zone', 'Mixed use zone',
                    'Industrial and utilities Zone', 'Residential Zone',
                    'Glass (Newly added)')
    # Lower-case file extensions accepted as images.
    VALID_EXTENSIONS = ('jpg', 'jpeg', 'png', 'bmp')
    # Label used for any typology string missing from ``tl_class_inv_map``.
    UNIDENTIFIED_LABEL = 8

    def __init__(self, root_dir, split_ratio=0.8, transform=None):
        self.root_dir = root_dir
        self.split_ratio = split_ratio
        self.transform = transform
        self.classes = sorted(os.listdir(root_dir))
        self.class_to_idx = tl_class_inv_map
        self.data = self._load_data()
        self._analyze_data()
        self._split_data()

    def _analyze_data(self):
        """Print the share and count of every class present in ``self.data``."""
        total = len(self.data)
        if total == 0:
            # Previously this path divided by zero; report and bail out instead.
            print("No labelled images were found; nothing to analyze.")
            return

        label_counter = Counter({label: 0 for label in tl_class_dict})

        # Count the occurrences of each known label in the data
        for _, label in self.data:
            if label in label_counter:
                label_counter[label] += 1

        # Display the counts for each label
        for label, count in label_counter.items():
            print(f"{label}: {100*count / total:.3f}% \t {count} \t\t {tl_class_dict[label]}")

    @staticmethod
    def _read_label_table(csv_path, subdir):
        """Read one label CSV into a frame indexed by ``'OBJECTID *'``.

        The table for "Residential less than 3.5 height" stores the label in a
        ``Type`` column, which is renamed to ``Typology`` so every table has the
        same shape. Rows without a label are dropped.
        """
        if subdir == "Residential less than 3.5 height":
            table = pd.read_csv(csv_path, usecols=['OBJECTID *', "Type"])
            table = table.rename({"Type": "Typology"}, axis='columns')
        else:
            table = pd.read_csv(csv_path, usecols=['OBJECTID *', "Typology"])
        return table.set_index('OBJECTID *').dropna()  # drop images without label

    def _load_data(self):
        """Collect ``(image_path, label)`` pairs from every zone folder.

        Each ``<name>.csv`` inside a zone folder labels the images stored in the
        sibling directory ``<name>``; an image's id is the integer before the
        first underscore of its file name. The first
        ``UNIDENTIFIED_CLASS_TO_REMOVE`` usable "Unidentified" images are
        skipped. Files that cannot be processed are reported and skipped.
        """
        skipped_unidentified = 0
        samples = []
        for folder in self.ZONE_FOLDERS:
            zone_dir = os.path.join(self.root_dir, folder)
            csv_names = [f for f in os.listdir(zone_dir) if f.endswith(".csv")]
            for csv_name in csv_names:
                subdir = csv_name[:-4]
                label_table = self._read_label_table(
                    os.path.join(zone_dir, csv_name), subdir)
                for im in os.listdir(os.path.join(zone_dir, subdir)):
                    try:
                        # Check the extension first so that non-image files never
                        # consume part of the "Unidentified" drop quota.
                        extension = os.path.splitext(im)[1][1:].lower()
                        if extension not in self.VALID_EXTENSIONS:
                            continue
                        im_id = int(im.split('_')[0])
                        if im_id not in label_table.index:
                            continue
                        typology = label_table.loc[im_id, "Typology"]
                        label = tl_class_inv_map.get(typology, self.UNIDENTIFIED_LABEL)
                        if (label == self.UNIDENTIFIED_LABEL
                                and skipped_unidentified < UNIDENTIFIED_CLASS_TO_REMOVE):
                            skipped_unidentified += 1
                            continue
                        samples.append((os.path.join(zone_dir, subdir, im), label))
                    except Exception as e:
                        # Best effort: report the problem and keep scanning.
                        print(e)
        return samples

    def _split_data(self):
        """split the dataset train, test set"""
        dataset_size = len(self.data)
        split_idx = int(dataset_size * self.split_ratio)
        # Shuffle the indices before splitting.
        # NOTE: this reseeds torch's global RNG, so the split is reproducible
        # and everything sampled afterwards starts from seed 42 as well.
        torch.manual_seed(42)
        shuffled_indices = torch.randperm(dataset_size)

        self.train_indices = shuffled_indices[:split_idx]
        self.test_indices = shuffled_indices[split_idx:]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx`` with the transform applied."""
        img_path, label = self.data[idx]
        # ``convert`` returns a fully loaded copy, so the file handle can be
        # released immediately instead of lingering until garbage collection.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")

        if self.transform:
            img = self.transform(img)

        return img, label

    def get_train_sampler(self):
        """Return a sampler that draws the training indices in random order."""
        return SubsetRandomSampler(self.train_indices)

    def get_test_sampler(self):
        """Return a sampler that draws the test indices in random order."""
        return SubsetRandomSampler(self.test_indices)


In [None]:
# Mini-batch size shared by the train and test DataLoaders.
BATCH_SIZE=16

In [None]:
#Data transformation
# Random augmentation is applied to training images only.
data_transforms = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomPerspective(distortion_scale=0.5, p=0.5),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(30),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Held-out images get the deterministic part of the pipeline only, so that
# evaluation is repeatable and not distorted by random flips/rotations.
eval_transforms = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

tl_dataset = StreetViewImageDataset("/content/drive/MyDrive/StreetViewImages",
                                  split_ratio=0.8, transform=data_transforms)

# Shallow copy: shares the sample list and the train/test indices with
# tl_dataset (no second scan of the Drive folders), differs only in transform.
tl_eval_dataset = copy.copy(tl_dataset)
tl_eval_dataset.transform = eval_transforms

dataloaders = {'train': torch.utils.data.DataLoader(tl_dataset, batch_size=BATCH_SIZE,
                                                    num_workers=4,
                                                    sampler=tl_dataset.get_train_sampler(),
                                                    prefetch_factor=4),
               'test': torch.utils.data.DataLoader(tl_eval_dataset, batch_size=BATCH_SIZE,
                                                   num_workers=4,
                                                   sampler=tl_eval_dataset.get_test_sampler(),
                                                   prefetch_factor=4)}

# Use the first GPU when available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
len(tl_dataset)

In [None]:
# Visualizing a batch of images from a training dataset using PyTorch utilities.
def imshow(inp, title=None):
    """Show a normalized image tensor with pyplot.

    The tensor is moved to channel-last layout, the mean/std normalization
    used by the data transforms is reversed, and values are clipped to [0, 1]
    before plotting. ``title`` is drawn above the image when given.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])
    channel_last = inp.numpy().transpose((1, 2, 0))
    restored = np.clip(channel_std * channel_last + channel_mean, 0, 1)
    plt.imshow(restored)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated


# Get a batch of training data (one batch of BATCH_SIZE augmented images and
# their integer class labels)
inputs, classes = next(iter(dataloaders['train']))

# Make a grid from batch: tile the images of the batch into a single image
out = torchvision.utils.make_grid(inputs)

# Display the tiled batch as a quick visual check of the augmentation pipeline
imshow(out)

In [None]:
#Model Training
def train_model(dir, model, criterion, optimizer, scheduler, num_epochs=25, load=False):
    """Train ``model`` and keep the weights with the best test accuracy.

    Batches come from the notebook-level ``dataloaders`` dict (keys ``'train'``
    and ``'test'``) and are moved to the notebook-level ``device``. The model is
    trained every epoch and evaluated on every 5th epoch and on the final one.

    Args:
        dir: Directory where ``best_model_params.pt`` is written/read.
        model: Network to optimize (updated in place).
        criterion: Loss taking ``(outputs, labels)``.
        optimizer: Optimizer over the model parameters.
        scheduler: LR scheduler, stepped once per training epoch.
        num_epochs: Number of training epochs.
        load: When True, start from the checkpoint already stored in ``dir``.

    Returns:
        ``(model, training_data)`` where ``training_data`` is a list of
        ``(epoch, phase, epoch_loss, epoch_acc)`` tuples; ``epoch_acc`` is a
        0-dim tensor. The model carries the best checkpoint written during
        this call (or the loaded one when ``load`` is True and none was better).
    """
    since = time.time()
    best_model_params_path = os.path.join(dir, 'best_model_params.pt')
    if load:
        model.load_state_dict(torch.load(best_model_params_path))
    # NOTE: with load=True the accuracy of the loaded checkpoint is unknown, so
    # the first evaluation of this run overwrites it unless its accuracy is 0.
    best_acc = 0.0
    checkpoint_written = False
    training_data = []
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)
        # Evaluate periodically, and always after the last epoch so the final
        # weights get a chance to become the best checkpoint.
        evaluate_now = epoch % 5 == 0 or epoch == num_epochs - 1
        # Each epoch has a training and (optionally) a test phase
        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()  # Set model to training mode: regularization active
            elif evaluate_now:
                model.eval()   # Set model to evaluate mode: regularization disabled
            else:
                continue
            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0
            # Iterate over data.
            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)
                # zero the parameter gradients
                optimizer.zero_grad()
                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs) # forward pass
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                # statistics (the loss is a batch mean, so re-weight by batch size)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
                samples_seen += inputs.size(0)
            if samples_seen == 0:
                raise RuntimeError(f"dataloaders[{phase!r}] produced no samples")
            if phase == 'train':
                scheduler.step()
            # Normalize by the samples actually processed in this phase rather
            # than by re-deriving the split size from a global dataset object.
            epoch_loss = running_loss / samples_seen
            epoch_acc = running_corrects.double() / samples_seen
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            training_data.append((epoch, phase, epoch_loss, epoch_acc))
            # persist the best-performing weights
            if phase == 'test' and epoch_acc > best_acc:
                best_acc = epoch_acc
                torch.save(model.state_dict(), best_model_params_path)
                checkpoint_written = True
        print()
    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best test Acc: {best_acc:.4f}')

    # load best model weights; skip when this run has no checkpoint of its own
    # (e.g. num_epochs == 0) instead of crashing or picking up a stale file
    if checkpoint_written or load:
        model.load_state_dict(torch.load(best_model_params_path))
    else:
        print('No checkpoint was written during this run; returning the current weights.')
    return model, training_data

In [None]:
def visualize_model(model, num_images=6):
    """Plot predictions for the first ``num_images`` images of the test loader.

    Images are drawn in a two-column grid, each titled with the predicted class
    name. The model is switched to eval mode for the duration of the call and
    its previous train/eval mode is restored afterwards, even on error.

    Uses the notebook-level ``dataloaders``, ``device``, ``tl_class_dict`` and
    ``imshow``.
    """
    if num_images <= 0:
        return
    was_training = model.training
    model.eval()
    images_so_far = 0
    # Round up so an odd num_images still fits in the two-column grid.
    n_rows = (num_images + 1) // 2
    plt.figure()

    try:
        with torch.no_grad():
            # The loaders are keyed 'train' / 'test'; there is no 'val' split.
            for inputs, _ in dataloaders['test']:
                inputs = inputs.to(device)

                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)

                for j in range(inputs.size()[0]):
                    images_so_far += 1
                    ax = plt.subplot(n_rows, 2, images_so_far)
                    ax.axis('off')
                    # .item() turns the 0-dim tensor into the int key that
                    # tl_class_dict is indexed by.
                    ax.set_title(f'predicted: {tl_class_dict[preds[j].item()]}')
                    imshow(inputs.cpu().data[j])

                    if images_so_far == num_images:
                        return
    finally:
        model.train(mode=was_training)

In [None]:
# Configuring the ResNet-50 model for classification with custom output layer, loss function, optimizer, and learning rate scheduler.
# Start from ImageNet-pretrained weights and fine-tune the whole network.
model_ft = models.resnet50(weights='IMAGENET1K_V1') #Other ResNET and DenseNET family architecture also tested
num_ftrs = model_ft.fc.in_features

# Replace the final fully connected layer so the network emits one logit per
# entry of tl_class_dict (9 classes).
model_ft.fc = nn.Linear(num_ftrs, len(tl_class_dict))

model_ft = model_ft.to(device)

# Given class frequencies, listed in class-index order (0..8).
# NOTE(review): these counts are hard-coded; presumably copied from the class
# distribution printed when the dataset is built — re-check them whenever
# UNIDENTIFIED_CLASS_TO_REMOVE or the image folders change.
class_counts = torch.tensor([3250, 1523, 638, 2748, 182, 173, 146, 396, 1653])

# Total number of samples
total_samples = class_counts.sum().item()

# Calculate weights: Number of samples divided by (number of classes * class counts)
# so that rarer classes receive proportionally larger weights.
weights = total_samples / (len(class_counts) * class_counts)

# Normalize weights to make the smallest weight 1 for better stability in training
# (the most frequent class ends up with weight 1).
weights = weights / weights.min()

# Move the per-class weights to the device the model lives on.
class_weights = weights.to(device)

# Initialize the weighted CrossEntropyLoss
criterion = nn.CrossEntropyLoss(weight=class_weights)

# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [None]:
# Directory on Google Drive that receives the checkpoints of this run;
# created if it does not exist yet.
model_dir = "/content/drive/MyDrive/StreetViewImages/10jul"
os.makedirs(model_dir, exist_ok=True)

In [None]:
# Fine-tune for 50 epochs from the pretrained weights (load=False: do not
# resume from an earlier checkpoint). training_data holds one
# (epoch, phase, loss, accuracy) record per executed phase.
model_ft, training_data = train_model(model_dir, model_ft, criterion, optimizer_ft, exp_lr_scheduler,
                                      num_epochs=50, load=False)

In [None]:
# Plotting training and testing loss and accuracy over epochs to visualize model performance.
# Split the (epoch, phase, loss, accuracy) records produced by train_model by
# phase. The test phase does not run every epoch, so it keeps its own x-axis.
epoch, train_acc, test_acc, train_loss, test_loss = [], [], [], [], []
test_epoch = []
for ep, phase, epoch_loss, epoch_acc in training_data:
    # float() accepts both plain numbers and 0-dim tensors (CPU or GPU), so
    # matplotlib only ever receives Python floats.
    acc_value = float(epoch_acc)
    if phase == 'train':
        train_acc.append(acc_value)
        train_loss.append(epoch_loss)
        epoch.append(ep)
    elif phase == "test":
        test_acc.append(acc_value)
        test_loss.append(epoch_loss)
        test_epoch.append(ep)

fig, ax = plt.subplots(1, 2, figsize=(20,5))
fig.suptitle("Training Curves", fontsize=16)
ax[0].plot(epoch, train_loss, label='Train loss')
ax[0].plot(test_epoch, test_loss, marker='o', label="Test loss")
ax[0].set_title("Loss")
ax[0].set_xlabel("Epoch")
ax[0].set_ylabel("Loss")
ax[0].legend()
ax[0].grid()
ax[1].plot(epoch, train_acc, label='Train accuracy')
ax[1].plot(test_epoch, test_acc, marker='o', label='Test accuracy')
ax[1].set_title("Accuracy")
ax[1].set_xlabel("Epoch")
ax[1].set_ylabel("Accuracy")
ax[1].legend()
ax[1].grid()
plt.show()

In [None]:
# Saving model: write the weights currently held by model_ft (train_model
# returns the model with its best checkpoint loaded) next to the checkpoints.
torch.save(model_ft.state_dict(), model_dir + "/resnet50.pt")

In [None]:
#Confusion Matrix
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sn
from sklearn.metrics import confusion_matrix
from matplotlib.colors import LinearSegmentedColormap
from matplotlib.patches import Rectangle
import matplotlib.font_manager as fm
from pandas import DataFrame
from string import ascii_uppercase
from matplotlib.collections import QuadMesh


def get_new_fig(fn, figsize=[9, 9]):
    """Return ``(figure, axes)`` for the figure identified by ``fn``.

    The figure is created (or re-activated if it already exists) with the
    given size, and its current axes are cleared before being returned.
    """
    figure = plt.figure(fn, figsize)
    axes = figure.gca()  # current axes of that figure
    axes.cla()  # wipe whatever was drawn before
    return figure, axes

def configcell_text_and_colors(array_df, lin, col, oText, facecolors, posi, fz,
                               fmt, show_null_values=0):
    """Configure the annotation and face colour of one confusion-matrix cell.

    Args:
        array_df: Square 2-D array of counts whose last row and last column
            hold the totals.
        lin, col: Row and column index of the cell.
        oText: Text object of the cell (needs ``set_text``/``set_color``; for
            total cells also ``get_position``).
        facecolors: Per-cell RGBA colours; entry ``posi`` is overwritten.
        posi: Index of this cell inside ``facecolors``.
        fz: Font size used for the texts of total cells.
        fmt: Unused; kept for interface compatibility.
        show_null_values: For empty inner cells: 0 -> blank text, 1 -> ``'0'``,
            anything else -> ``'0\\n0.0%'``.

    Returns:
        ``(text_add, text_del)``: specs (dicts with ``x``, ``y``, ``text``,
        ``kw``) of texts to create, and existing text objects to remove. Both
        lists are empty for inner cells, which are updated in place.
    """
    text_add = []
    text_del = []
    cell_val = array_df[lin][col]
    last = len(array_df[:, col]) - 1  # index of the totals row / column

    if (col == last) or (lin == last):
        # Totals cell: show the count and the share of correct predictions.
        if cell_val != 0:
            if (col == last) and (lin == last):
                # grand total -> overall accuracy (sum of the main diagonal)
                tot_rig = sum(array_df[i][i] for i in range(array_df.shape[0] - 1))
            elif col == last:
                tot_rig = array_df[lin][lin]
            else:
                tot_rig = array_df[col][col]
            per_ok = (float(tot_rig) / cell_val) * 100
        else:
            per_ok = 0

        per_ok_s = '100%' if per_ok == 100 else '%.2f%%' % (per_ok)

        # the auto-generated annotation is replaced by the two texts below
        text_del.append(oText)

        font_prop = fm.FontProperties(weight='bold', size=fz)
        count_kwargs = dict(color='w', ha="center", va="center", gid='sum',
                            fontproperties=font_prop)
        percent_kwargs = dict(count_kwargs, color='g')
        x, y = oText.get_position()
        text_add.append(dict(x=x, y=y - 0.3, text='%d' % (cell_val),
                             kw=count_kwargs))
        text_add.append(dict(x=x, y=y, text=per_ok_s, kw=percent_kwargs))

        # background colour of the totals cells
        facecolors[posi] = [0.0, 0.0, 1.0, 1.0]

    else:
        # Inner cell: count plus its share of the column total. Guard the
        # division so an empty column yields 0% instead of a NaN and a
        # RuntimeWarning.
        sel_col = array_df[-1][col]
        per = (float(cell_val) / sel_col) * 100 if sel_col != 0 else 0.0
        if per > 0:
            txt = '%s\n%.2f%%' % (cell_val, per)
        elif show_null_values == 0:
            txt = ''
        elif show_null_values == 1:
            txt = '0'
        else:
            txt = '0\n0.0%'
        oText.set_text(txt)

        # diagonal and off-diagonal cells share the same styling: red text on
        # a blue background
        oText.set_color('r')
        facecolors[posi] = [0.0, 0.0, 1.0, 1.0]

    return text_add, text_del


def insert_totals(df_cm):
    """Append totals to a confusion-matrix frame, in place.

    Adds a ``'sum_lin'`` column holding each row's total and a ``'sum_col'``
    row holding each column's total; their intersection is the grand total.
    """
    column_totals = [df_cm[name].sum() for name in df_cm.columns]
    row_totals = [row.sum() for _, row in df_cm.iterrows()]
    df_cm['sum_lin'] = row_totals
    # the bottom-right corner is the sum over all rows
    column_totals.append(np.sum(row_totals))
    df_cm.loc['sum_col'] = column_totals


def pretty_plot_confusion_matrix(df_cm, annot=True, cmap="Blues", fmt='.2f',
                                 fz=11,
                                 lw=0.5, cbar=False, figsize=[8, 8],
                                 show_null_values=0, pred_val_axis='y'):
    """Draw a confusion matrix with totals as an annotated seaborn heatmap.

    Args:
        df_cm: Confusion-matrix DataFrame without totals; a totals row and
            column are added by ``insert_totals``.
        annot, cmap, fmt, lw, cbar: Forwarded to ``sn.heatmap``.
        fz: Annotation font size.
        figsize: Size of the figure named ``'Conf matrix default'``.
        show_null_values: Forwarded to ``configcell_text_and_colors``.
        pred_val_axis: ``'col'`` or ``'x'`` puts predictions on the x axis;
            any other value transposes ``df_cm`` and puts them on the y axis.

    The figure is drawn but not shown (the ``plt.show()`` call is commented out).
    """

    if (pred_val_axis in ('col', 'x')):
        xlbl = 'Predicted'
        ylbl = 'Actual'
    else:
        xlbl = 'Actual'
        ylbl = 'Predicted'
        df_cm = df_cm.T

    # create "Total" column
    # NOTE(review): insert_totals modifies its argument in place, so in the
    # 'col'/'x' case the caller's DataFrame gains the totals row/column too.
    insert_totals(df_cm)

    # always draw into the same named figure so repeated calls reuse one window
    fig, ax1 = get_new_fig('Conf matrix default', figsize)
    fig.patch.set_facecolor('white')
    ax1.set_facecolor('white')

    # thanks for seaborn
    # NOTE(review): white_mask is all ones and is passed as the heatmap mask;
    # per seaborn's documented mask semantics that hides every cell (and, it
    # appears, their annotations, which would leave the loop below with nothing
    # to process) — confirm this is intended.
    white_mask = np.ones_like(df_cm)
    ax = sn.heatmap(df_cm, annot=annot, annot_kws={"size": fz}, linewidths=lw,
                    ax=ax1,
                    cbar=cbar, cmap=cmap, linecolor='w', fmt=fmt,mask=white_mask)

    # set ticklabels rotation
    ax.set_xticklabels(ax.get_xticklabels(), rotation=45, fontsize=10)
    ax.set_yticklabels(ax.get_yticklabels(), rotation=25, fontsize=10)

    # face colors list (one RGBA entry per heatmap cell)
    quadmesh = ax.findobj(QuadMesh)[0]
    facecolors = quadmesh.get_facecolors()

    # iterate over the annotation texts and restyle each cell
    array_df = np.array(df_cm.to_records(index=False).tolist())
    text_add = []
    text_del = []
    posi = -1  # running index into facecolors, advanced once per text
    for t in ax.collections[0].axes.texts:  # ax.texts:
        # texts sit at cell centres; subtracting 0.5 gives (col, row) indices
        pos = np.array(t.get_position()) - [0.5, 0.5]
        lin = int(pos[1])
        col = int(pos[0])
        posi += 1

        # set text
        txt_res = configcell_text_and_colors(array_df, lin, col, t, facecolors,
                                             posi, fz, fmt, show_null_values)

        text_add.extend(txt_res[0])
        text_del.extend(txt_res[1])

    # remove the old ones
    for item in text_del:
        item.remove()
    # append the new ones
    for item in text_add:
        ax.text(item['x'], item['y'], item['text'], **item['kw'])

    # titles and legends
    ax.set_title('Confusion matrix')
    ax.set_xlabel(xlbl)
    ax.set_ylabel(ylbl)
    plt.tight_layout()  # set layout slim
    # plt.show()


def plot_confusion_matrix_from_data(y_test, predictions, columns=None,
                                    annot=True, cmap="Oranges",
                                    fmt='.2f', fz=22, lw=0.5, cbar=False,
                                    figsize=[8, 8], show_null_values=0,
                                    pred_val_axis='lin'):
    """Build a confusion matrix from labels and predictions, then plot it.

    When ``columns`` is empty, class names ``'class A'``, ``'class B'``, ... are
    generated, one per distinct value in ``y_test``. The matrix is computed for
    the integer labels ``0 .. len(columns) - 1`` and handed to
    ``pretty_plot_confusion_matrix``; only ``fz``, ``cmap``, ``figsize``,
    ``show_null_values`` and ``pred_val_axis`` are forwarded.
    """
    if not columns:
        # default axis labels: one upper-case letter per observed class
        n_observed = len(np.unique(y_test))
        columns = ['class %s' % (letter) for letter in ascii_uppercase[:n_observed]]

    counts = confusion_matrix(y_test, predictions, labels=range(len(columns)))
    frame = DataFrame(counts, index=columns, columns=columns)
    pretty_plot_confusion_matrix(frame, fz=fz, cmap=cmap, figsize=figsize,
                                 show_null_values=show_null_values,
                                 pred_val_axis=pred_val_axis)

def plot_confusion_matrix_with_sums_and_percentage(cm, class_names):
    """Plot a confusion matrix extended with totals and per-class rates.

    Rows are actual classes and columns are predicted classes. An extra "Sum"
    row and column hold the totals; below each total the share of correct
    predictions for that class is printed (diagonal / column total in the last
    row, diagonal / row total in the last column). Diagonal cells are overlaid
    with blue whose opacity grows with the cell's share of all samples.

    Args:
        cm: Square integer confusion matrix (array-like).
        class_names: Sequence with one name per class, in matrix order.
    """
    cm = np.asarray(cm)
    n_classes = cm.shape[0]

    # Calculate sums
    row_sums = cm.sum(axis=1)
    col_sums = cm.sum(axis=0)
    total_sum = np.sum(cm)
    custom_light_cmap = LinearSegmentedColormap.from_list("custom_light",
                                                          ["white", "lightBlue"])

    # Expand the confusion matrix to include sums
    cm_expanded = np.append(cm, row_sums[:, None], axis=1)  # row sums as new column
    col_sums_expanded = np.append(col_sums, total_sum)  # grand total in the corner
    cm_expanded = np.append(cm_expanded, col_sums_expanded[None, :], axis=0)  # column sums as new row

    # list(...) lets callers pass a tuple or an array of names as well
    class_names_with_sums = list(class_names) + ["Sum"]

    plt.figure(figsize=(12, 12))
    sn.set(font_scale=1.2)

    ax = sn.heatmap(cm_expanded, annot=True, fmt='d', square=True, cmap=custom_light_cmap,
                    xticklabels=class_names_with_sums, yticklabels=class_names_with_sums,
                    cbar=False, linewidths=.5, linecolor='white')
    ax.set_facecolor('white')
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Actual')
    ax.set_title('Confusion Matrix with Sums and Percentage')

    def annotate(row, col, text):
        # second line of the cell, underneath the count drawn by the heatmap
        ax.text(col + 0.5, row + 0.5, '\n' + text, ha="center", va="top",
                color="black", fontsize="x-small")

    for k in range(n_classes):
        # Diagonal cell: blue overlay, at least 10% opaque so it stays visible.
        share_of_total = cm[k, k] / total_sum if total_sum > 0 else 0
        opacity = np.clip(share_of_total, 0.1, 1)
        ax.add_patch(Rectangle((k, k), 1, 1, fill=True, color=(0, 0, 1, opacity)))

        # Last row: correct share among everything predicted as class k.
        per_predicted = cm[k, k] / col_sums[k] if col_sums[k] > 0 else 0
        annotate(n_classes, k, "{:.1%}".format(per_predicted))

        # Last column: correct share among everything that truly is class k.
        per_actual = cm[k, k] / row_sums[k] if row_sums[k] > 0 else 0
        annotate(k, n_classes, "{:.1%}".format(per_actual))

    # Bottom-right corner: caption for the grand total.
    annotate(n_classes, n_classes, "No of\nTest Images")

    plt.tight_layout()
    plt.show()

# Define class names (index in this list == integer class label)
columns = [
    "Face Brick",
    "Timber",
    "Steel Sheet",
    "Plastered and Painted",
    "Concrete and Glass",
    "Glass",
    "Fiber Cement Sheet",
    "Concrete Panels",
    "Unidentified"
]

# Collect ground-truth labels and model predictions over the held-out split,
# so this cell also works on a freshly restarted kernel.
model_ft.eval()
v_labels, v_preds = [], []
with torch.no_grad():
    for batch_inputs, batch_labels in dataloaders['test']:
        batch_outputs = model_ft(batch_inputs.to(device))
        v_preds.extend(batch_outputs.argmax(dim=1).cpu().tolist())
        v_labels.extend(batch_labels.tolist())

# Pin the label set so the matrix is always len(columns) x len(columns), even
# when a class is absent from both the labels and the predictions.
cm = confusion_matrix(v_labels, v_preds, labels=list(range(len(columns))))

# Plot the confusion matrix
plot_confusion_matrix_with_sums_and_percentage(cm, class_names=columns)


In [None]:
# Performance Matrix
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score

# Macro-averaged metrics weight every class equally regardless of its size.
# zero_division=0.0 is set on all three so that a class without predictions or
# without true samples scores 0 silently instead of emitting a warning for
# some metrics but not for others.
f1 = f1_score(v_labels, v_preds, average='macro', zero_division=0.0)
acc = accuracy_score(v_labels, v_preds)
prec =  precision_score(v_labels, v_preds, average='macro', zero_division=0.0)
rec = recall_score(v_labels, v_preds, average='macro', zero_division=0.0)

print(f"f1 score: {f1:.3f}")
print(f"accuracy: {acc:.3f}")
print(f"precision: {prec:.3f}")
print(f"recall: {rec:.3f}")