# Ensembles

In [85]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import metrics
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping,TensorBoard, ReduceLROnPlateau
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten, Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization, LeakyReLU, Activation
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import SGD, Adam, legacy
from tensorflow.keras.layers import Input
import tensorflow_addons as tfa 


import os
import numpy as np

# plotting
import matplotlib.pyplot as plt
import pathlib
from PIL import Image
import IPython.display as display

# to display confusion matrix
import seaborn as sn
import pandas as pd

# to determine the most voted
import collections 

AUTOTUNE = tf.data.AUTOTUNE

BATCH_SIZE = 32
IMAGE_HEIGHT = 32
IMAGE_WIDTH = 32

print("Tensorflow version:" + tf.__version__)

Tensorflow version:2.10.0



TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



### Aux functions

In [2]:
def get_label(file_path):
  # convert the path to a list of path components
  parts = tf.strings.split(file_path, os.path.sep)
  # The second to last is the class-directory
  return parts[-2] == classNames

def decode_img(img):
  # convert the compressed string to a 3D uint8 tensor
  img = tf.image.decode_png(img, channels=3)
  # Use `convert_image_dtype` to convert to floats in the [0,1] range.
  img = tf.image.convert_image_dtype(img, tf.float32)
  # resize the image to the desired size.
  return tf.image.resize(img, [32,32])

def get_bytes_and_label(file_path):
  label = get_label(file_path)
  # load the raw data from the file as a string
  img = tf.io.read_file(file_path)
  img = decode_img(img)
  return img, label

def show_data(s1,l1, s2,l2, labels, min):
    fig, ax = plt.subplots()
    X = np.arange(len(s1))

    models = labels
    plt.bar(X, s1, width = 0.4, color = 'b', label=l1)
    plt.bar(X + 0.4, s2, color = 'r', width = 0.4, label = l2)
    plt.xticks(X + 0.4 / 2, models)
    plt.ylim(top = 100, bottom = min)
    plt.legend(loc='upper left')
    plt.show()

def show_batch(image_batch, label_batch):
  columns = 6
  rows = BATCH_SIZE / columns + 1  
  plt.figure(figsize=(10, 2 * rows))
  for n in range(BATCH_SIZE):
      ax = plt.subplot(int(rows), columns, n+1)
      plt.imshow((image_batch[n]))
      plt.title(classNames[label_batch[n]==1][0])
      plt.axis('off')


def show_history(history):
    print(history.history.keys())

    # summarize history for accuracy
    plt.plot(history.history['accuracy'])
    plt.plot(history.history['val_accuracy'])
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='lower right')
    plt.show()
    # summarize history for loss
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'val'], loc='upper right')
    plt.show()    


def show_accuracies(labels, test, val): 

    fig, ax = plt.subplots()
    X = np.arange(len(test))

    plt.bar(X, test, width = 0.4, color = 'b', label='test')
    plt.bar(X + 0.4, val, color = 'r', width = 0.4, label = "val")
    plt.xticks(X + 0.4 / 2, labels)
    plt.ylim(top = 1.0, bottom = 0.97)
    plt.legend(loc='upper left')
    plt.show()    



def show_misclassified(predictions, ground_truth, images, num_rows = 5, num_cols=3):
    
    # Plot the first X test images with wrong predictions.
    num_images = num_rows*num_cols
    print(num_images)
    plt.figure(figsize=(2*2*num_cols, 2*num_rows))
    i = 0
    k = 0
    while k < len(images) and i < num_images:
        predicted_label = np.argmax(predictions[k])
        gt = np.where(ground_truth[k])[0][0]
        if predicted_label != gt:
            plt.subplot(num_rows, 2*num_cols, 2*i+1)
            plot_image(k, predictions[k], gt, images)
            plt.subplot(num_rows, 2*num_cols, 2*i+2)
            plot_value_array(k, predictions[k], ground_truth)
            i += 1
        k += 1
    plt.tight_layout()
    plt.show()


def plot_image(i, predictions_array, true_label, img):
    predictions_array, true_label, img = predictions_array, true_label, img[i]
    plt.grid(False)
    plt.xticks([])
    plt.yticks([])
    
    plt.imshow(img, cmap=plt.cm.binary)
    
    predicted_label = np.argmax(predictions_array)
    if predicted_label == true_label:
      color = 'blue'
    else:
      color = 'red'
    
    plt.xlabel("{} {:2.0f}% ({})".format(classNames[predicted_label],
                                100*np.max(predictions_array),
                                classNames[true_label]),
                                color=color)

def plot_value_array(i, predictions_array, true_label):
    predictions_array, true_label = predictions_array, true_label[i]
    plt.grid(False)
    plt.xticks(range(8))
    plt.yticks([])
    thisplot = plt.bar(range(8), predictions_array, color="#777777")
    plt.ylim([0, 1])
    predicted_label = np.argmax(predictions_array)

    thisplot[predicted_label].set_color('red')
    thisplot[np.where(true_label)[0][0]].set_color('blue')   



def show_confusion_matrix(mat, classes):

    df_cm = pd.DataFrame(mat, range(classes), range(classes))
    plt.figure(figsize=(15,10))
    sn.set(font_scale=1.4) # for label size
    sn.heatmap(df_cm, annot=True, annot_kws={"size": 16}, fmt='d') # font size

    plt.show() 

In [3]:
train_path = '/Users/joao/Desktop/MEI/CG/VC/Visao Computador/computer-vision/GTSRB/Final_Training/Images'
test_path = '/Users/joao/Desktop/MEI/CG/VC/Visao Computador/computer-vision/test_images'

train_data_dir=pathlib.Path(train_path)
test_data_dir=pathlib.Path(test_path)

test_image_count = len(list(test_data_dir.glob('*/*.png')))

### Baseline models definitions

#### ModelS

In [4]:
def baseline_modelS(classCount, imgSize, channels):
    model = Sequential()
    model.add(Input(shape=(imgSize, imgSize, channels)))

    model.add(Conv2D(64, (5, 5)))         
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))   
    
    model.add(Conv2D(64, (5, 5) )) 
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(64, (5, 5) ) )   
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    
    model.add(Flatten())
    model.add(Dense(128))
    model.add(LeakyReLU(alpha=0.01))             
    model.add(Dropout(0.2))

    model.add(Dense(classCount, activation='softmax'))

    
    opt = Adam(lr=0.0001)
    model.compile(optimizer = opt, loss='categorical_crossentropy', metrics=[ 'accuracy'])
    return model

#### ModelK

In [5]:
def baseline_modelK(classCount, imgSize,channels):
    model = Sequential()

    model.add(Conv2D(filters=16, kernel_size=(3,3), activation='relu', input_shape=(imgSize, imgSize, channels)))
    model.add(Conv2D(filters=32, kernel_size=(3,3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(BatchNormalization(axis=-1))
    
    model.add(Conv2D(filters=64, kernel_size=(3,3), activation='relu'))
    model.add(Conv2D(filters=128, kernel_size=(3,3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(BatchNormalization(axis=-1))
    
    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dropout(rate=0.5))
    
    model.add(Dense(classCount, activation='softmax'))
    
    lr = 0.001
    epochs = 30
    opt = Adam(lr=lr, decay=lr / (epochs * 0.5))
    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model

#### ModelGr

In [6]:
def baseline_modelGr(classCount, imgSize,channels):
    model = Sequential()
    model.add(Input(shape=(imgSize, imgSize, channels)))

    # Larger filters
    model.add(Conv2D(128, (5, 5)))         
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))   
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(128, (5, 5)))         
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))   
    model.add(MaxPooling2D(pool_size=(2, 2)))

    # Smaller filters
    #model.add(Conv2D(64, (3, 3)))         
    #model.add(BatchNormalization())
    #model.add(Activation('relu'))   
    #model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(128, (3, 3)))         
    model.add(BatchNormalization())
    model.add(Activation('relu'))   
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())
    model.add(Dense(512))
    model.add(Activation('relu'))  
    model.add(BatchNormalization())          
    model.add(Dropout(0.5))

    model.add(Dense(classCount, activation='softmax'))

    opt = Adam(lr=0.001, decay=0.001 / 30) # Same learning rate and decay as ModelK
    model.compile(optimizer = opt, loss='categorical_crossentropy', metrics=[ 'accuracy'])
    return model
    

### Dynamic Models definitions

#### ModelS

In [7]:
def dynamic_modelS(classCount, imgSize, channels):
    model = Sequential()
    model.add(Input(shape=(imgSize, imgSize, channels)))

    model.add(Conv2D(64, (5, 5)))         
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))   
    
    model.add(Conv2D(64, (5, 5) )) 
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(64, (5, 5) ) )   
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    
    model.add(Flatten())
    model.add(Dense(128))
    model.add(LeakyReLU(alpha=0.01))             
    model.add(Dropout(0.2))

    model.add(Dense(classCount, activation='softmax'))

    
    opt = Adam(lr=0.0001)
    model.compile(optimizer = opt, loss='categorical_crossentropy', metrics=[ 'accuracy'])
    return model

#### ModelK

In [8]:
def dynamic_modelK(classCount, imgSize,channels):
    model = Sequential()

    model.add(Conv2D(filters=16, kernel_size=(3,3), activation='relu', input_shape=(imgSize, imgSize, channels)))
    model.add(Conv2D(filters=32, kernel_size=(3,3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(BatchNormalization(axis=-1))
    
    model.add(Conv2D(filters=64, kernel_size=(3,3), activation='relu'))
    model.add(Conv2D(filters=128, kernel_size=(3,3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(BatchNormalization(axis=-1))
    
    model.add(Flatten())
    model.add(Dense(512, activation='relu'))
    model.add(BatchNormalization())
    model.add(Dropout(rate=0.5))
    
    model.add(Dense(classCount, activation='softmax'))
    
    lr = 0.001
    epochs = 30
    opt = Adam(lr=lr, decay=lr / (epochs * 0.5))
    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model

#### ModelGr

In [9]:
def dynamic_modelGr(classCount, imgSize,channels):
    model = Sequential()
    model.add(Input(shape=(imgSize, imgSize, channels)))

    # Larger filters
    model.add(Conv2D(128, (5, 5)))         
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))   
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(128, (5, 5)))         
    model.add(BatchNormalization())
    model.add(LeakyReLU(alpha=0.01))   
    model.add(MaxPooling2D(pool_size=(2, 2)))

    # Smaller filters
    #model.add(Conv2D(64, (3, 3)))         
    #model.add(BatchNormalization())
    #model.add(Activation('relu'))   
    #model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Conv2D(128, (3, 3)))         
    model.add(BatchNormalization())
    model.add(Activation('relu'))   
    model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())
    model.add(Dense(512))
    model.add(Activation('relu'))  
    model.add(BatchNormalization())          
    model.add(Dropout(0.5))

    model.add(Dense(classCount, activation='softmax'))

    opt = Adam(lr=0.001, decay=0.001 / 30) # Same learning rate and decay as ModelK
    model.compile(optimizer = opt, loss='categorical_crossentropy', metrics=[ 'accuracy'])
    return model

### Load the models

In [10]:
model_creators = {
    'basemodel_S': baseline_modelS,
    'basemodel_K': baseline_modelK,
    'basemodel_Gr': baseline_modelGr,
    'dynamic_S': dynamic_modelS,
    'dynamic_K': dynamic_modelK,
    'dynamic_Gr': dynamic_modelGr,
}

def load_model(model_type, model_name):
    function_name = f'{model_type.lower()}_{model_name}'
    model_creator = model_creators[function_name]

    if model_type == 'BaseModel':
        if model_name == 'Gr':  # special case for 'Gr' model in BaseModel
            model_path = f'logs/baselineModelsLogs/model{model_name}/best{model_name}/cp.ckpt'
        else:
            model_path = f'logs/baselineModelsLogs/model{model_name}/bestM{model_name}/cp.ckpt'
    else:  # assuming the other type is 'Dynamic'
        model_path = f'logs/dynamicLogs/model{model_name}/dynamic1/cp.ckpt'

    model = model_creator(43, 32, 3)  # Initialize the model architecture
    model.load_weights(model_path)  # Load the weights
    return model


In [11]:
# Load all six models
base_s = load_model('BaseModel', 'S')
base_k = load_model('BaseModel', 'K')
base_gr = load_model('BaseModel', 'Gr')
dynamic_s = load_model('Dynamic', 'S')
dynamic_k = load_model('Dynamic', 'K')
dynamic_gr = load_model('Dynamic', 'Gr')

# Put them in a list for convenience
models = [base_s, base_k, base_gr, dynamic_s, dynamic_k, dynamic_gr]

2023-06-13 23:06:20.396358: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
  super().__init__(name, **kwargs)


### Preparing the data

In [41]:
def normalize(images, labels):
    images = tf.cast(images, tf.float32)
    images /= 255
    return images, labels

def one_hot_encode(image, label):
    label = tf.one_hot(label, depth=43)
    return image, label


In [86]:
def process_image_class(image,label):
    # random rotate 5 degrees
    r = tf.random.uniform(shape=(), minval=-0.175, maxval=0.175, dtype=tf.dtypes.float32)
    image = tfa.image.rotate(image, r)

    # translate image up to 10%
    rx = tf.random.uniform(shape=(), minval=-3, maxval=3, dtype=tf.dtypes.float32) 
    ry = tf.random.uniform(shape=(), minval=-3, maxval=3, dtype=tf.dtypes.float32) 
    image = tfa.image.translate(image, [rx, ry])
    
   
    # change hue, saturation and value
    image = tfa.image.random_hsv_in_yiq(image, 0.2, 0.4, 1.1, 0.4, 1.1)
    image = tf.clip_by_value(image,0,1)

    return image, label

In [92]:

# Load and preprocess the test data
test_ds = tf.keras.preprocessing.image_dataset_from_directory(
    test_data_dir,
    seed=123,
    image_size=(IMAGE_HEIGHT, IMAGE_WIDTH),
    label_mode='categorical',
    batch_size=BATCH_SIZE)

train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    train_data_dir,
    seed=123,
    image_size=(IMAGE_HEIGHT, IMAGE_WIDTH),
    batch_size=BATCH_SIZE)

test_ds = test_ds.map(normalize, num_parallel_calls=AUTOTUNE)
# test_ds = test_ds.map(one_hot_encode, num_parallel_calls=AUTOTUNE)
test_ds = test_ds.cache().prefetch(buffer_size=AUTOTUNE)

train_ds = train_ds.map(normalize,num_parallel_calls=AUTOTUNE)
train_ds = train_ds.map(one_hot_encode,num_parallel_calls=AUTOTUNE)
train_ds = train_ds.cache().shuffle(buffer_size=10200)
train_ds = train_ds.cache().prefetch(buffer_size=AUTOTUNE)

Found 12630 files belonging to 43 classes.
Found 39209 files belonging to 43 classes.


In [93]:
def extract_labels_from_dataset(dataset):
    labels = []
    for _, label in dataset:
        labels.extend(np.argmax(label.numpy(), axis=-1)) # Assumes one-hot encoded labels in the dataset
    return np.array(labels)

train_labels = extract_labels_from_dataset(train_ds)
print(train_labels)

[33  7 21 ...  5 18 38]


## Voting Ensemble

In [42]:
def ensemble_predict(models, dataset):
    # Initialize a list to store the predictions for all models
    predictions = []

    # Iterate over all batches in the dataset
    for images, _ in dataset:
        # Initialize a list to store the predictions for this batch
        batch_predictions = []

        # Let each model make predictions on the batch
        for model in models:
            preds = model.predict(images)
            # Convert softmax outputs to class labels
            preds = np.argmax(preds, axis=-1)
            batch_predictions.append(preds)

        # Transpose the batch_predictions list to get the predictions of all models for each image in the batch
        batch_predictions = np.array(batch_predictions).T

        # For each image in the batch, count the occurrences of each predicted class
        # The class with the most votes becomes the ensemble prediction for that image
        if len(batch_predictions.shape) > 1:  # more than one image in batch
            ensemble_preds = [np.bincount(preds).argmax() for preds in batch_predictions]
        else:  # only one image in batch
            ensemble_preds = [np.bincount(batch_predictions).argmax()]

        predictions.extend(ensemble_preds)

    return np.array(predictions)


In [43]:
# Make predictions with the ensemble
ensemble_predictions = ensemble_predict(models, test_ds)




In [44]:
def evaluate_ensemble(ensemble_predictions, dataset):
    correct_count = 0
    total_count = 0

    # Compare the ensemble predictions with the true labels
    for i, (_, labels) in enumerate(dataset):
        start = i * labels.shape[0]
        end = start + labels.shape[0]

        # Increase total count
        total_count += labels.shape[0]
        # Increase correct count if the ensemble prediction matches the true label
        correct_count += np.sum(ensemble_predictions[start:end] == np.argmax(labels.numpy(), axis=-1))

    # Calculate accuracy
    accuracy = correct_count / total_count

    return accuracy

# Calculate ensemble accuracy
ensemble_accuracy = evaluate_ensemble(ensemble_predictions, test_ds)
print(f'Ensemble accuracy: {ensemble_accuracy * 100:.4f}%')


Ensemble accuracy: 98.6936%


## Stacked ensemble

In [94]:
from sklearn.linear_model import LinearRegression
from tensorflow.keras.utils import to_categorical

def stack_predict(models, dataset):
    predictions = []
    for images, _ in dataset:
        batch_predictions = np.zeros((len(images), 43))  # Initialize with zeros. 43 is the number of classes.
        for model in models:
            preds = model.predict(images)
            batch_predictions += preds  # Sum the predictions
        if len(predictions) == 0:  # if predictions list is empty
            predictions = batch_predictions
        else:
            predictions = np.vstack((predictions, batch_predictions))  # Stack the arrays vertically

    return predictions



In [95]:
# Compute the training set predictions for the stacking model
train_predictions = stack_predict(models, train_ds)



In [96]:

# One-hot encode the training set labels
train_labels_one_hot = to_categorical(train_labels, num_classes=43) 

In [97]:
# Fit the second level model
second_level_model = LinearRegression()
second_level_model.fit(train_predictions, train_labels_one_hot)

In [98]:
# Now for the test predictions, we use the second level model
test_predictions = stack_predict(models, test_ds)
stacked_predictions = second_level_model.predict(test_predictions)


# Convert softmax outputs to class labels
stacked_predictions = np.argmax(stacked_predictions, axis=-1)



In [99]:
from sklearn.metrics import accuracy_score
import numpy as np

def evaluate_accuracy(true_labels, predicted_labels):
    if len(true_labels.shape) > 1:  # one-hot encoded
        true_labels = np.argmax(true_labels, axis=-1)
    accuracy = accuracy_score(true_labels, predicted_labels)
    return accuracy

In [100]:
# Assuming you have a 'test_labels' array with the true labels

test_labels = extract_labels_from_dataset(test_ds)

stacked_accuracy = evaluate_accuracy(test_labels, stacked_predictions)

print(f"Stacked ensemble model accuracy: {stacked_accuracy * 100:.2f}%")


Stacked ensemble model accuracy: 99.04%
