In [1]:
import numpy as np
import json
import warnings
import tensorflow as tf
from tensorflow.keras import models, Model
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Dense, Conv2D, Flatten, MaxPool2D, BatchNormalization, Dropout, Input, Concatenate, GlobalAvgPool2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import Sequence
import json
from evaluator import *

In [2]:
VARIABLES = ['ActivityCounts', 'Barometer', 'BloodPerfusion',
             'BloodPulseWave', 'EnergyExpenditure', 'GalvanicSkinResponse', 'HR',
             'HRV', 'RESP', 'Steps', 'SkinTemperature', 'ActivityClass']

In [3]:
GRAYSCALE = False # grayscale or rgb # TODO: currently not working (MobileNetV2 does not support grayscale)

# Import data

In [4]:
# file path to data folder
path = './Output'

In [5]:
# dimensions
N, HEIGHT, WIDTH, CHANNELS = sum([1 for p in os.listdir(path) if (p[:14] == 'feature_vector' and p[:19] != 'feature_vector_stat')]), \
                             *np.load(path + '/feature_vector0.npy').shape
CHANNELS = len(VARIABLES) if GRAYSCALE else CHANNELS # reduce channels for grayscale

print(N, HEIGHT, WIDTH, CHANNELS)

613 370 497 30


Metadata (subjectID etc.)

In [6]:
with open(path + '/metadata.txt') as f:
    metadata = f.read()

metadata = json.loads(metadata.replace('\'', '\"').replace('False', 'false').replace('True', 'true')) # doesn't accept other chars

In [7]:
subjects = [meta['subjectID'] for meta in metadata]

# CNN

Addditional functions

In [8]:
# image-wise transformer
def rgb2gray(rgb):
    """greyscale = 0.2989 * red + 0.5870 * green + 0.1140 * blue"""
    return np.dot(rgb[:, :, :3], [0.2989, 0.5870, 0.1140])

# loss function
def weighted_cross_entropy(weight):
    def weighted_cross_entropy_with_logits(labels, logits):
        loss = tf.nn.weighted_cross_entropy_with_logits(
            labels, logits, weight
        )
        return loss
    return weighted_cross_entropy_with_logits

# weight (imbalanced classes)
def check_imbalance(path_to_labels, indices, variable):
    """Returns indices of positives/negatives"""
    y = np.empty((len(indices), 2), dtype=int)
    for i, index in enumerate(indices):
        y[i, ] = np.load(path_to_labels + f'/labels{index}.npy', allow_pickle=True)

    positives = np.where(y[:, variable] == 1)[0]
    negatives = np.where(y[:, variable] == 0)[0]

    return np.array(indices)[positives], np.array(indices)[negatives]

def get_weighting_factor(path, train_set_indices, variable):
    positives, negatives = check_imbalance(path, train_set_indices, variable)
    sample_weight = len(negatives) / len(positives) # for weighted cross-entropy
    return sample_weight

Dataloader (dataset with images too large)

In [9]:
class DataGenerator(Sequence):

    def __init__(self, data_path: str, indices_dataset: list, variable, batch_size=32, dim=(HEIGHT, WIDTH), n_channels=CHANNELS, shuffle=True):
        self.data_path = data_path # path to full dataset
        self.dim = dim # image dimension
        self.batch_size = batch_size
        self.indices_dataset = indices_dataset # indices of full dataset (different for train/validation/test set)
        self.n_channels = n_channels
        self.shuffle = shuffle
        assert variable in (0, 1)
        self.variable = variable

        self.on_epoch_end() # shuffle data for each epoch

    def on_epoch_end(self):
        """
        Shuffle data for each epoch
        """
        if self.shuffle:
            np.random.shuffle(self.indices_dataset)

    def __data_generation(self, indices):
        """
        Loads and returns datapoints[indices]
        """
        # init
        X = np.empty((self.batch_size, *self.dim, self.n_channels))
        y = np.empty(self.batch_size, dtype=float) # float: logits, int: non-logits

        # load individual datapoints
        for i, index in enumerate(indices):
            images = np.load(self.data_path + f'/feature_vector{index}.npy', allow_pickle=True)
            if GRAYSCALE:
                images_gray = np.empty((HEIGHT, WIDTH, self.n_channels))
                for j in range(len(VARIABLES)):
                    image_rgb = images[:, :, (3 * j): (3 * (j + 1))]
                    image_gray = rgb2gray(image_rgb)
                    images_gray[:, :, j] = image_gray
                images = images_gray

            X[i, ] = images
            y[i] = np.load(self.data_path + f'/labels{index}.npy', allow_pickle=True)[self.variable]

        return X, y

    def __len__(self):
        """
        Number of batches per epoch
        """
        return int(np.floor(len(self.indices_dataset) / self.batch_size))

    def __getitem__(self, index):
        """
        Generates batch[index]
        """
        # calculate indices of batch
        indices = self.indices_dataset[index * self.batch_size:(index + 1) * self.batch_size]

        # generate batch
        X, y = self.__data_generation(indices)

        return X, y

Architecture

In [10]:
class ConvNet(tf.keras.Model):

    def __init__(self, name='cnn', **kwargs):
        super(ConvNet, self).__init__(name, **kwargs)

        self.in_shape = (HEIGHT, WIDTH, CHANNELS)
        self.in_shape_mobilenet = (HEIGHT, WIDTH, 3) if not GRAYSCALE else (HEIGHT, WIDTH, 1)

        # MobileNetV2 embedding
        self.mobilenet = MobileNetV2(input_shape=self.in_shape_mobilenet, weights='imagenet', include_top=False)
        self.mobilenet._name = 'mobilenet'
        self.mobilenet.trainable = False
        self.finetuning = False
        self.out_shape_mobilenet = self.mobilenet.layers[-1].output_shape # for one spectrogram

        # Concatenation
        self.concat = Concatenate(name='concat')

        # Global pooling
        self.pool = GlobalAvgPool2D(name='global_avg_pool')

        # TODO: more sophisticated dense (dropout, regularizer, init., ...)
        # Fully-connected network
        self.dense = Dense(1, name='dense') # keep logits
        self.out_shape = 1

        # build graph
        self.build_graph()

    def build_graph(self):
        self.build(input_shape=(None, *self.in_shape))
        x = Input(shape=self.in_shape)
        Model(inputs=[x], outputs=self.call(x))

    def set_finetuning(self, mode=True):
        self.finetuning = mode
        self.mobilenet.trainable = mode

        for layers in self.mobilenet.layers:
            layers.trainable = False

        # "activate" last conv layer of MobileNet
        self.mobilenet.layers[-3].trainable = mode
        self.mobilenet.layers[-2].trainable = mode

    def call(self, inputs):
        """
        Model predictions (logits)
        :param inputs: all spectrograms of shape (HEIGHT, WIDTH, CHANNELS)
        :return: class prediction (logits)
        """
        # MobileNetV2 embeddings
        x = [self.mobilenet(inputs[..., i:i+3], training=self.finetuning) for i in range(0, CHANNELS, 3)]

        # Concatenation
        x = self.concat(x)

        # Global pooling
        x = self.pool(x)

        # Fully-connected network
        x = self.dense(x)

        return x

Model

In [11]:
class CNN:

    def __init__(self, path, variable, epochs, learning_rate, batch_size):
        self.model = ConvNet()
        self.path = path
        assert variable in (0, 1)
        self.variable = variable
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.history = None
        self.train_indices = None
        self.test_indices = None

    def fit(self, train_indices):
        self.train_indices = train_indices

        # training set
        train_dataloader = DataGenerator(self.path, train_indices, batch_size=self.batch_size, variable=self.variable)

        # weights for loss function
        sample_weights = get_weighting_factor(self.path, train_indices, self.variable)

        # build model
        self.model.compile(optimizer=Adam(learning_rate=self.learning_rate),
                           loss=weighted_cross_entropy(sample_weights))

        # training
        self.history = self.model.fit_generator(generator=train_dataloader,
                                                epochs=self.epochs)

    def reset(self):
        """Resets model weights"""
        self.model = ConvNet()

    def predict(self, test_indices):
        """Predicts actual class labels (not logits/probability values!)"""
        self.test_indices = test_indices

        # TODO: make more efficient
        # test set + predict
        y_pred = np.empty(len(test_indices), dtype=float)

        for i, index in enumerate(test_indices):
            X_i = np.load(path + f'/feature_vector{index}.npy', allow_pickle=True)

            X_i = tf.expand_dims(X_i, axis=0) # add "batch dimension"
            logits_pred_i = self.model.predict(X_i)

            y_pred[i] = logits_pred_i

        y_probs = tf.math.sigmoid(y_pred) # logits to probs
        y_pred = tf.round(y_probs) # probs to labels

        return y_pred

    def summary(self):
        return self.model.summary()

# CV

In [None]:
scores_strat_group_k_fold = [None]*2
scores_loso = [None]*2

with warnings.catch_warnings():
    warnings.filterwarnings('ignore')

    for variable in (0, 1): # phF, MF
        model = CNN(path, variable=variable, epochs=20, learning_rate=1e-3, batch_size=16)

        scores_strat_group_k_fold[variable] = stratified_group_k_fold(path=path,
                                                            groups=subjects,
                                                            model=model,
                                                            folds=5,
                                                            images=True,
                                                            verbose=True,
                                                            variable=variable)

        scores_loso[variable] = leave_one_subject_out(path=path,
                                            groups=subjects,
                                            model=model,
                                            images=True,
                                            verbose=True,
                                            variable=variable)

# Save scores

In [29]:
path_scores = './Scores'
model_name = 'cnn2'

In [30]:
# stratified group 5-fold
with open(f'{path_scores}/strat_group_5_fold//{model_name}.txt', 'w') as dat:
    dat.write(str(scores_strat_group_k_fold))

In [31]:
# LOSO
with open(f'{path_scores}/loso//{model_name}.txt', 'w') as dat:
    dat.write(str(scores_loso))

# Under development

Daily majority vote

In [None]:
'''variable = 0
model = CNN(path, variable=0, epochs=10, learning_rate=1e-3, batch_size=16)

N = sum([1 for p in os.listdir(path) if (p[:14] == 'feature_vector' and p[:19] != 'feature_vector_stat')])

# load labels (we need them for stratification)
y = np.empty(N, dtype=int)
for i in range(N):
    y[i] = np.load(path + f'/labels{i}.npy', allow_pickle=True)[variable]  # TODO: multiclass

# CV
cv = StratifiedGroupKFold(n_splits=5, shuffle=True, random_state=SEED)
scores_cv = []
data_indices = np.arange(N)

print(f'Starting stratified group {5}-fold for {["physical fatigue", "mental fatigue"][variable]}')
with tqdm(total=5) as pbar:
    for i, (train_indices, test_indices) in enumerate(cv.split(X=data_indices, y=y, groups=subjects)):
        # test labels
        y_test = y[test_indices]

        # training
        model.reset()
        model.fit(train_indices)

        # predict
        y_pred = model.predict(test_indices)

        # daily majorities
        y_pred_daily = np.array(list(daily_majority_vote(y_pred, test_indices, metadata).values()))
        y_true_daily = np.array(list(daily_majority_vote(y_test, test_indices, metadata).values()))

        # evaluate
        scores = evaluator(y_pred_daily, y_true_daily, verbose=False)
        scores_cv.append(scores)

        # agreements
        print('agreements:', agreements(y_pred, model.test_indices, metadata))

        # for progress bar
        pbar.update(1)
        pbar.set_description(f' Fold {i+1} F1: {scores["f1"]}')

# print (if verbose==True)
print('Performance model:')
metrics = scores_cv[0].keys()
for metric in metrics:
    mean = np.mean([scores_cv_i[metric] for scores_cv_i in scores_cv])
    std = np.std([scores_cv_i[metric] for scores_cv_i in scores_cv])
    print(f' {metric}: {round(mean, 3)} +- {round(std, 3)} \n')'''

# TODO: check why so few days (even fewer than statistical features)