In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
!code .

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, accuracy_score
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
import numpy as np

## Load dataset

In [None]:
from DatasetHandler import DatasetHandler
training_handler = DatasetHandler('dataset/training')
validation_handler = DatasetHandler('dataset/validation')

In [None]:
print('Dataset classes')
print('\t', training_handler.classes)

print('Dataset dimension')
print('\t', len(training_handler.s2_paths), 'training samples belonging to ', len(training_handler.classes), 'classes')
print('\t', len(validation_handler.s2_paths), 'validation samples belonging to ', len(validation_handler.classes), 'classes')

print('Dataset example')
idx = 0
print('\t Sentinel-2 image path', training_handler.s2_paths[idx], '\n \t Sentinel-2 image label', training_handler.s2_labels[idx])#, '\n \t Sentinel-2 image shape', training_handler.s2_shape) 
print('\n \t Sentinel-1 image path', training_handler.s1_paths[idx], '\n \t Sentinel-1 image label', training_handler.s1_labels[idx])#, '\n \t Sentinel-1 image shape', training_handler.s1_shape) 

In [None]:
classes = []
for c in training_handler.classes:
    classes.append(c.split('/')[-1])

## Sentinel-2 Classifier

In [None]:
s2classifier = None
from CNN_Classifier import CNN_Classifier
s2classifier = CNN_Classifier((64,64, 12), 5)

s2classifier.model = load_model('weights/S2-classifier.h5')

In [None]:
batch_size = 16
training_loader = training_handler.s2_data_loader(batch_size, (64,64,12))
validation_loader = validation_handler.s2_data_loader(batch_size, (64,64,12))

training_steps = 4*len(training_handler.s2_paths)
validation_steps = 4*len(validation_handler.s2_paths)

epochs = 5

s2classifier.train_model(epochs, batch_size, training_loader, validation_loader, training_steps, validation_steps)

In [None]:
s2classifier.model.save('weights/S2-classifier.h5')

## Sentinel-1 classifier

In [None]:
from DatasetHandler import DatasetHandler
training_handler = DatasetHandler('dataset/training')
validation_handler = DatasetHandler('dataset/validation')

In [None]:
s1classifier = None
from CNN_Classifier import CNN_Classifier
s1classifier = CNN_Classifier((64,64, 2), 5)

In [None]:
s1classifier.model = load_model('weights/S1-classifier.h5')

In [None]:
batch_size = 16
training_loader = training_handler.s1_data_loader(batch_size, (64,64,2))
validation_loader = validation_handler.s1_data_loader(batch_size, (64,64,2))

training_steps = 4*len(training_handler.s1_paths)
validation_steps = 4*len(validation_handler.s1_paths)

epochs = 5

s1classifier.train_model(epochs, batch_size, training_loader, validation_loader, training_steps, validation_steps)

In [None]:
s1classifier.model.save('weights/S1-classifier.h5')

# Early Fusion

In [None]:
earlyclassifier = None
from CNN_Classifier import CNN_Classifier
earlyclassifier = CNN_Classifier((64,64, 12+2), 5)

earlyclassifier.model = load_model('weights/S2-S1-early-classifier.h5')

In [None]:
batch_size = 16
training_loader = training_handler.s2_s1_data_loader_2(batch_size, (64,64,12), (64,64,2))
validation_loader = validation_handler.s2_s1_data_loader_2(batch_size, (64,64,12), (64,64,2))

training_steps = 4*len(training_handler.s2_paths)
validation_steps = 4*len(validation_handler.s2_paths)

epochs = 5

earlyclassifier.train_model(epochs, batch_size, training_loader, validation_loader, training_steps, validation_steps)

In [None]:
earlyclassifier.model.save('weights/S2-S1-early-classifier.h5')

# Joint Fusion

In [None]:
from DatasetHandler import DatasetHandler
training_handler = DatasetHandler('dataset/training')
validation_handler = DatasetHandler('dataset/validation')

In [None]:
fclassifier = None
from Fusion_Classifier import Fusion_Classifier
fclassifier = Fusion_Classifier((64,64, 12), (64,64,2), 5)

fclassifier.model = load_model('weights/S2-S1-classifier.h5')

In [None]:
batch_size = 16
training_loader = training_handler.s2_s1_data_loader(batch_size, (64,64,12), (64,64,2))
validation_loader = validation_handler.s2_s1_data_loader(batch_size, (64,64,12), (64,64,2))

training_steps = 4*len(training_handler.s1_paths)
validation_steps = 4*len(validation_handler.s1_paths)

epochs = 5

fclassifier.train_model(epochs, batch_size, training_loader, validation_loader, training_steps, validation_steps)

In [None]:
fclassifier.model.save('weights/S2-S1-classifier.h5')

# Comparisons

In [None]:
from sklearn.metrics import precision_recall_fscore_support, accuracy_score, classification_report

In [None]:
from DatasetHandler import DatasetHandler
training_handler = DatasetHandler('dataset/training')
validation_handler = DatasetHandler('dataset/validation')

In [None]:
classes = []
for c in training_handler.classes:
    classes.append(c.split('/')[-1])

In [None]:
from tensorflow.keras.models import load_model
s2classifier = CNN_Classifier((64,64, 12), 5)
s1classifier = CNN_Classifier((64,64, 2), 5)
fclassifier = Fusion_Classifier((64,64, 12), (64,64,2), 5)
earlyclassifier = CNN_Classifier((64,64, 12+2), 5)

s2classifier.model = load_model('weights/S2-classifier.h5')
s1classifier.model = load_model('weights/S1-classifier.h5')
fclassifier.model = load_model('weights/S2-S1-classifier.h5')
earlyclassifier.model = load_model('weights/S2-S1-early-classifier.h5')

In [None]:
validation_loader = validation_handler.s2_s1_data_loader(10*len(validation_handler.s1_paths), (64,64,12), (64,64,2))
s2_s1, g_truth = next(iter(validation_loader))

In [None]:
s2_pre = s2classifier.model.predict(s2_s1[0])
s1_pre = s1classifier.model.predict(s2_s1[1])
f_pre = fclassifier.model.predict(s2_s1)

s2s1 = np.zeros((10*len(validation_handler.s1_paths), 64,64, 14))
s2s1[...,0:2] = s2_s1[1]
s2s1[...,2:] = s2_s1[0]

e_pre = earlyclassifier.model.predict(s2s1)

In [None]:
late_sum = []
for i in range(s2_pre.shape[0]):
        late_sum.append(np.argmax((s1_pre[i]+s2_pre[i])))

In [None]:
late_weight = []
w1 = np.array([0, 1, 1, 1, 0])
w2 = 1 - w1

for i in range(s2_pre.shape[0]):
        late_weight.append(np.argmax((w1*s1_pre[i]+w2*s2_pre[i])))

In [None]:
import matplotlib
font = {'family' : 'normal',
        'weight' : 'bold',
        'size'   : 20}

matplotlib.rc('font', **font)

In [None]:
fig, axes = plt.subplots(nrows = 1, ncols = 1, figsize = (12,10))
maxs = []

# S2
ground_truth = np.argmax(g_truth, axis = 1)
prediction = np.argmax(s2_pre, axis = 1)
cm = confusion_matrix(ground_truth, prediction, normalize='true')
cmd = ConfusionMatrixDisplay(cm, display_labels=classes)
cmd.plot(ax=axes, cmap='Blues')
print('S2')
print('Accuracy:', cm.diagonal(), 'mean: ', cm.diagonal().mean())
print(classification_report(ground_truth, prediction, target_names=classes, digits=4))
axes.get_images()[0].set_clim(0, 1)
plt.show()
plt.close()


# S1
fig, axes = plt.subplots(nrows = 1, ncols = 1, figsize = (12,10))
ground_truth = np.argmax(g_truth, axis = 1)
prediction = np.argmax(s1_pre, axis = 1)
cm = confusion_matrix(ground_truth, prediction, normalize='true')
cmd = ConfusionMatrixDisplay(cm, display_labels=classes)
cmd.plot(ax=axes, cmap='Blues')
print('S1')
print('Accuracy:', cm.diagonal(), 'mean: ', cm.diagonal().mean())
print(classification_report(ground_truth, prediction, target_names=classes, digits=4))
axes.get_images()[0].set_clim(0, 1)
plt.show()
plt.close()

# Joint Fusion
fig, axes = plt.subplots(nrows = 1, ncols = 1, figsize = (12,10))
ground_truth = np.argmax(g_truth, axis = 1)
prediction = np.argmax(f_pre, axis = 1)
cm = confusion_matrix(ground_truth, prediction, normalize='true')
cmd = ConfusionMatrixDisplay(cm, display_labels=classes)
cmd.plot(ax=axes, cmap='Blues')
print('Joint Fusion')
print('Accuracy:', cm.diagonal(), 'mean: ', cm.diagonal().mean())
print(classification_report(ground_truth, prediction, target_names=classes, digits=4))
axes.get_images()[0].set_clim(0, 1)
plt.show()
plt.close()

# Late fusion sum
fig, axes = plt.subplots(nrows = 1, ncols = 1, figsize = (12,10))
cm = confusion_matrix(ground_truth, late_sum, normalize='true')
cmd = ConfusionMatrixDisplay(cm, display_labels=classes)
cmd.plot(ax=axes, cmap='Blues')
print('Late Fusion')
print('Accuracy:', cm.diagonal(), 'mean: ', cm.diagonal().mean())
print(classification_report(ground_truth, late_sum, target_names=classes, digits=4))
axes.get_images()[0].set_clim(0, 1)
plt.show()
plt.close()

# Late fusion weight
fig, axes = plt.subplots(nrows = 1, ncols = 1, figsize = (12,10))
cm = confusion_matrix(ground_truth, late_weight, normalize='true')
cmd = ConfusionMatrixDisplay(cm, display_labels=classes)
cmd.plot(ax=axes, cmap='Blues')
print('Late Fusion')
print('Accuracy:', cm.diagonal(), 'mean: ', cm.diagonal().mean())
print(classification_report(ground_truth, late_weight, target_names=classes, digits=4))
axes.get_images()[0].set_clim(0, 1)
plt.show()
plt.close()

# Early Fusion
fig, axes = plt.subplots(nrows = 1, ncols = 1, figsize = (12,10))
ground_truth = np.argmax(g_truth, axis = 1)
prediction = np.argmax(e_pre, axis = 1)
cm = confusion_matrix(ground_truth, prediction, normalize='true')
cmd = ConfusionMatrixDisplay(cm, display_labels=classes)
cmd.plot(ax=axes, cmap='Blues')
print('Early Fusion')
print('Accuracy:', cm.diagonal(), 'mean: ', cm.diagonal().mean())
print(classification_report(ground_truth, prediction, target_names=classes, digits=4))
axes.get_images()[0].set_clim(0, 1)
plt.show()
plt.close()