In [None]:
import statistics

import numpy as np
import tensorflow as tf
from sklearn.metrics import f1_score
from sklearn.model_selection import KFold
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import Input, GRU, Bidirectional, GlobalAveragePooling1D, Dense
from tensorflow.keras.models import Model

In [None]:
# Extract audio and lexical features as explained in the multimodal target model.
# Every record of df_list is read positionally:
#   [0] audio features, [1] text features, [2] label, [3] clip ID, [4] session number.
x_audio_data = np.array([record[0] for record in df_list])        # Audio features
x_text_data = np.array([record[1] for record in df_list])         # Text features
y_data = np.array([record[2] for record in df_list])              # Depression labels (binary: 0 for control, 1 for depressed)
unique_clip_ID_No = np.array([record[3] for record in df_list])   # Unique clip identifier
session_number = np.array([record[4] for record in df_list])      # Session number

## Target model - lexical

In [None]:
# Setting up parameters and data storage lists
num_folds = 5

# Unique session IDs; the fold split below is taken over these IDs, not over
# individual rows.
speaker_ids = np.unique(session_number)

# shuffle=True without a random_state yields a different fold assignment on every
# run, so per-fold results cannot be reproduced. Pinning the seed fixes the split.
# NOTE(review): this pins only the fold assignment; no seed for model weight
# initialisation is set in this cell.
k_fold = KFold(n_splits=num_folds, shuffle=True, random_state=42)

# Per-fold row indices, filled by the split loop below.
training_index, testing_index, validation_index = [], [], []

# Per-fold metrics, appended to once per fold by the training loop.
accuracies_best, loss_fold_best, F1_best, f1_MV_best_, accuracies_MV_best, session_list = [], [], [], [], [], []

# Define a custom early stopping to restore the best weights
class CustomEarlyStopping(tf.keras.callbacks.Callback):
    """Stop training when ``val_loss`` stops improving and restore the best weights.

    After every epoch the validation loss is compared with the lowest value seen
    so far. On improvement the model weights are snapshotted in memory; otherwise
    a wait counter is incremented. Once the counter reaches ``patience`` training
    is stopped and the best weights are restored.

    Weights are restored only when early stopping actually triggers; if training
    runs through all epochs the model keeps the weights of the last epoch.

    Args:
        patience: Number of consecutive non-improving epochs tolerated before
            training is stopped.
        restore_path: Optional path to a weights file. When non-empty, weights
            are loaded from this file on stop (the original behaviour). When
            empty, the in-memory snapshot of the best epoch is used instead.
    """

    def __init__(self, patience=0, restore_path=''):
        super().__init__()
        self.patience = patience
        self.restore_path = restore_path
        self.best_weights = None

    def on_train_begin(self, logs=None):
        """Reset all tracking state at the start of a training run."""
        self.wait = 0
        self.stopped_epoch = 0
        # np.Inf was removed in NumPy 2.0; np.inf works on every NumPy version.
        self.best = np.inf
        # Forget any snapshot left over from a previous fit() call.
        self.best_weights = None

    def on_epoch_end(self, epoch, logs=None):
        """Track the best ``val_loss`` and stop/restore once patience runs out."""
        current_loss = (logs or {}).get('val_loss')
        if current_loss is None:
            # No validation loss reported for this epoch, so nothing to compare.
            return

        if np.less(current_loss, self.best):
            self.best = current_loss
            self.wait = 0
            # Record the best weights if current results is better (less).
            self.best_weights = self.model.get_weights()
            return

        self.wait += 1
        if self.wait >= self.patience:
            self.stopped_epoch = epoch
            self.model.stop_training = True
            print('Restoring model weights from the end of the best epoch.')
            if self.restore_path:
                # An explicit weights file was supplied: keep loading from it.
                self.model.load_weights(self.restore_path)
            elif self.best_weights is not None:
                # Default path: load_weights('') cannot succeed, so restore the
                # snapshot taken at the best epoch instead.
                self.model.set_weights(self.best_weights)

# Build the per-fold row indices. The split is taken over unique session IDs and
# then mapped back to rows, so every row of a session lands on the same side.
for fold_indexes_training, fold_indexes_test in k_fold.split(speaker_ids):
    fold_training = speaker_ids[fold_indexes_training]
    fold_testing = speaker_ids[fold_indexes_test]

    # Boolean row masks: does the row's session belong to this side of the fold?
    in_training_fold = np.isin(session_number, fold_training)
    in_testing_fold = np.isin(session_number, fold_testing)

    # np.where returns a 1-tuple of index arrays; the training loop reads element [0].
    training_index.append(np.where(in_training_fold))
    testing_index.append(np.where(in_testing_fold))

# Model training and evaluation: a fresh model is built, trained and scored for
# every fold, and the fold's metrics are appended to the result lists.
for i in range(num_folds):
    print(f'Fold number: {i + 1}')

    # A new model is created on every iteration; clear Keras' global state first
    # so memory does not keep growing with each fold.
    tf.keras.backend.clear_session()

    # Row indices are stored as 1-tuples from np.where, hence the [0].
    x_train_text = x_text_data[training_index[i][0]]
    y_train = y_data[training_index[i][0]]
    x_test_text = x_text_data[testing_index[i][0]]
    y_test = y_data[testing_index[i][0]]
    session_no = np.unique(session_number[testing_index[i][0]])
    print(f'Test session for this fold: {session_no}')
    session_list.append(session_no)

    # Model definition: stacked bidirectional GRUs over a (333, 768) input,
    # averaged over the first (sequence) axis, then Dense(128, relu) and a
    # 2-way softmax.
    n_gru_layers = 2
    gru_layer_width = 252
    lexical_input = Input(shape=(333, 768), name="lexical_input")
    x_l = lexical_input
    for _ in range(n_gru_layers):
        x_l = Bidirectional(GRU(gru_layer_width, return_sequences=True))(x_l)

    x_l = GlobalAveragePooling1D()(x_l)
    dense1 = Dense(128, activation='relu')(x_l)
    output = Dense(2, activation='softmax')(dense1)
    model = Model(inputs=lexical_input, outputs=output)
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    # Callbacks
    early_stopping_custom = CustomEarlyStopping(patience=3)

    # Fit model
    # NOTE(review): validation_split takes the validation rows from the end of the
    # training arrays, so they may share sessions with the rows used for training.
    # Confirm this is intended.
    history = model.fit(x_train_text, y_train, validation_split=0.2, epochs=200, batch_size=64, callbacks=[early_stopping_custom])
    loss, accuracy = model.evaluate(x_test_text, y_test)
    print(f"Test Loss: {loss}")
    print(f"Test Accuracy: {accuracy}")
    accuracies_best.append(accuracy)
    loss_fold_best.append(loss)

    # Predictions and F1 score
    # Predict the whole test fold in one batched call instead of one
    # model.predict() call per sample, then take the arg-max class of each row.
    # The result stays a list of integer labels, as before.
    test_probabilities = model.predict(x_test_text, verbose=0)
    y_test_pred_best = list(np.argmax(test_probabilities, axis=1))
    F1_score = f1_score(y_test, y_test_pred_best)
    print(f"F1 Score: {F1_score}")
    F1_best.append(F1_score)

    # Majority vote accuracy
    # majority_vote is defined outside this cell; it is called with the true labels,
    # the predicted labels and the clip ID of every test row, and its result is
    # unpacked as (accuracy, F1). Presumably it aggregates predictions per clip;
    # verify against its definition.
    acc_MV_best, f1_MV_best = majority_vote(y_test, y_test_pred_best, unique_clip_ID_No[testing_index[i][0]])
    print(f'Accuracy of majority vote: {acc_MV_best}')
    print(f'F1 score of majority vote: {f1_MV_best}')
    accuracies_MV_best.append(acc_MV_best)
    f1_MV_best_.append(f1_MV_best)

# Report the results: one line per fold, followed by the mean and the sample
# standard deviation of each metric across folds.
print('--- Summary of Results ---')

per_fold_rows = zip(session_list, loss_fold_best, accuracies_best, F1_best)
for test_set, loss, acc, f1 in per_fold_rows:
    print(f'Test set: {test_set}, Loss: {loss:.4f}, Accuracy: {acc:.4f}, F1 Score: {f1:.4f}')

# Metrics computed on individual test rows.
print(f'Average accuracy per 10 sec chunk: {np.mean(accuracies_best):.4f}, Std Dev: {statistics.stdev(accuracies_best):.4f}')
print(f'Average F1 score per 10 sec chunk: {np.mean(F1_best):.4f}, Std Dev: {statistics.stdev(F1_best):.4f}')

# Metrics returned by majority_vote.
print(f'Average accuracy per clip: {np.mean(accuracies_MV_best):.4f}, Std Dev: {statistics.stdev(accuracies_MV_best):.4f}')
print(f'Average F1 score per clip: {np.mean(f1_MV_best_):.4f}, Std Dev: {statistics.stdev(f1_MV_best_):.4f}')
