In [None]:
import os
import time
import numpy as np
import tensorflow as tf
from tensorflow.keras import applications, datasets, utils, Model
from tensorflow.keras.layers import Flatten, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback, ModelCheckpoint, LearningRateScheduler
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
class TimeHistory(tf.keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.times = []

    def on_epoch_begin(self, batch, logs={}):
        self.epoch_time_start = time.time()

    def on_epoch_end(self, batch, logs={}):
        self.times.append(time.time() - self.epoch_time_start)

In [None]:
def load_cars196_dataset(path_to_dataset):
    annotations = scipy.io.loadmat(os.path.join(path_to_dataset, 'cars_annos.mat'))

    annotations = annotations["annotations"][0]

    train_indices = np.where(annotations["test"] == 0)[1]
    test_indices = np.where(annotations["test"] == 1)[1]

    def load_data(indices):
        num_samples = len(indices)
        car_annotations = annotations[indices]
        x = np.empty((num_samples, ), dtype=object)
        y = np.empty((num_samples, ), dtype=int)

        for i, annotation in enumerate(car_annotations):
            img_path = os.path.join(path_to_dataset, annotation[0][0])
            img = Image.open(img_path)
            x[i] = np.array(img)
            y[i] = int(annotation[5]) - 1

        return x, y

    x_train, y_train = load_data(train_indices)
    x_test, y_test = load_data(test_indices)

    return (x_train, y_train), (x_test, y_test)


In [None]:
def train_model(model_name, dataset_name, epochs, device):
    if device not in ['CPU', 'GPU', 'TPU']:
        print('Unknown device')
        return
    if device == 'CPU':
        tf.device('/CPU:0')
    elif device == 'GPU':
        tf.device('/GPU:0')
    elif device == 'TPU':
        try:
            tpu_address = 'grpc://' + os.environ['COLAB_TPU_ADDR']
            resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=tpu_address)
            tf.config.experimental_connect_to_cluster(resolver)
            tf.tpu.experimental.initialize_tpu_system(resolver)
            strategy = tf.distribute.TPUStrategy(resolver)
        except KeyError:
            print('TPU not found')
            return
        else:
            tf.device('/TPU:0')


    if model_name == 'resnet':
        base_model = applications.ResNet50(weights='imagenet', include_top=False, input_shape=(32, 32, 3))
    elif model_name == 'vgg':
        base_model = applications.VGG16(weights='imagenet', include_top=False, input_shape=(32, 32, 3))
    elif model_name == 'mobilenet':
        base_model = applications.MobileNet(weights='imagenet', include_top=False, input_shape=(32, 32, 3))
    else:
        print('Unknown model')
        return


    path_to_cars196 = '/content/gdrive/MyDrive/cars196'
    if dataset_name == 'cifar10':
        (x, y), (x_test, y_test) = datasets.cifar10.load_data()
    elif dataset_name == 'cifar100':
        (x, y), (x_test, y_test) = datasets.cifar100.load_data()
    elif dataset_name == 'cars193':
        (x, y), (x_test, y_test) = load_cars196_dataset(path_to_cars196)
    else:
        print('Unknown dataset')
        return

    x_train, x_val, y_train, y_val = train_test_split(x, y, test_size=0.2)

    x_train = x_train.astype('float32') / 255.0
    x_val = x_val.astype('float32') / 255.0
    x_test = x_test.astype('float32') / 255.0
    y_train = utils.to_categorical(y_train)
    y_val = utils.to_categorical(y_val)
    y_test = utils.to_categorical(y_test)

    x = base_model.output
    x = Flatten()(x)
    output = Dense(y_train.shape[1], activation='softmax')(x)
    model = Model(base_model.input, output)

    model.compile(loss='categorical_crossentropy', optimizer=Adam(), metrics=['accuracy'])
    history = model.fit(x_train, y_train, batch_size=32, epochs=epochs, validation_data=(x_val, y_val))

    time_callback = TimeHistory()
    history = model.fit(x_train, y_train, batch_size=32, epochs=epochs, validation_data=(x_val, y_val), callbacks=[time_callback])

    return history, time_callback, model, x_test, y_test


In [None]:
def evaluate_and_display_results(history, time_callback, model, x_test, y_test, model_name, dataset_name, epoch ):
    test_loss, test_accuracy = model.evaluate(x_test, y_test, verbose=0)

    y_test_pred = model.predict(x_test)
    cm = confusion_matrix(np.argmax(y_test, axis=1), np.argmax(y_test_pred, axis=1))

    total_time = sum(time_callback.times)

    with open(f'results{model_name}_{dataset_name}_{epoch}.txt', 'w') as f:
        f.write('Rezultate antrenament:\n')
        f.write(f'Test Loss: {test_loss}\n')
        f.write(f'Test Accuracy: {test_accuracy}\n')
        f.write('Matricea de confuzie:\n')
        f.write(np.array2string(cm, separator=', '))
        f.write('\nTimp total: {}\n'.format(total_time))
        f.write('Timp/epocă:\n')
        for i, time in enumerate(time_callback.times):
            f.write('Epoca {}: {}\n'.format(i+1, time))

    print(f'test loss: {test_loss}\ntest_acc: {test_accuracy}\nconf matr:\n{cm}\ntimp total:{total_time}')
    for i, time in enumerate(time_callback.times):
      print('Epoca {}: {}\n'.format(i+1, time))

    plt.figure(figsize=(12, 4))

    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Loss evolution')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history.history['accuracy'], label='Train Accuracy')
    plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
    plt.title('Accuracy evolution')
    plt.legend()

    plt.tight_layout()
    #plt.savefig('evolution.png')
    plt.show()

    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()
    #plt.savefig('confusion_matrix.png')


In [None]:

def main():
    devices = ['GPU', 'CPU']
    dataset_names = ['cifar10', 'cifar100']
    model_names = ['resnet', 'vgg', 'mobilenet']
    epochs = [10, 20, 50, 100]

    for epoch in epochs:
        for dataset_name in dataset_names:
            for model_name in model_names:
                for device in devices:
                    print(f'{device}_{dataset_name}_{model_name}_{epoch}\n')
                    history, time_callback, model, x_test, y_test = train_model(model_name, dataset_name, epoch, device)
                    evaluate_and_display_results(history, time_callback, model, x_test, y_test, model_name, dataset_name, epoch)


In [None]:
if __name__=='__main__':
  main()