In [None]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'


import tensorflow as tf
from tensorflow import keras
import tarfile 
import matplotlib.pyplot as plt
import pandas as pd
from collections import Counter
from math import log2, log10

# Extract and Load Data

In [None]:
def extract_data(tarfile_path = './dataset.tar', extract_to = '.'):
  """Unpack a tar archive into a directory.

  Args:
    tarfile_path: Path of the archive to unpack.
    extract_to: Directory the members are extracted into. Defaults to the
      current working directory.
  """
  # tarfile.open() detects compression transparently, so .tar, .tar.gz,
  # .tar.bz2 and .tar.xz archives are all accepted; constructing TarFile
  # directly only reads uncompressed archives.
  # NOTE(review): extractall() trusts the member paths stored in the archive;
  # only use this on archives from a trusted source.
  with tarfile.open(tarfile_path) as archive:
    archive.extractall(path = extract_to)

# Unpack the archive at the default path ('./dataset.tar'); members are
# extracted relative to the current working directory.
extract_data()

In [None]:
# Number of images per batch produced by both dataset loaders.
batch_size = 128

# Loader settings shared by the training and validation splits.
_split_options = dict(label_mode='int', batch_size=batch_size, image_size=(224, 224))


def _apply_mobilenet_preprocessing(images, labels):
    """Run MobileNetV2's input preprocessing on the images; labels pass through."""
    return keras.applications.mobilenet_v2.preprocess_input(images), labels


# Training split: shuffling is left at the loader's default.
train_ds = keras.utils.image_dataset_from_directory('./Dataset/train', **_split_options)

# Validation split: shuffling is explicitly disabled.
val_ds = keras.utils.image_dataset_from_directory('./Dataset/val', shuffle=False, **_split_options)

train_ds = train_ds.map(_apply_mobilenet_preprocessing)
val_ds = val_ds.map(_apply_mobilenet_preprocessing)

# Create model

## Get Feature Extractor

In [None]:
# ImageNet-pretrained MobileNetV2 without its classification head
# (include_top=False); pooling='avg' requests average pooling on the output.
mobile_net_feature_extractor = keras.applications.mobilenet_v2.MobileNetV2(
    input_shape=(224, 224, 3),
    alpha=1,
    weights='imagenet',
    include_top=False,
    pooling='avg',
)

# Mark every layer of the backbone as non-trainable.
for layer in mobile_net_feature_extractor.layers:
    layer.trainable = False

In [None]:
# Print the layer-by-layer summary of the feature extractor.
mobile_net_feature_extractor.summary()

## Helper Functions to create models

In [None]:
def create_baseline_model(feature_extractor, classes = 1000, image_size = 224):
    """Build the baseline classifier: feature extractor plus one softmax layer.

    Args:
        feature_extractor: Callable applied to the image input tensor.
        classes: Number of units in the softmax output layer.
        image_size: Height and width of the square, 3-channel input.

    Returns:
        A keras Model mapping the image input to the softmax output.
    """
    # MEC 1281000
    image_input = keras.Input(shape = (image_size, image_size, 3))
    features = feature_extractor(image_input)

    softmax_head = keras.layers.Dense(classes, activation = 'softmax')
    class_probabilities = softmax_head(features)

    return keras.models.Model(inputs = [image_input], outputs = [class_probabilities])

def create_mec_model(feature_extractor, n, classes = 1000, image_size = 224):
    '''
    Build a classifier with an n-unit Dense layer (no activation) between
    the feature extractor and the softmax output layer.

    Args:
        feature_extractor: Callable applied to the image input tensor.
        n: Number of units in the intermediate Dense layer.
        classes: Number of units in the softmax output layer.
        image_size: Height and width of the square, 3-channel input.

    Returns:
        A keras Model mapping the image input to the softmax output.

    Reference values per n (presumably the MEC of the resulting model,
    matching the "MEC" note on the baseline builder -- TODO confirm):
    32: 41024
    42: 53844
    54: 69228
    77: 98714
    500: 641000
    750: 961500
    875: 1121750
    950: 1217900
    '''
    image_input = keras.Input(shape = (image_size, image_size, 3))
    features = feature_extractor(image_input)

    bottleneck = keras.layers.Dense(n)(features)
    class_probabilities = keras.layers.Dense(classes, activation = 'softmax')(bottleneck)

    return keras.models.Model(inputs = [image_input], outputs = [class_probabilities])

def create_mec_model_with_relu(feature_extractor, n, classes = 1000, image_size = 224):
    """Build a classifier with an n-unit ReLU Dense layer before the softmax layer.

    Args:
        feature_extractor: Callable applied to the image input tensor.
        n: Number of units in the intermediate ReLU Dense layer.
        classes: Number of units in the softmax output layer.
        image_size: Height and width of the square, 3-channel input.

    Returns:
        A keras Model mapping the image input to the softmax output.
    """
    image_input = keras.Input(shape = (image_size, image_size, 3))
    features = feature_extractor(image_input)

    hidden = keras.layers.Dense(n, activation = 'relu')(features)
    class_probabilities = keras.layers.Dense(classes, activation = 'softmax')(hidden)

    return keras.models.Model(inputs = [image_input], outputs = [class_probabilities])

# Training

In [None]:
# Experiment name; also used as the output directory and as the file stem
# for the checkpoint and history files written by later cells.
model_name = 'baseline'
# exist_ok=True keeps this cell re-runnable: a plain os.mkdir() raises
# FileExistsError once the directory exists from an earlier run.
os.makedirs(model_name, exist_ok = True)
# Upper bound on training epochs passed to model.fit().
epochs = 1000
# NOTE(review): not read anywhere in the visible notebook; presumably records
# that training monitors the training loss on purpose -- confirm.
overfit = True

In [None]:
# Build the model to train. To train a bottleneck variant instead, swap the
# active line for one of the commented-out constructors below.
model = create_baseline_model(mobile_net_feature_extractor)
# model = create_mec_model(mobile_net_feature_extractor, 32)
# model = create_mec_model_with_relu(mobile_net_feature_extractor, 32)
# Print the architecture and parameter counts of the assembled model.
model.summary()

In [None]:
# Exponential decay evaluated once per epoch (decay_steps=1):
# 0.045 * 0.98 ** epoch.
lr_decay = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.045, decay_steps=1, decay_rate=0.98
)


def epoch_learning_rate(epoch, lr):
    """Return the decayed learning rate for `epoch` as a plain Python float.

    LearningRateScheduler calls its schedule as schedule(epoch, lr) and expects
    a float back. Passing the ExponentialDecay object directly only works via
    the callback's legacy single-argument fallback and hands back a tensor, so
    it is wrapped here. The current rate `lr` is unused because the decay is
    computed from the epoch index alone.
    """
    return float(lr_decay(epoch))


callbacks = [
        # Stop once the training loss has not improved for 10 epochs.
        tf.keras.callbacks.EarlyStopping(monitor='loss', patience= 10, verbose = 1),
        # Keep only the checkpoint with the lowest training loss.
        tf.keras.callbacks.ModelCheckpoint(filepath = f'./{model_name}/{model_name}.h5', monitor = 'loss', save_best_only = True, verbose = 1),
        tf.keras.callbacks.LearningRateScheduler(epoch_learning_rate)
        ]

In [None]:
# RMSprop with momentum; sparse categorical cross-entropy matches the integer
# labels produced by the dataset loaders (label_mode='int').
model.compile(
      optimizer= keras.optimizers.RMSprop(
          learning_rate = 0.045,
          momentum=0.9,
          rho = 0.9
          ),
      loss = keras.losses.SparseCategoricalCrossentropy(), 
      metrics=['accuracy']
      )

# train_ds is already batched by the dataset loader, so batch_size must not be
# passed to fit(): Keras documents that argument as unsupported for dataset
# inputs.
history = model.fit(
    train_ds,
    epochs = epochs,
    callbacks = callbacks,
    validation_data = val_ds
    )

## Train plots

In [None]:
# Persist the per-epoch metrics next to the checkpoint, then plot from the
# saved file rather than from the in-memory History object.
history_csv_path = f'./{model_name}/{model_name}.csv'
pd.DataFrame(history.history).to_csv(history_csv_path, index = False)

df = pd.read_csv(history_csv_path)

# One figure for accuracy, one for loss; each compares validation vs. training.
df.loc[:, ['val_accuracy', 'accuracy']].plot()
df.loc[:, ['val_loss', 'loss']].plot()


# Test result

In [None]:
batch_size = 128
# The f-prefix is required: without it the braces are kept literally and the
# path points at a directory named '{model_name}' that does not exist.
model_path = f'./{model_name}/{model_name}.h5'
# Capacity value used as the denominator of the generalization ratio.
# NOTE(review): the baseline builder's comment gives MEC 1281000; 1.28e6 looks
# like a rounded version of it -- confirm which value is intended.
MEC = 1.28e6

# Both splits are reloaded with shuffle=False so that predictions and labels,
# which are collected in two separate passes over the dataset, stay aligned.
train_ds = keras.utils.image_dataset_from_directory(
    './Dataset/train',
    label_mode='int',
    batch_size=batch_size,
    image_size=(224, 224),
    shuffle = False
)

val_ds = keras.utils.image_dataset_from_directory(
    './Dataset/val',
    label_mode='int',
    batch_size=batch_size,
    image_size=(224, 224),
    shuffle = False
)

# Apply the same MobileNetV2 input preprocessing that was used for training.
train_ds = train_ds.map(lambda x, y: (keras.applications.mobilenet_v2.preprocess_input(x), y))
val_ds = val_ds.map(lambda x, y: (keras.applications.mobilenet_v2.preprocess_input(x), y))

In [None]:
import numpy as np
from sklearn.metrics import top_k_accuracy_score

def get_predictions_and_labels(model, data):
    """Collect the model's predictions and the ground-truth labels for a dataset.

    Args:
        model: Object exposing predict(data).
        data: Dataset exposing as_numpy_iterator(), yielding (inputs, labels)
            batches.

    Returns:
        A (predictions, labels) tuple: whatever model.predict(data) returns,
        and a flat list of the labels of every batch in iteration order.
    """
    predictions = model.predict(data)

    # Second pass over the dataset: flatten the per-batch label arrays.
    labels = [
        label
        for _, label_batch in data.as_numpy_iterator()
        for label in label_batch
    ]

    return predictions, labels

def generalization_ratio(labels, predictions, MEC):
    """Compute the generalization ratio G and the resilience R of a classifier.

    Every correctly classified sample contributes -log2(p_class) bits, where
    p_class is the empirical frequency of its class in `labels`; G is the sum
    of those contributions divided by MEC, and R = 20 * log10(1 / G).

    Args:
        labels: Sequence of ground-truth class ids.
        predictions: Sequence of per-class score vectors, aligned with
            `labels`; the predicted class is the argmax of each vector.
        MEC: Capacity value used as the denominator of G (must be non-zero).

    Returns:
        A (G, R) tuple. When no sample is classified correctly G is 0 and R is
        reported as math.inf (the limit of the formula) instead of raising
        ZeroDivisionError.
    """
    total = len(labels)
    class_count = Counter(labels)

    # Count correct predictions per class without assuming the class ids lie
    # in range(1000); classes with no correct prediction simply count as 0.
    correct_prediction_count = Counter(
        gt for p, gt in zip(predictions, labels) if np.argmax(p) == gt
    )

    G = sum(
        correct_prediction_count[key] * -log2(class_count[key] / total)
        for key in class_count
    ) / MEC

    # 1 / G is undefined for G == 0; use the limiting value instead.
    R = 20 * log10(1 / G) if G > 0 else float('inf')
    return G, R


def get_performance_description(model, data, MEC):
    """Print top-1/top-5 accuracy, generalization ratio and resilience.

    Args:
        model: Model handed to get_predictions_and_labels (needs predict()).
        data: Dataset the model is evaluated on.
        MEC: Capacity value forwarded to generalization_ratio.
    """
    predictions, labels = get_predictions_and_labels(model, data)

    # Report the accuracies first, in the order top-1 then top-5.
    for heading, k in (('Top 1 accuracy:', 1), ('Top 5 accuracy:', 5)):
        print(heading, top_k_accuracy_score(labels, predictions, k = k))

    G, R = generalization_ratio(labels, predictions, MEC)
    print(f'Generalization ratio: {G}')
    print(f'Resilience: {R} dB')

In [None]:
# Load the checkpoint saved during training from model_path.
model = keras.models.load_model(model_path)
# Mark the loaded model as non-trainable; it is only used for evaluation below.
model.trainable = False

In [None]:
# Evaluate on the training split. The loaded model object is passed, not the
# checkpoint path: the helper calls model.predict(), which a str lacks.
get_performance_description(model, train_ds, MEC)

In [None]:
# Evaluate on the validation split. The loaded model object is passed, not the
# checkpoint path: the helper calls model.predict(), which a str lacks.
get_performance_description(model, val_ds, MEC)