In [26]:
!pip install rasterio
import uuid
import numpy as np
import warnings

class GenericObject:
    """
    Container for a single annotated object.

    Every field except ``id`` starts with a -1 placeholder that the code
    parsing the annotations is expected to overwrite.
    """
    def __init__(self):
        # Random (version 4) UUID identifying this instance.
        self.id = uuid.uuid4()
        # Bounding box as a 4-tuple; the coordinate convention is set by the caller.
        self.bb = (-1,) * 4
        self.category = -1
        self.score = -1

class GenericImage:
    """
    Image record: a file name, a tile rectangle and the objects annotated in it.
    """
    def __init__(self, filename):
        self.filename = filename
        # Tile as (pt_x, pt_y, pt_x+width, pt_y+height); -1 marks "not set yet".
        self.tile = np.array([-1] * 4)
        self.objects = []

    def add_object(self, obj: GenericObject):
        """Append ``obj`` to this image's list of objects."""
        self.objects.append(obj)



In [27]:
# Class index -> class name; the insertion order defines the label order used everywhere else.
categories = dict(enumerate([
    'Cargo plane',
    'Helicopter',
    'Small car',
    'Bus',
    'Truck',
    'Motorboat',
    'Fishing vessel',
    'Dump truck',
    'Excavator',
    'Building',
    'Storage tank',
    'Shipping container',
]))

In [40]:
import tensorflow as tf
import numpy as np
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import random
import rasterio
import warnings

def load_geoimage(filename):
    """
    Read every band of a raster file into a (height, width, bands) array.

    Args:
        filename: path of the raster file, passed to ``rasterio.open``.

    Returns:
        NumPy array with the dtype declared in the raster profile and the
        bands stacked along the last axis.
    """
    # Silence rasterio's warning about rasters that carry no georeferencing.
    warnings.filterwarnings('ignore', category=rasterio.errors.NotGeoreferencedWarning)
    # The context manager closes the dataset even if a band read fails;
    # without it one file handle is leaked per loaded image.
    with rasterio.open(filename, 'r') as src_raster:
        input_type = src_raster.profile['dtype']
        input_channels = src_raster.count
        img = np.zeros((src_raster.height, src_raster.width, input_channels), dtype=input_type)
        for band in range(input_channels):
            # rasterio band indexes are 1-based
            img[:, :, band] = src_raster.read(band+1)
    return img
    
# Random image augmentation built on Keras' ImageDataGenerator
def image_augmentation(image):
    """
    Apply one random ImageDataGenerator transform to a single image.

    Args:
        image: one image, as an array or tensor convertible to float32.

    Returns:
        The transformed image as returned by ``random_transform``.
    """
    # NOTE(review): `rescale` is presumably only applied by
    # ImageDataGenerator.standardize(), which is not called here, so the
    # output likely stays in the input value range - confirm.
    augmentation_options = dict(
        rotation_range=360,
        width_shift_range=1,
        height_shift_range=1,
        shear_range=1,
        zoom_range=1,
        horizontal_flip=True,
        vertical_flip=True,
        rescale=1./255,
        channel_shift_range=1,
        brightness_range=(0.1, 1.0),
        fill_mode='nearest',
    )
    augmenter = ImageDataGenerator(**augmentation_options)
    # ImageDataGenerator works on NumPy arrays, so go through a float32
    # tensor and back to NumPy before transforming.
    as_float32 = tf.convert_to_tensor(image, dtype=tf.float32).numpy()
    return augmenter.random_transform(as_float32)




def create_tf_dataset(objs, batch_size, categories, do_shuffle=False, augment=None):
    """
    Build a batched ``tf.data.Dataset`` of (image, one-hot label) pairs.

    Args:
        objs: sequence of ``(filename, obj)`` pairs; ``obj.category`` must be
            one of the values of ``categories``.
        batch_size: number of samples per batch.
        categories: mapping whose values are the class names; their order
            defines the position of each class in the one-hot label.
        do_shuffle: when True, the samples are visited in a new random order
            every time the dataset is iterated.
        augment: when True, each image is replaced by a randomly augmented
            version with 50% probability. Defaults to ``do_shuffle`` so that
            an unshuffled (validation/test) dataset is never augmented.

    Returns:
        Dataset yielding float32 images of shape (batch, 224, 224, 3) and
        float32 labels of shape (batch, len(categories)).
    """
    if augment is None:
        # Augmenting evaluation data makes the monitored metrics noisy, so
        # only the shuffled (training) pipeline augments by default.
        augment = do_shuffle
    # Built once instead of once per sample.
    class_names = list(categories.values())

    def generator():
        samples = list(objs)
        if do_shuffle:
            # Shuffle the cheap (filename, obj) pairs rather than the decoded
            # images: a shuffle buffer of len(objs) images would hold the
            # whole dataset in memory and rejects an empty dataset.
            random.shuffle(samples)
        for filename, obj in samples:
            # Load the image and resize it to the fixed model input size
            # (tf.image.resize returns float32)
            image = tf.image.resize(load_geoimage(filename), (224, 224))

            # One-hot label
            label = np.zeros(len(class_names), dtype=np.float32)
            label[class_names.index(obj.category)] = 1

            # Use the augmented image instead of the original half of the time
            if augment and random.random() > 0.5:
                image = image_augmentation(image)

            # Exactly one image per iteration
            yield image, label

    # The generator is invoked again on every pass over the dataset
    dataset = tf.data.Dataset.from_generator(
        generator,
        output_signature=(
            tf.TensorSpec(shape=(224, 224, 3), dtype=tf.float32),  # fixed-size image
            tf.TensorSpec(shape=(len(class_names),), dtype=tf.float32)  # one-hot label
        )
    )

    # tf.data.AUTOTUNE replaces the deprecated tf.data.experimental.AUTOTUNE
    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)


In [41]:
import matplotlib.pyplot as plt
import numpy as np

def draw_confusion_matrix(cm, categories):
    """
    Draw a row-normalised confusion matrix as an annotated heat map.

    Args:
        cm: square array of counts; rows are labelled 'Annotation' and
            columns 'Prediction'.
        categories: mapping whose values are the class names used as tick
            labels, in the row/column order of ``cm``.
    """
    class_names = list(categories.values())
    # Figure size and font size scale with the square root of the class count
    scale = pow(len(class_names), 0.5)
    fig = plt.figure(figsize=[6.4*scale, 4.8*scale])
    ax = fig.add_subplot(111)
    # Normalise each row by its total; eps avoids dividing by zero on empty rows
    cm = cm.astype('float') / np.maximum(cm.sum(axis=1)[:, np.newaxis], np.finfo(np.float64).eps)
    # Pass the colormap by name: plt.cm.get_cmap() was removed in recent Matplotlib
    im = ax.imshow(cm, interpolation='nearest', cmap='Blues')
    ax.figure.colorbar(im, ax=ax)
    ax.set(xticks=np.arange(cm.shape[1]), yticks=np.arange(cm.shape[0]), xticklabels=class_names, yticklabels=class_names, ylabel='Annotation', xlabel='Prediction')
    # Rotate the tick labels and set their alignment
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")
    # Write each cell value, in white on dark cells and black on light ones
    thresh = cm.max() / 2.0
    fontsize = int(20-scale)
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], '.2f'), ha="center", va="center", color="white" if cm[i, j] > thresh else "black", fontsize=fontsize)
    fig.tight_layout()
    # plt.show() takes no figure argument (its only parameter is keyword-only `block`)
    plt.show()

In [42]:
import json

# Load the annotation database
json_file = '/kaggle/input/xview-recognition/xview_recognition/xview_ann_train.json'
# The `with` block closes the file on exit, so no explicit close() is needed
with open(json_file) as ifs:
    json_data = json.load(ifs)


In [43]:
import numpy as np

# Directory that the annotation file names are relative to
dataset_root = '/kaggle/input/xview-recognition/xview_recognition/'

# Number of annotations seen per category name
counts = dict.fromkeys(categories.values(), 0)
anns = []
# Images and annotations are paired by position in the JSON file
image_records = json_data['images'].values()
annotation_records = json_data['annotations'].values()
for json_img, json_ann in zip(image_records, annotation_records):
    image = GenericImage(dataset_root+json_img['filename'])
    image.tile = np.array([0, 0, json_img['width'], json_img['height']])

    obj = GenericObject()
    obj.bb = tuple(int(json_ann['bbox'][k]) for k in range(4))
    obj.category = json_ann['category_id']
    # Tally only: no resampling is performed here
    counts[obj.category] += 1

    image.add_object(obj)
    anns.append(image)
print(counts)

{'Cargo plane': 635, 'Helicopter': 70, 'Small car': 4290, 'Bus': 2155, 'Truck': 2746, 'Motorboat': 1069, 'Fishing vessel': 706, 'Dump truck': 1236, 'Excavator': 789, 'Building': 4689, 'Storage tank': 1469, 'Shipping container': 1523}


##### 2.- Architecture definition

In [44]:
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Dropout, Activation
from tensorflow.keras.optimizers import Adam
import tensorflow as tf

# Load VGG16 pre-trained on ImageNet, without its top (classifier) layers
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze every layer of the base model so that it is not trained
for layer in base_model.layers:
    layer.trainable = False

# Classifier head stacked on the frozen convolutional base
model = Sequential([
    base_model,                    # VGG16 feature extractor
    Flatten(),                     # flatten the base model output
    Dense(128, activation='elu'),  # trainable dense layers
    Dense(64, activation='elu'),
    Dropout(0.2),                  # dropout against overfitting
    Dense(len(categories), activation='softmax'),  # one output per category
])

# Show the model summary
model.summary()



In [45]:
from tensorflow.keras.optimizers import SGD

# SGD with Nesterov momentum
sgd_options = dict(learning_rate=0.01, momentum=0.9, nesterov=True)
opt_sgd = SGD(**sgd_options)


In [46]:
import matplotlib.pyplot as plt
from IPython.display import clear_output
from tensorflow.keras.callbacks import LambdaCallback
from tensorflow.keras.callbacks import TerminateOnNaN, EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# Per-epoch histories filled in by plot_metrics (four independent lists)
train_loss, val_loss, train_acc, val_acc = [], [], [], []

def plot_metrics(epoch, logs):
    """
    Epoch-end callback: record the epoch metrics and redraw the learning curves.

    Appends to the module-level ``train_loss``, ``val_loss``, ``train_acc``
    and ``val_acc`` lists and replaces the cell output with a fresh
    two-panel figure (loss on the left, accuracy on the right).

    Args:
        epoch: zero-based epoch index passed by Keras.
        logs: metrics of the finished epoch; the keys 'loss', 'val_loss',
            'accuracy' and 'val_accuracy' are read. A missing key is recorded
            as NaN (drawn as a gap) instead of raising KeyError.
    """
    if epoch == 0:
        # A fit() run starting at epoch 0 is a new run: drop the curves left
        # over from a previous run of the training cell.
        for history in (train_loss, val_loss, train_acc, val_acc):
            history.clear()

    logs = logs or {}
    missing = float('nan')
    train_loss.append(logs.get('loss', missing))
    val_loss.append(logs.get('val_loss', missing))
    train_acc.append(logs.get('accuracy', missing))
    val_acc.append(logs.get('val_accuracy', missing))

    # wait=True delays clearing until the new figure is ready, avoiding flicker
    clear_output(wait=True)
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 4))

    # Loss
    loss_ax.plot(train_loss, label='Train Loss')
    loss_ax.plot(val_loss, label='Validation Loss')
    loss_ax.set_title('Loss')
    loss_ax.legend()

    # Accuracy
    acc_ax.plot(train_acc, label='Train Accuracy')
    acc_ax.plot(val_acc, label='Validation Accuracy')
    acc_ax.set_title('Accuracy')
    acc_ax.legend()

    plt.show()
    # One figure is created per epoch; close it so they do not pile up
    plt.close(fig)
    
# Redraw the learning curves at the end of every epoch
plot_callback = LambdaCallback(on_epoch_end=plot_metrics)

# Callbacks: checkpointing, LR scheduling and early stopping all watch the same metric
monitored_metric = 'val_accuracy'
model_checkpoint = ModelCheckpoint(
    filepath='/kaggle/working/model.keras',
    monitor=monitored_metric,
    verbose=1,
    save_best_only=True,
)
reduce_lr = ReduceLROnPlateau(monitor=monitored_metric, factor=0.1, patience=10, verbose=1)
early_stop = EarlyStopping(monitor=monitored_metric, patience=40, verbose=1)
terminate = TerminateOnNaN()
callbacks = [
    plot_callback,
    model_checkpoint,
    reduce_lr,
    early_stop,
    terminate,
]

In [47]:
def split_annotations(anns, train_size=0.7, valid_size=0.15, test_size=0.15, seed=None):
    """
    Randomly split the annotations into training, validation and test lists.

    The input sequence is left untouched; the shuffle is done on a copy.

    Args:
        anns: sequence of annotations.
        train_size: fraction of the items placed in the training split.
        valid_size: fraction of the items placed in the validation split.
        test_size: kept for interface compatibility and not used in the
            computation: the test split always receives the items left over
            after the training and validation splits.
        seed: optional seed for a reproducible split. When None, NumPy's
            global random state is used.

    Returns:
        Tuple ``(anns_train, anns_valid, anns_test)`` of lists.

    Raises:
        ValueError: if a fraction is negative or train_size + valid_size
            exceeds 1.
    """
    if train_size < 0 or valid_size < 0 or train_size + valid_size > 1 + 1e-9:
        raise ValueError('train_size and valid_size must be non-negative and sum to at most 1')

    total = len(anns)
    # Shuffle indices rather than `anns` itself so the caller's list keeps its order
    rng = np.random if seed is None else np.random.default_rng(seed)
    shuffled = [anns[i] for i in rng.permutation(total)]

    # Cut points of the three consecutive slices
    train_idx = int(total * train_size)
    valid_idx = int(total * (train_size + valid_size))

    anns_train = shuffled[:train_idx]
    anns_valid = shuffled[train_idx:valid_idx]
    anns_test = shuffled[valid_idx:]

    return anns_train, anns_valid, anns_test

# Split the annotations into training, validation and test subsets (70% / 15% / 15%)
anns_train, anns_valid, anns_test = split_annotations(
    anns,
    train_size=0.7,
    valid_size=0.15,
    test_size=0.15,
)

In [48]:
def _flatten_objects(annotations):
    """Return one (filename, object) pair per object of every annotation."""
    pairs = []
    for ann in annotations:
        for obj in ann.objects:
            pairs.append((ann.filename, obj))
    return pairs

# One (filename, object) pair per annotated object
objs_train = _flatten_objects(anns_train)
objs_valid = _flatten_objects(anns_valid)

# Datasets: only the training one is shuffled
batch_size = 128  # Change batch size - Assignment 3 (faster)
train_dataset = create_tf_dataset(
    objs_train,
    batch_size=batch_size,
    categories=categories,
    do_shuffle=True,
)
valid_dataset = create_tf_dataset(
    objs_valid,
    batch_size=batch_size,
    categories=categories,
    do_shuffle=False,
)

In [None]:
import math
import numpy as np
print('Training model')
epochs = 20
# One epoch = one full pass: number of batches, counting the last partial one
train_steps = math.ceil(len(objs_train)/batch_size)
valid_steps = math.ceil(len(objs_valid)/batch_size)

model.compile(optimizer=opt_sgd, loss='categorical_crossentropy', metrics=['accuracy'])
# The datasets are finite and generator-backed. When steps_per_epoch /
# validation_steps are given, Keras keeps drawing from the same iterator
# across epochs, so without .repeat() the input runs out of data after the
# first epoch. Each repetition restarts the underlying generator.
h = model.fit(
    train_dataset.repeat(),
    steps_per_epoch=train_steps,
    validation_data=valid_dataset.repeat(),
    validation_steps=valid_steps,
    epochs=epochs,
    callbacks=callbacks,
    verbose=1,
)
# Best validation model
best_idx = int(np.argmax(h.history['val_accuracy']))
best_value = np.max(h.history['val_accuracy'])
print('Best validation model: epoch ' + str(best_idx+1), ' - val_accuracy ' + str(best_value))

Training model
Epoch 1/20


In [None]:
import matplotlib.pyplot as plt
print(h.history.keys())

def _plot_history_curve(history, metric):
    """Plot the training and validation curves of one metric from a Keras history dict."""
    plt.plot(history[metric])
    plt.plot(history['val_' + metric])
    plt.title('model ' + metric)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    # The second curve is the validation metric, not a test metric
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

_plot_history_curve(h.history, 'accuracy')
_plot_history_curve(h.history, 'loss')


In [None]:
import numpy as np

# NOTE(review): the weights evaluated here are whatever `model` currently
# holds, presumably those of the last trained epoch; the best-validation
# checkpoint is written by ModelCheckpoint to '/kaggle/working/model.keras'.
# Confirm which of the two should be reported.
y_true, y_pred = [], []
class_names = list(categories.values())
# Evaluate only on the held-out test split: `anns` also contains the training
# and validation samples, which would inflate every metric.
for ann in anns_test:
    if not ann.objects:
        continue  # nothing to score for this image
    # Same 224x224 resize as the training pipeline, so the input matches the model
    image = tf.image.resize(load_geoimage(ann.filename), (224, 224))
    warped_image = tf.expand_dims(image, 0)
    # The prediction depends only on the image, so compute it once per image;
    # verbose=0 avoids printing one progress bar per call.
    predictions = model.predict(warped_image, verbose=0)
    pred_category = class_names[np.argmax(predictions)]
    for obj_pred in ann.objects:
        # Save annotation / prediction pair
        y_true.append(obj_pred.category)
        y_pred.append(pred_category)

In [None]:
from sklearn.metrics import confusion_matrix

# Confusion matrix with rows/columns in the order of the category names
class_labels = list(categories.values())
cm = confusion_matrix(y_true, y_pred, labels=class_labels)
draw_confusion_matrix(cm, categories)

In [None]:
import numpy as np

# Global metrics derived from the confusion matrix (rows: annotations, columns: predictions)
eps = np.finfo(np.float64).eps  # guards every ratio against a zero denominator
class_names = list(categories.values())

correct_samples_class = np.diag(cm).astype(float)
total_samples_class = cm.sum(axis=1).astype(float)
total_predicts_class = cm.sum(axis=0).astype(float)

overall_accuracy = np.sum(correct_samples_class) / np.sum(total_samples_class)
print('Mean Accuracy: %.3f%%' % (overall_accuracy * 100))
recall_per_class = correct_samples_class / np.maximum(total_samples_class, eps)
print('Mean Recall: %.3f%%' % (recall_per_class.mean() * 100))
precision_per_class = correct_samples_class / np.maximum(total_predicts_class, eps)
print('Mean Precision: %.3f%%' % (precision_per_class.mean() * 100))

# Per-class report
grand_total = cm.sum()
for idx, class_name in enumerate(class_names):
    # True/False Positives (TP/FP): predicted positives that were correct/incorrect.
    # True/False Negatives (TN/FN): predicted negatives that were correct/incorrect.
    row_total = cm[idx, :].sum()
    col_total = cm[:, idx].sum()
    tp = cm[idx, idx]
    fp = col_total - tp
    fn = row_total - tp
    # Everything outside row idx and column idx (tp was subtracted twice, add it back)
    tn = grand_total - row_total - col_total + tp
    # Recall (true positive rate): share of real positives predicted as positive.
    recall = tp / np.maximum(tp+fn, eps)
    # Precision: share of predicted positives that are real positives.
    precision = tp / np.maximum(tp+fp, eps)
    # Specificity (true negative rate): share of real negatives predicted as negative.
    specificity = tn / np.maximum(tn+fp, eps)
    # Dice = 2 |A∩B| / (|A|+|B|) = 2 TP / (2 TP + FP + FN), computed from precision and recall.
    f1_score = 2 * ((precision * recall) / np.maximum(precision+recall, eps))
    print('> %s: Recall: %.3f%% Precision: %.3f%% Specificity: %.3f%% Dice: %.3f%%' % (class_name, recall*100, precision*100, specificity*100, f1_score*100))

#### Report

You must prepare a report (PDF) describing:
* The problems and data sets (briefly).
* The process that you have followed to reach your solution for the “xview_recognition” benchmark, including your intermediate results. You must discuss and compare these results properly.
* Final network architectures, including optimization algorithms, regularization methods (dropout, data augmentation, etc.), number of layers/parameters, and performance obtained with your model on the train/valid/test data sets, including the plots of the evolution of losses and accuracy.
* Your feedback on the use of the “Cesvima” or “Google Colab” services would also be very valuable.

When submitting via Moodle, attach your Python (.py) or Jupyter Notebook (.ipynb) source file, and include in the report all computation results together with the code that generated them.

The assignment must be done in groups of 3 students.