In [None]:
!pip install tensorflow_addons

Collecting tensorflow_addons
  Downloading tensorflow_addons-0.21.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (612 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/612.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m122.9/612.1 kB[0m [31m4.0 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━[0m [32m471.0/612.1 kB[0m [31m7.2 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m612.1/612.1 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
Collecting typeguard<3.0.0,>=2.7 (from tensorflow_addons)
  Downloading typeguard-2.13.3-py3-none-any.whl (17 kB)
Installing collected packages: typeguard, tensorflow_addons
Successfully installed tensorflow_addons-0.21.0 typeguard-2.13.3


In [1]:
from typing import Tuple, List, Dict
from google.colab import drive
from os import listdir
from os.path import isfile
from copy import deepcopy
from math import ceil
import numpy as np
import tensorflow as tf
# import tensorflow_addons as tfa
from tensorflow import device

In [3]:
# Mount Google Drive under /content/drive so the data folders used below are reachable.
# NOTE(review): `drive` is already imported in the imports cell; this re-import is redundant.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [16]:
# Root of the mounted Google Drive; every data/output path in this notebook is built from it.
root_folder = '/content/drive/MyDrive/'
# Folder that holds the .tfrecord shards to load. The commented-out lines below are
# alternative folders that can be swapped in.
tfr_prefix_path = root_folder + 'Objective 3/TFR_all/'
# tfr_prefix_path = root_folder + 'Objective 3/TFR_CV/'
# tfr_prefix_path = root_folder + 'Objective 3/TFR_blind/'

# Fix the global random seed through the Keras utility for repeatable runs.
tf.keras.utils.set_random_seed(0)

In [5]:
def check_gpu() -> None:
    """Verify that TensorFlow reports a GPU as '/device:GPU:0'.

    Prints the device name when the GPU is found.

    Raises:
        SystemError: if the reported GPU device name is anything else (including empty).
    """
    expected_name = '/device:GPU:0'
    found_name = tf.test.gpu_device_name()
    if found_name == expected_name:
        print(f'Found GPU at: {found_name}')
        return
    raise SystemError('GPU device not found')

In [6]:
# Schema used to parse every serialized record:
#   'volume' - float32 tensor of fixed shape (128, 128, 3)
#   'y'      - int64 tensor of fixed shape (1,), used as the class label
feature_descriptor_classification = {
    'volume': tf.io.FixedLenFeature((128, 128, 3), tf.float32),
    'y': tf.io.FixedLenFeature((1,), tf.int64),
}


def parse_example_classification(inp):
    """Parse one serialized record into a dict keyed by feature name.

    Args:
        inp: a single serialized record, parsed with `feature_descriptor_classification`.

    Returns:
        The dict produced by `tf.io.parse_single_example` (keys 'volume' and 'y').
    """
    return tf.io.parse_single_example(inp, feature_descriptor_classification)


def parse_matrices_classification(inp):
    """Parse one serialized record into an `(input, label)` pair.

    Args:
        inp: a single serialized record, parsed with `feature_descriptor_classification`.

    Returns:
        Tuple `(volume, y)` taken from the parsed feature dict.
    """
    features = tf.io.parse_single_example(inp, feature_descriptor_classification)
    volume, label = features['volume'], features['y']
    return volume, label


def get_dataset(filenames: List[str], batch_size: int, shuffle: bool = True, buffer_size: int = None,
                deterministic: bool = False, parser=parse_matrices_classification) -> tf.data.TFRecordDataset:
    """Build a parsed, batched and prefetched dataset from TFRecord files.

    Args:
        filenames: paths of the TFRecord files to read.
        batch_size: number of examples per batch.
        shuffle: when True, shuffle the examples (seed 0, reshuffled on every iteration).
        buffer_size: shuffle buffer size in examples; defaults to `batch_size * 10`.
        deterministic: forwarded to `Dataset.map`; controls whether the parallel map
            must preserve the input order.
        parser: function mapping one serialized record to the desired structure.

    Returns:
        The resulting `tf.data` dataset of batches.
    """
    dataset = tf.data.TFRecordDataset(filenames).map(
        parser, num_parallel_calls=tf.data.AUTOTUNE, deterministic=deterministic
    )
    if shuffle:
        if buffer_size is None:
            buffer_size = batch_size * 10
        # Shuffle BEFORE batching: shuffling an already batched dataset only permutes whole
        # batches (their content stays fixed) and counts the buffer in batches, not examples.
        dataset = dataset.shuffle(buffer_size=buffer_size, seed=0, reshuffle_each_iteration=True)
    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)


def verbose_dataset(dataset: tf.data.TFRecordDataset, x_label, y_label) -> None:
    """Print a one-line summary for every element of `dataset`.

    Each line shows the running index, the shape of the input, the shape of the label and
    the label value at position [0, 0]. A blank line is printed at the end.

    Args:
        dataset: dataset to iterate completely (`take(-1)` keeps every element).
        x_label: key/index selecting the input inside each element.
        y_label: key/index selecting the label inside each element.
    """
    elements = dataset.take(-1).as_numpy_iterator()
    for index, element in enumerate(elements):
        inputs, labels = element[x_label], element[y_label]
        print(index, '->', inputs.shape, labels.shape, labels[0, 0])
    print()

In [7]:
class Model:
    """Keras classifier / feature extractor built on a pre-trained image backbone.

    `model_id` selects the backbone by prefix (ResNet, Xception, VGG, Inception):
      * an id ending in 'base' (e.g. 'Xception_base') builds only the frozen backbone with
        average pooling, i.e. a feature extractor;
      * any other supported id adds a dense head with one sigmoid output on top of the
        frozen backbone.

    On construction the dataset is split per class into train / validation / test subsets
    and the backbone-specific input preprocessing is mapped onto each subset.
    """

    # Number of positive samples assumed by the split and by the class weights
    # (hard-coded for the dataset this notebook works on).
    POSITIVE_SIZE = 231

    def __init__(self, model_id: str, dataset: tf.data.TFRecordDataset, dataset_size: int, batch_size: int,
                 epochs: int = 100, shuffle: bool = False) -> None:
        """Store the configuration and split `dataset` into train/validation/test subsets.

        Args:
            model_id: backbone identifier, see the class docstring.
            dataset: dataset of `(input, label)` batches.
            dataset_size: number of elements in `dataset`.
            batch_size: batch size the dataset was built with.
            epochs: number of training epochs used by `train_model`.
            shuffle: whether to shuffle (with a fixed seed) while splitting.
        """
        self.model_id = model_id
        self.dataset_size = dataset_size
        self.batch_size = batch_size
        self.epochs = epochs
        self.estimator = None
        self.params = None
        self.width = 128
        self.height = 128
        self.trained = False
        self.seed = 0
        self.train_dataset, self.val_dataset, self.test_dataset = self._split_binary_dataset(dataset, shuffle=shuffle)
        self.train_size = int(0.70 * self.dataset_size)
        self.val_size = int(0.15 * self.dataset_size)
        self.test_size = self.dataset_size - (self.train_size + self.val_size)

    def is_trained(self) -> bool:
        """Return True once an estimator is available for evaluation/prediction."""
        return self.trained

    def set_params(self, params: Dict[str, List]):
        """Set the hyper-parameters; keys read by this class are 'learning_rate' and 'layer_sizes'."""
        self.params = params

    def _get_initializer(self) -> tf.keras.initializers.Initializer:
        """Return a Glorot-uniform initializer seeded with `self.seed`."""
        return tf.keras.initializers.GlorotUniform(seed=self.seed)

    def _preprocess_input(self, ds: tf.data.TFRecordDataset) -> tf.data.Dataset:
        """Map the backbone-specific `preprocess_input` onto the inputs of `ds`.

        Raises:
            ValueError: if `self.model_id` does not start with a supported backbone name.
        """
        if self.model_id.startswith('ResNet'):
            return ds.map(lambda x, y: (tf.keras.applications.resnet.preprocess_input(x), y))
        elif self.model_id.startswith('Xception'):
            return ds.map(lambda x, y: (tf.keras.applications.xception.preprocess_input(x), y))
        elif self.model_id.startswith('VGG'):
            return ds.map(lambda x, y: (tf.keras.applications.vgg16.preprocess_input(x), y))
        elif self.model_id.startswith('Inception'):
            return ds.map(lambda x, y: (tf.keras.applications.inception_v3.preprocess_input(x), y))
        else:
            raise ValueError('Error !!! Invalid \'model_id\' parameter, must be {ResNet, Xception, VGG, Inception} ...')

    def _split_binary_dataset(self, dataset: tf.data.TFRecordDataset, shuffle: bool,
                              train_ratio: float = 0.70, val_ratio: float = 0.15, verbose: bool = False) \
            -> Tuple[tf.data.TFRecordDataset, tf.data.TFRecordDataset, tf.data.TFRecordDataset]:
        """Split `dataset` per class into preprocessed train/validation/test datasets.

        Elements are assigned to a class from the label at position [0, 0] only, so each
        element is classified by the first label it contains.
        NOTE(review): this looks like it assumes a batch size of 1 - confirm.

        Args:
            dataset: dataset of `(input, label)` batches.
            shuffle: shuffle each class (and the final training set) with `self.seed`.
            train_ratio: fraction of each class that goes to the training set.
            val_ratio: fraction of each class that goes to the validation set.
            verbose: unused; kept for interface compatibility.

        Returns:
            `(train, validation, test)` datasets with input preprocessing applied.
        """
        positive_size = self.POSITIVE_SIZE
        negative_size = self.dataset_size - positive_size
        pos_dataset = dataset.filter(lambda x, y: y[0, 0] == 1)
        neg_dataset = dataset.filter(lambda x, y: y[0, 0] == 0)
        if shuffle:
            # reshuffle_each_iteration=False keeps the take/skip partitions disjoint.
            pos_dataset = pos_dataset.shuffle(buffer_size=250, seed=self.seed, reshuffle_each_iteration=False)
            neg_dataset = neg_dataset.shuffle(buffer_size=600, seed=self.seed, reshuffle_each_iteration=False)

        pos_train, pos_val = int(train_ratio * positive_size), int(val_ratio * positive_size)
        neg_train, neg_val = int(train_ratio * negative_size), int(val_ratio * negative_size)

        train_dataset = pos_dataset.take(pos_train).concatenate(neg_dataset.take(neg_train))
        val_dataset = pos_dataset.skip(pos_train).take(pos_val).concatenate(
            neg_dataset.skip(neg_train).take(neg_val)
        )
        # Whatever is left after the train and validation parts forms the test set.
        test_dataset = pos_dataset.skip(pos_train).skip(pos_val).concatenate(
            neg_dataset.skip(neg_train).skip(neg_val)
        )
        if shuffle:
            train_dataset = train_dataset.shuffle(buffer_size=600, seed=self.seed, reshuffle_each_iteration=False)
        return (
            self._preprocess_input(train_dataset),
            self._preprocess_input(val_dataset),
            self._preprocess_input(test_dataset),
        )

    def _create_base_model(self, model_id) -> tf.keras.Model:
        """Create the frozen backbone for `model_id` (no top, 128x128x3 input, average pooling).

        Raises:
            ValueError: if `model_id` is not one of ResNet, Xception, VGG, Inception.
        """
        if model_id == 'ResNet':
            base_model = tf.keras.applications.ResNet50(include_top=False, input_shape=(128, 128, 3), pooling='avg')
        elif model_id == 'Xception':
            base_model = tf.keras.applications.Xception(include_top=False, input_shape=(128, 128, 3), pooling='avg')
        elif model_id == 'VGG':
            base_model = tf.keras.applications.VGG16(include_top=False, input_shape=(128, 128, 3), pooling='avg')
        elif model_id == 'Inception':
            base_model = tf.keras.applications.InceptionV3(include_top=False, input_shape=(128, 128, 3), pooling='avg')
        else:
            raise ValueError('Error !!! Invalid \'model_id\' parameter, must be {ResNet, Xception, VGG, Inception} ...')
        base_model.trainable = False
        return base_model

    def _create_tl_model(self) -> tf.keras.Model:
        """Build the transfer-learning model: frozen backbone + dense head + sigmoid output.

        The head has one ReLU Dense layer (followed by Dropout 0.3) per entry of
        `self.params['layer_sizes'][0]`.
        """
        base_model = self._create_base_model(self.model_id)
        # NOTE(review): this Normalization layer is never adapted in this class - confirm
        # that using it with its initial statistics is intended.
        x = tf.keras.layers.Normalization()(base_model.output)
        for units in self.params['layer_sizes'][0]:
            x = tf.keras.layers.Dense(
                units=units,
                activation='relu',
                kernel_initializer=self._get_initializer()
            )(x)
            x = tf.keras.layers.Dropout(0.3, seed=self.seed)(x)
        output_layer = tf.keras.layers.Dense(
            units=1, activation='sigmoid', kernel_initializer=self._get_initializer()
        )(x)
        return tf.keras.Model(base_model.input, output_layer, name=self.model_id)

    def _create_model(self) -> tf.keras.Model:
        """Create the (uncompiled) model selected by `self.model_id`.

        Raises:
            NotImplementedError: for model_id 'custom' when no `_create_custom_model`
                method is available (this class does not define one).
        """
        if self.model_id == 'custom':
            custom_builder = getattr(self, '_create_custom_model', None)
            if custom_builder is None:
                raise NotImplementedError("Error !!! 'custom' model requires a '_create_custom_model' method ...")
            return custom_builder()
        elif self.model_id[-4:] == 'base':
            # '<Backbone>_base' -> strip the 5-character '_base' suffix to get the backbone name.
            m = self._create_base_model(self.model_id[:-5])
            m._name = self.model_id
            return m
        else:
            return self._create_tl_model()

    def _compile_model(self, m: tf.keras.Model) -> tf.keras.Model:
        """Compile `m` with binary cross-entropy, Adam and the classification metrics."""
        initial_learning_rate = self.params['learning_rate'][0]  # 0.01
        m.compile(
            loss='binary_crossentropy',
            optimizer=tf.keras.optimizers.Adam(learning_rate=initial_learning_rate),
            metrics=[tf.keras.metrics.BinaryAccuracy(name='Acc'), tf.keras.metrics.Recall(name='Sens'),
                     #  tfa.metrics.F1Score(name='F1S'), tfa.metrics.MatthewsCorrelationCoefficient(name='MCC'),
                     tf.keras.metrics.TruePositives(name='TP'), tf.keras.metrics.TrueNegatives(name='TN'),
                     tf.keras.metrics.FalsePositives(name='FP'), tf.keras.metrics.FalseNegatives(name='FN'),
                     tf.keras.metrics.AUC()]
        )
        return m

    def _build_model(self) -> tf.keras.Model:
        """Create and compile the model; `set_params` must have been called first."""
        return self._compile_model(self._create_model())

    def train_model(self, verbose=False) -> None:
        """Build a fresh model, fit it on the training set and store it as `self.estimator`.

        Args:
            verbose: when True, print the model summary before training.
        """
        self.trained = False
        # early_stopping_cb = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=5)
        m = self._build_model()
        if verbose:
            m.summary()
        # Weight each class by the frequency of the OTHER class so that the rarer class
        # contributes more to the loss. The previous mapping weighted each class by its own
        # frequency, which down-weighted the minority class.
        # NOTE(review): confirm the intended weighting scheme (the original line carried a TODO).
        positive_fraction = self.POSITIVE_SIZE / self.dataset_size
        negative_fraction = (self.dataset_size - self.POSITIVE_SIZE) / self.dataset_size
        m.fit(
            x=self.train_dataset,
            validation_data=self.val_dataset,
            verbose=1,
            class_weight={1: negative_fraction, 0: positive_fraction},
            epochs=self.epochs,
            # callbacks=[early_stopping_cb],
            workers=20,
            use_multiprocessing=False
        )
        # `m` is local to this method, so it can be stored directly; a deep copy would only
        # duplicate the whole model.
        self.estimator = m
        self.trained = True

    def _print_evaluation_header(self) -> None:
        """Print the tab-separated column names used by `evaluate_model`."""
        print('\t'.join(('layer_size', 'learning_rate', 'Accuracy', 'Sensitivity', 'TP', 'TN', 'FP', 'FN')))

    def _print_blind_evaluation_header(self) -> None:
        """Print the tab-separated column names used by `evaluate_model_blind_data`."""
        print(
            '\t'.join(
                (
                    'layer_size', 'learning_rate', 'Accuracy_blind', 'Sensitivity_blind',
                    'TP_blind', 'TN_blind', 'FP_blind', 'FN_blind'
                )
            )
        )

    def evaluate_model(self, verbose: int = 0) -> List[float]:
        """Evaluate the estimator on the train, validation and test sets and print one row.

        NOTE(review): the printed header names fewer columns than the row contains (the row
        concatenates all compiled metrics for the three splits).

        Returns:
            The train, validation and test results concatenated into one list.

        Raises:
            ValueError: if the model has not been trained.
        """
        if not self.trained:
            raise ValueError("Error !!! Model not trained ...")
        eval_train = self.estimator.evaluate(self.train_dataset, verbose=verbose)
        eval_val = self.estimator.evaluate(self.val_dataset, verbose=verbose)
        eval_test = self.estimator.evaluate(self.test_dataset, verbose=verbose)
        results = eval_train + eval_val + eval_test
        self._print_evaluation_header()
        print(
            str(self.params['layer_sizes'][0]) + '\t' +
            str(self.params['learning_rate'][0]) + '\t' +
            '\t'.join(map(str, results))
        )
        return results

    def evaluate_model_blind_data(self, b_data: tf.data.TFRecordDataset, verbose: int = 0) -> List[float]:
        """Evaluate the estimator on an external dataset and print one result row.

        Raises:
            ValueError: if the model has not been trained.
        """
        if not self.trained:
            raise ValueError("Error !!! Model not trained ...")
        result = self.estimator.evaluate(b_data, verbose=verbose)
        self._print_blind_evaluation_header()
        print(
            str(self.params['layer_sizes'][0]) + '\t' +
            str(self.params['learning_rate'][0]) + '\t' +
            '\t'.join(map(str, result))
        )
        return result

    def predict_model(self, verbose: int = 0) -> Tuple[np.array, np.array, np.array]:
        """Return the estimator's predictions for the train, validation and test sets.

        Raises:
            ValueError: if the model has not been trained.
        """
        if not self.trained:
            raise ValueError("Error !!! Model not trained ...")
        pred_train = self.estimator.predict(self.train_dataset, verbose=verbose)
        pred_val = self.estimator.predict(self.val_dataset, verbose=verbose)
        pred_test = self.estimator.predict(self.test_dataset, verbose=verbose)
        return pred_train, pred_val, pred_test

    def predict_model_blind_data(self, b_data: tf.data.TFRecordDataset, verbose: int = 0) -> np.array:
        """Return the estimator's predictions for an external dataset.

        Raises:
            ValueError: if the model has not been trained.
        """
        if not self.trained:
            raise ValueError("Error !!! Model not trained ...")
        return self.estimator.predict(b_data, verbose=verbose)

## **Scripts**

In [8]:
# Fail fast (SystemError) if this runtime has no GPU attached.
check_gpu()

Found GPU at: /device:GPU:0


### **Dataset**

In [17]:
# Sorted full paths of every regular file in the TFRecord folder named '*.tfrecord'.
tfr_file_names = sorted(
    tfr_prefix_path + name
    for name in listdir(tfr_prefix_path)
    if name.endswith('.tfrecord') and isfile(tfr_prefix_path + name)
)
print(tfr_file_names)

['/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-10_s0_0.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-10_s0_1.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-10_s0_2.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-10_s0_3.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-10_s0_4.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-10_s0_5.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-10_s0_6.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-10_s0_7.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-20_s0_0.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-20_s0_1.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-20_s0_2.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-20_s0_3.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-20_s0_4.tfrecord', '/content/drive/MyDrive/Objective 3/TFR_all/tfr_r-20_s0_5.tfrecord', '/content/drive/MyDrive/Objective

In [19]:
# Batch size of 1: Model._split_binary_dataset classifies each element by its label at [0, 0].
batch_size = 1
# with device('/GPU:0'):
# dataset = get_dataset(tfr_file_names, batch_size=batch_size, shuffle=False, deterministic=True, parser=parse_example_classification)
# verbose_dataset(dataset, 'volume', 'y')
# Unshuffled, order-preserving dataset of (volume, y) batches read from all TFRecord files.
dataset = get_dataset(tfr_file_names, batch_size=batch_size, shuffle=False, deterministic=True, parser=parse_matrices_classification)
# verbose_dataset(dataset, 0, 1)

### **Feature extraction**

In [28]:
# 'Xception_base' makes Model build only the frozen Xception backbone (no dense head).
# NOTE(review): 819 is presumably the total number of samples, converted here to a number
# of batches - confirm and consider moving it to a named constant.
model = Model('Xception_base', dataset, batch_size=batch_size, dataset_size=ceil(819/batch_size), epochs=2, shuffle=False)
# model = Model('Inception_base', dataset, batch_size=batch_size, dataset_size=ceil(819/batch_size), epochs=2, shuffle=False)
# print('Training dataset')
# verbose_dataset(model.train_dataset, 0, 1)
# print('Validation dataset')
# verbose_dataset(model.val_dataset, 0, 1)
# print('Testing dataset')
# verbose_dataset(model.test_dataset, 0, 1)

In [29]:
# 'learning_rate' is read when the model is compiled; 'layer_sizes' is only read when a
# dense head is built and in the evaluation printouts.
model.set_params({'learning_rate': [0.01], 'layer_sizes': [(64,)]})
# Build the pre-trained backbone directly, without calling train_model(), and flag it as
# trained so the predict_*/evaluate_* guards accept it.
model.estimator = model._build_model()
model.trained = True
model.estimator.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/xception/xception_weights_tf_dim_ordering_tf_kernels_notop.h5
Model: "Xception_base"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 128, 128, 3)]        0         []                            
                                                                                                  
 block1_conv1 (Conv2D)       (None, 63, 63, 32)           864       ['input_2[0][0]']             
                                                                                                  
 block1_conv1_bn (BatchNorm  (None, 63, 63, 32)           128       ['block1_conv1[0][0]']        
 alization)                                                                                       
                                        

In [None]:
# Rebuild the full, unsplit dataset in file order and apply the Xception input preprocessing.
ds = get_dataset(tfr_file_names, batch_size=batch_size, shuffle=False, deterministic=True, parser=parse_matrices_classification)
ds = ds.map(lambda x, y: (tf.keras.applications.xception.preprocess_input(x), y))
# ds = ds.map(lambda x, y: (tf.keras.applications.inception_v3.preprocess_input(x), y))
# Run the backbone over every sample; with a '*_base' model the predictions are the
# average-pooled backbone outputs (the bottleneck vectors saved below).
y_pred = model.predict_model_blind_data(ds, verbose=1)
print(y_pred.shape)
# # print(y_pred[0, :])
# # print(y_pred[1, :])

### **Save extracted features (bottleneck vectors)**

In [None]:
# Write one tab-separated row per sample. The context manager guarantees the file is
# closed (and its buffer flushed) even if a write raises part-way through.
with open(root_folder + 'Colab Notebooks/objective-3/foo.tsv', 'w') as f:
    for i in range(y_pred.shape[0]):
        f.write('\t'.join(map(str, list(y_pred[i, :]))) + '\n')

In [None]:
# Flush pending writes and unmount Google Drive so the saved features file is persisted.
drive.flush_and_unmount()