In [4]:
import tensorflow as tf
from tensorflow.keras import layers

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder, MinMaxScaler

import pandas as pd
import numpy as np

In [5]:
label_encoder = LabelEncoder()
min_max_scaler = MinMaxScaler()

In [6]:
X = pd.read_csv(r'../data/X_expr.csv').drop(['Unnamed: 0', 'seqLibID'], axis=1).values
y = pd.read_csv(r'../data/y_cog.csv').drop(['Unnamed: 0', 'seqLibID'], axis=1).values
y = label_encoder.fit_transform(y.ravel())  

In [7]:
label_encoder.classes_

array(['AD', 'MildCognitiveImpairment', 'NoCognitiveImpairment'],
      dtype=object)

In [8]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42, stratify=y)

X_train = min_max_scaler.fit_transform(X_train, y_train)
X_test = min_max_scaler.transform(X_test)

train_ds = tf.data.Dataset.from_tensor_slices(
    (X_train, y_train)
).shuffle(10000).batch(100)

test_ds = tf.data.Dataset.from_tensor_slices((X_test, y_test)).batch(32)

#### Encoder and decoder definitions

In [9]:
def get_cnn_encoder(input_shape, filters, kernel_sizes, bottleneck_size,
                    dnn_layer_sizes=None, strides=None, paddings="same", activation='relu'):
    if strides is None:
        strides = [(1, 1) for _ in range(len(filters))]

    input_layer = layers.Input(shape=input_shape)

    x = layers.Conv2D(filters[0], kernel_size=kernel_sizes[0], strides=strides[0], padding=paddings)(input_layer)

    for i in range(1, len(filters)):
        x = layers.Conv2D(filters[i], kernel_size=kernel_sizes[i], strides=strides[i], padding=paddings)(x)
        x = layers.BatchNormalization()(x)
        x = layers.LeakyReLU()(x)

    x = layers.Flatten(name='flatten')(x)

    if dnn_layer_sizes is not None:
        for n_nodes in dnn_layer_sizes:
            x = layers.Dense(n_nodes, activation=activation)(x)

    x = layers.Dense(bottleneck_size, activation=activation)(x)

    model = tf.keras.Model(inputs=input_layer, outputs=x)

    flatten_idx = 1
    for i, layer in enumerate(model.layers):
        if layer.name == 'flatten':
            flatten_idx = i
    pre_flatten_idx = flatten_idx - 1

    pre_flatten_dim = model.layers[pre_flatten_idx].output_shape
    n_flatten_nodes = model.layers[flatten_idx].output_shape

    return model, pre_flatten_dim, n_flatten_nodes


def get_cnn_decoder(bottleneck_size, pre_flatten_dim, n_flatten_nodes, filters, kernel_sizes, 
                    dnn_layer_sizes=None, strides=None, paddings="same", activation='relu'):
    if pre_flatten_dim[-1] != filters[0]:
        raise ValueError("filter sizes do not match encoder dimensions")
   
    if strides is None:
        strides = [(1, 1) for _ in range(len(filters))]

    input_layer = layers.Input(shape=(bottleneck_size))

    if dnn_layer_sizes is not None:
        x = layers.Dense(dnn_layer_sizes[0], activation=activation)(input_layer)
        for n_nodes in dnn_layer_sizes[1:]:
            x = layers.Dense(n_nodes, activation=activation)(x)
        x = layers.Dense(n_flatten_nodes)(x)
    else:
        x = layers.Dense(n_flatten_nodes)(input_layer)
    
    x = layers.Reshape(pre_flatten_dim)(x)

    new_filters = filters[1:] + [1]
    for i in range(len(new_filters) - 1):
        x = layers.Conv2DTranspose(new_filters[i], kernel_size=kernel_sizes[i], strides=strides[i], padding=paddings)(x)
        x = layers.BatchNormalization()(x)
        x = layers.LeakyReLU()(x)
    x = layers.Conv2DTranspose(new_filters[-1], kernel_size=kernel_sizes[-1], strides=strides[-1], padding=paddings, 
                               activation=tf.keras.activations.sigmoid)(x)

    model = tf.keras.Model(inputs=input_layer, outputs=x)

    return model

#### Encoder and decoder initialization

In [10]:
bottleneck_size = 10

filters = [32, 64, 64, 64]
kernel_sizes = [3, 3, 3, 3]
strides = [1, 2, 2, 1]

filters_r = list(reversed(filters))
kernel_sizes_r = list(reversed(kernel_sizes))
strides_r = list(reversed(strides))

In [11]:
enc, pre_flatten_dim, n_flatten_nodes = get_cnn_encoder(
    input_shape=(28, 28, 1), 
    bottleneck_size=bottleneck_size,
    filters=filters, 
    kernel_sizes=kernel_sizes, 
    strides=strides
)

dec = get_cnn_decoder(
    bottleneck_size=bottleneck_size, 
    pre_flatten_dim=pre_flatten_dim[1:], 
    n_flatten_nodes=n_flatten_nodes[1],
    filters=filters_r,
    kernel_sizes=kernel_sizes_r,
    strides=strides_r
)

optimizer = tf.keras.optimizers.Adam()

#### Train autoencoder

In [12]:
loss_object = tf.keras.losses.CategoricalCrossentropy(from_logits=False)

train_loss = tf.keras.metrics.Mean(name='train_loss')
test_loss = tf.keras.metrics.Mean(name='test_loss')

@tf.function
def train_step_cnn_autoencoder(images):
 
    with tf.GradientTape() as tape_encoder, tf.GradientTape() as tape_decoder:
       
        latent = enc(images, training=True)
        generated_images = dec(latent, training=True)
        loss = loss_object(images, generated_images)
         
    gradients_of_enc = tape_encoder.gradient(loss, enc.trainable_variables)
    gradients_of_dec = tape_decoder.gradient(loss, dec.trainable_variables)
    
    optimizer.apply_gradients(zip(gradients_of_enc, enc.trainable_variables))
    optimizer.apply_gradients(zip(gradients_of_dec, dec.trainable_variables))

    train_loss(loss)


@tf.function
def test_step_cnn_autoencoder(images):
    predictions = dec(enc(images, training=False), training=False)
    t_loss = loss_object(images, predictions)

    test_loss(t_loss)

In [None]:
EPOCHS = 10
for epoch in range(EPOCHS):
    train_loss.reset_states()
    test_loss.reset_states()

    for data, labels in train_ds:
        train_step_cnn_autoencoder(data)
        print(f'epoch: {epoch + 1} | train loss: {train_loss.result()}', end='\r')
    print()

    for data, test_labels in test_ds:
        test_step_cnn_autoencoder(data)
        print(f'epoch: {epoch + 1} | test loss: {test_loss.result()}', end='\r')
    print('\n')

#### Classifier definition

In [15]:
def get_nn_decoder(latent_shape, layer_sizes, dropout_rates=None, activation='relu'):
    # dense, dropout, dense, dropout, dense

    input_layer = layers.Input(shape=latent_shape)
    x = layers.Dense(layer_sizes[0], activation=activation)(input_layer)

    for i, n_nodes in enumerate(layer_sizes[1: ], 1):
        if dropout_rates is not None:
            x = layers.Dropout(dropout_rates[i - 1])(x)

        if i == len(layer_sizes) - 1:
            x = layers.Dense(n_nodes, activation=tf.keras.activations.softmax)(x)
        else:
            x = layers.Dense(n_nodes, activation=activation)(x)  

    model = tf.keras.Model(inputs=input_layer, outputs=x)
    return model

#### Classifier initialization

In [16]:
classifier = get_nn_decoder(bottleneck_size, [500, 500, 100, 3])

#### Train classifier

In [17]:
loss_object_sparse = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy_sparse = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

test_loss = tf.keras.metrics.Mean(name='test_loss')
test_accuracy_sparse = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

@tf.function
def train_step_nn(data, labels):
    with tf.GradientTape() as tape:
        latent = enc(data, training=True)
        predictions = classifier(latent, training=True)
        loss = loss_object_sparse(labels, predictions)
    gradients = tape.gradient(loss, classifier.trainable_variables)
    optimizer.apply_gradients(zip(gradients, classifier.trainable_variables))

    train_loss(loss)
    train_accuracy_sparse(labels, predictions)


@tf.function
def test_step_nn(data, labels):
    latent = enc(data, training=False)
    predictions = classifier(data, training=False)
    t_loss = loss_object_sparse(labels, predictions)

    test_loss(t_loss)
    test_accuracy_sparse(labels, predictions)

In [None]:
EPOCHS = 10

for epoch in range(EPOCHS):
    # Reset the metrics at the start of the next epoch
    train_loss.reset_states()
    train_accuracy_sparse.reset_states()
    test_loss.reset_states()
    test_accuracy_sparse.reset_states()

    for data, labels in train_ds:
        encoded_data = enc(data)
        train_step_nn(encoded_data, labels)

    for test_data, test_labels in test_ds:
        encoded_test_data = enc(test_data)
        test_step_nn(encoded_test_data, test_labels)
