This notebook contains code for training advanced models such as VGG19 and ResNet-50 on mel spectrograms, and for fine-tuning DistilHuBERT on raw audio signals. MFCC features are also extracted and used to train a simple CNN (VGG/ResNet are not applied to MFCCs directly; see the MFCC section). In addition, a transformer model is trained on the MFCC features.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications import VGG16
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [2]:
# Execution environment: "kaggle" for a Kaggle kernel, "local" for a repository checkout.
mode = "kaggle"

if mode == "local":
    features_dir = "../../extracted_features"
    info_dir = "../../dataset_info"
    # The model-saving cells read output_dir, so it must exist in local mode too.
    output_dir = "./"
elif mode == "kaggle":
    features_dir = "/kaggle/input/daic-woz/spectograms"
    info_dir = "/kaggle/input/daic-woz/info/"
    output_dir = "/kaggle/working"
else:
    # Fail early instead of hitting a NameError on the first path lookup.
    raise ValueError(f"Unknown mode: {mode!r} (expected 'local' or 'kaggle')")

# Root folder holding the train/dev/test spectrogram images.
DATASET_DIR = features_dir + "/spect_images"
# Folder holding the *_split_new.csv label files.
DATAINFO_DIR = info_dir

# Training on Mel Spectrograms

## Dataset Loading

In [3]:
def create_dataframe(csv_path, img_dir):
    """Build a (filepath, label) table for one dataset split.

    Args:
        csv_path: CSV file with at least the columns ``Participant_ID`` and
            ``PHQ_Binary``.
        img_dir: Directory containing one ``<id>.png`` image per participant,
            where ``<id>`` is the part of ``Participant_ID`` before the first dot.

    Returns:
        DataFrame with columns ``filepath`` and ``PHQ_Binary`` (as strings),
        restricted to rows whose image file exists on disk.
    """
    df = pd.read_csv(csv_path)
    # str() keeps this working when pandas parses purely numeric IDs as integers.
    df["filepath"] = df["Participant_ID"].apply(
        lambda x: os.path.join(img_dir, f"{str(x).split('.')[0]}.png")
    )
    # Drop rows without an image. The mask used to be computed and thrown away,
    # so missing files only surfaced later as read errors inside tf.data.
    has_image = df["filepath"].map(os.path.exists).astype(bool)
    df = df[has_image].copy()
    df["PHQ_Binary"] = df["PHQ_Binary"].astype(str)
    return df[["filepath", "PHQ_Binary"]].reset_index(drop=True)

In [4]:
# One (filepath, label) frame per split; the CSV and image folder share the split name.
train_df, dev_df, test_df = (
    create_dataframe(f"{DATAINFO_DIR}/{split}_split_new.csv", f"{DATASET_DIR}/{split}")
    for split in ("train", "dev", "test")
)

In [5]:
# Sanity check: (rows, columns) of the train / dev / test frames.
train_df.shape, dev_df.shape, test_df.shape

((274, 2), (56, 2), (56, 2))

In [6]:
# Number of images per batch produced by the tf.data pipelines.
BATCH_SIZE = 32

In [7]:
def load_image(image_path, label):
    """Read a PNG from disk as a 3-channel image resized to 512x512.

    The label is passed through unchanged so the function can be used
    directly with ``Dataset.map`` on (path, label) pairs.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_png(raw_bytes, channels=3)
    return tf.image.resize(decoded, [512, 512]), label

def create_tf_dataset(df, is_test=False):
    """Turn a (filepath, PHQ_Binary) frame into a batched, prefetched tf.data pipeline.

    Args:
        df: DataFrame with a ``filepath`` column and a ``PHQ_Binary`` column
            whose values can be converted to ``int``.
        is_test: When False (default) the samples are shuffled and repeated
            indefinitely, so the consumer must bound each epoch with a step
            count. When True the data is a single ordered, finite pass.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, labels)`` batches of size
        ``BATCH_SIZE``.
    """
    filepaths = df['filepath'].values
    labels = df['PHQ_Binary'].values.astype(int)
    dataset = tf.data.Dataset.from_tensor_slices((filepaths, labels))

    if not is_test:
        # Shuffle the (path, label) pairs before decoding: the buffer then holds
        # short strings instead of decoded 512x512x3 float images, and sizing it
        # to the whole split gives a uniform shuffle regardless of split size.
        dataset = dataset.shuffle(max(len(filepaths), 1))

    dataset = dataset.map(load_image, num_parallel_calls=tf.data.AUTOTUNE)

    if not is_test:
        # repeat() after shuffle() keeps epoch boundaries and reshuffles each pass.
        dataset = dataset.repeat()

    return dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)


In [8]:
# Train and dev take the default is_test=False branch (shuffled and repeated without end),
# so model.fit has to be given explicit step counts; test is one finite, ordered pass.
train_dataset = create_tf_dataset(train_df)
dev_dataset = create_tf_dataset(dev_df)
test_dataset = create_tf_dataset(test_df, is_test=True)

In [9]:
# Inspect the first element of a single training batch.
# take(1) yields at most one batch, so the loop body runs at most once.
for image, label in train_dataset.take(1):
    sample_image, sample_label = image[0].numpy(), label[0].numpy()
    print("Sample Image Shape:", sample_image.shape)
    print("Sample Label:", sample_label)

Sample Image Shape: (512, 512, 3)
Sample Label: 1


In [10]:
# Split sizes (rows of the train / dev frames)
num_train_samples = len(train_df)
num_val_samples = len(dev_df)

# Whole batches per epoch; a trailing partial batch is not counted.
train_steps, val_steps = (
    n_samples // BATCH_SIZE for n_samples in (num_train_samples, num_val_samples)
)

print(f"Train steps per epoch: {train_steps}, Validation steps per epoch: {val_steps}")

Train steps per epoch: 8, Validation steps per epoch: 1


## VGG19

In [11]:
from tensorflow.keras.applications import VGG19
from tensorflow.keras import layers, models, optimizers

# Two-phase transfer learning: (1) train a new dense head on a frozen VGG19 base,
# (2) unfreeze the last few base layers and continue at a 10x smaller learning rate.
#
# NOTE(review): the datasets feed raw resized pixel values (see load_image). The Keras
# documentation says VGG19 expects inputs passed through
# keras.applications.vgg19.preprocess_input - confirm whether that step should be added.

# Load the ImageNet-pretrained convolutional base without its classifier head
base_model = VGG19(weights='imagenet', include_top=False, input_shape=(512, 512, 3))

# Phase 1: freeze every base layer so only the new head is trained
for layer in base_model.layers:
    layer.trainable = False

# New classification head: flatten -> 256 -> 512 -> 1 sigmoid unit, with dropout between
x = base_model.output
x = layers.Flatten()(x)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
x = layers.Dense(512, activation='relu')(x)
x = layers.Dropout(0.5)(x)
predictions = layers.Dense(1, activation='sigmoid')(x)

# Full model: pretrained base + new head
model = models.Model(inputs=base_model.input, outputs=predictions)

# Binary classification objective
model.compile(optimizer=optimizers.Adam(learning_rate=1e-4), loss='binary_crossentropy', metrics=['accuracy'])

# Step counts are required because the train/dev datasets repeat indefinitely
history = model.fit(train_dataset, validation_data=dev_dataset, epochs=15, steps_per_epoch=train_steps, validation_steps=val_steps)

# Phase 2: make the last five base layers trainable for fine-tuning
for layer in base_model.layers[-5:]:  # Adjust range as needed
    layer.trainable = True

# Trainability changes only take effect after recompiling; use a lower learning rate
model.compile(optimizer=optimizers.Adam(learning_rate=1e-5), loss='binary_crossentropy', metrics=['accuracy'])

# Fine-tune head + unfrozen base layers
history_fine = model.fit(train_dataset, validation_data=dev_dataset, epochs=5, steps_per_epoch=train_steps, validation_steps=val_steps)


Epoch 1/15


I0000 00:00:1730748485.065548    1481 service.cc:145] XLA service 0x7833d8006010 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1730748485.065600    1481 service.cc:153]   StreamExecutor device (0): Tesla P100-PCIE-16GB, Compute Capability 6.0
2024-11-04 19:28:15.734218: E external/local_xla/xla/service/slow_operation_alarm.cc:65] Trying algorithm eng11{k2=1,k3=0} for conv (f32[32,64,512,512]{3,2,1,0}, u8[0]{0}) custom-call(f32[32,64,512,512]{3,2,1,0}, f32[64,64,3,3]{3,2,1,0}, f32[64]{0}), window={size=3x3 pad=1_1x1_1}, dim_labels=bf01_oi01->bf01, custom_call_target="__cudnn$convBiasActivationForward", backend_config={"operation_queue_id":"0","wait_on_operation_queues":[],"cudnn_conv_backend_config":{"conv_result_scale":1,"activation_mode":"kRelu","side_input_scale":0,"leakyrelu_alpha":0}} is taking a while...
2024-11-04 19:28:15.847257: E external/local_xla/xla/service/slow_operation_alarm.cc:133] The operation took 1.113202936s
Try

[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m46s[0m 626ms/step - accuracy: 0.4928 - loss: 2.0173 - val_accuracy: 0.2188 - val_loss: 2.5033
Epoch 2/15
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 442ms/step - accuracy: 0.5142 - loss: 2.2036 - val_accuracy: 0.8125 - val_loss: 0.5732
Epoch 3/15
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 473ms/step - accuracy: 0.6385 - loss: 0.9175 - val_accuracy: 0.7500 - val_loss: 0.5983
Epoch 4/15
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 440ms/step - accuracy: 0.6309 - loss: 0.8377 - val_accuracy: 0.5312 - val_loss: 0.7587
Epoch 5/15
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 436ms/step - accuracy: 0.6932 - loss: 0.6027 - val_accuracy: 0.8750 - val_loss: 0.4870
Epoch 6/15
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 441ms/step - accuracy: 0.7987 - loss: 0.4142 - val_accuracy: 0.9062 - val_loss: 0.4397
Epoch 7/15
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[

In [12]:
# Evaluate the fine-tuned model on the held-out test split (single finite pass)
test_loss, test_accuracy = model.evaluate(test_dataset)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m27s[0m 26s/step - accuracy: 0.6726 - loss: 1.5191
Test Loss: 1.487822413444519
Test Accuracy: 0.6964285969734192


In [13]:
# Persist the fine-tuned VGG19 model in the native Keras format under output_dir
model.save(f"{output_dir}/fine_tuned_vgg19_audio_classification.keras")

## RESNET-50

In [92]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras import layers, models, optimizers

# Same two-phase recipe as the VGG19 section, with a ResNet50 base and a smaller head.
# Note that `model`, `history` and `history_fine` are rebound here.
#
# NOTE(review): inputs are raw resized pixel values (see load_image). The Keras
# documentation says ResNet50 expects keras.applications.resnet50.preprocess_input
# - confirm whether that step should be added.

# Load the ImageNet-pretrained ResNet50 base without its classifier head
base_model = ResNet50(weights='imagenet', include_top=False, input_shape=(512, 512, 3))

# Phase 1: freeze every base layer so only the new head is trained
for layer in base_model.layers:
    layer.trainable = False

# New classification head: flatten -> 256 -> 1 sigmoid unit, with dropout
x = base_model.output
x = layers.Flatten()(x)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)
predictions = layers.Dense(1, activation='sigmoid')(x)

# Full model: pretrained base + new head
model = models.Model(inputs=base_model.input, outputs=predictions)

# Binary classification objective
model.compile(optimizer=optimizers.Adam(learning_rate=1e-4), loss='binary_crossentropy', metrics=['accuracy'])

# Step counts are required because the train/dev datasets repeat indefinitely
history = model.fit(train_dataset, validation_data=dev_dataset, epochs=20, steps_per_epoch=train_steps, validation_steps=val_steps)

# Phase 2: make the last ten base layers trainable for fine-tuning
for layer in base_model.layers[-10:]:  # Adjust range as needed
    layer.trainable = True

# Trainability changes only take effect after recompiling; use a lower learning rate
model.compile(optimizer=optimizers.Adam(learning_rate=1e-5), loss='binary_crossentropy', metrics=['accuracy'])

# Fine-tune head + unfrozen base layers
history_fine = model.fit(train_dataset, validation_data=dev_dataset, epochs=10, steps_per_epoch=train_steps, validation_steps=val_steps)


Epoch 1/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 838ms/step - accuracy: 0.4460 - loss: 15.8057 - val_accuracy: 0.2500 - val_loss: 13.6415
Epoch 2/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 249ms/step - accuracy: 0.5377 - loss: 9.2097 - val_accuracy: 0.3438 - val_loss: 4.4460
Epoch 3/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 313ms/step - accuracy: 0.6932 - loss: 3.2383 - val_accuracy: 0.8125 - val_loss: 0.6191
Epoch 4/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 284ms/step - accuracy: 0.7673 - loss: 1.2686 - val_accuracy: 0.5938 - val_loss: 1.4294
Epoch 5/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 246ms/step - accuracy: 0.8637 - loss: 0.2660 - val_accuracy: 0.5312 - val_loss: 1.1077
Epoch 6/20
[1m8/8[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 248ms/step - accuracy: 0.8073 - loss: 0.3645 - val_accuracy: 0.7188 - val_loss: 0.5270
Epoch 7/20
[1m8/8[0m [32m━━━━━━━━━

In [93]:
# Evaluate the fine-tuned ResNet50 model on the held-out test split (single finite pass)
test_loss, test_accuracy = model.evaluate(test_dataset)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step - accuracy: 0.5342 - loss: 1.1947  
Test Loss: 1.1721305847167969
Test Accuracy: 0.5357142686843872


In [94]:
# Persist the fine-tuned ResNet50 model in the native Keras format under output_dir
model.save(f"{output_dir}/fine_tuned_resnet50_audio_classification.keras")

# DistilHubert

This requires raw audio signals.

In [3]:
# Execution environment: "kaggle" for a Kaggle kernel, "local" for a repository checkout.
mode = "kaggle"

if mode == "local":
    # DATASET_DIR below reads input_dir; local mode used to define only features_dir,
    # which ended in a NameError. Both names are set so either keeps working.
    input_dir = "../../audio_chunks"
    features_dir = input_dir
    info_dir = "../../dataset_info"
    output_dir = "./"
elif mode == "kaggle":
    input_dir = "/kaggle/input/daic-woz-chunked/audio_chunks"
    info_dir = "/kaggle/input/daic-woz-chunked/dataset_info"
    output_dir = "/kaggle/working"
else:
    # Fail early instead of hitting a NameError on the first path lookup.
    raise ValueError(f"Unknown mode: {mode!r} (expected 'local' or 'kaggle')")

# Root folder with train/dev/test audio chunks and their metadata.csv files.
DATASET_DIR = input_dir
DATAINFO_DIR = info_dir
# Destination for checkpoints and the final model.
OUTPUT_DIR = output_dir

In [4]:
import os
import numpy as np
import pandas as pd

import torch
from torch.utils.data import IterableDataset

from datasets import Audio
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification, TrainingArguments, Trainer

from sklearn.metrics import accuracy_score, precision_recall_fscore_support

In [None]:
# Hugging Face checkpoint used for both the feature extractor and the classifier.
MODEL_ID = "ntu-spml/distilhubert"

# Ask the extractor for normalized inputs and attention masks.
feature_extractor = AutoFeatureExtractor.from_pretrained(MODEL_ID, 
                                                        do_normalize=True, return_attention_mask=True)

# Displayed for reference: audio is decoded at this sampling rate by the dataset class.
feature_extractor.sampling_rate

In [6]:
class AudioIterabaleDataset(IterableDataset):
    """Iterable dataset that lazily decodes ``.wav`` files and extracts model inputs.

    File paths and labels are indexed once at construction time; audio is only
    decoded and passed through the feature extractor while iterating.

    Args:
        audio_dir: Directory that is scanned (non-recursively) for ``.wav`` files.
        feature_extractor: Callable feature extractor exposing ``sampling_rate``.
        labels_dict: Mapping from file name (not full path) to label.
        split: Name of the split; stored for reference only.

    Attributes:
        dataset_dict: ``{'audio': [paths], 'label': [labels]}`` in sorted
            file-name order.
    """

    def __init__(self, audio_dir, feature_extractor, labels_dict, split="train"):
        import warnings

        self.audio_dir = audio_dir
        self.feature_extractor = feature_extractor
        self.split = split
        self.labels_dict = labels_dict

        self.dataset_dict = {'audio': [], 'label': []}

        unlabeled = []
        # Sort so the iteration order does not depend on the file system's listing order.
        for filename in sorted(os.listdir(self.audio_dir)):
            if not filename.endswith('.wav'):
                continue

            label = labels_dict.get(filename)
            if label is None:
                # A missing label used to be stored as None and only failed later,
                # inside __iter__, when torch.tensor(None) was evaluated.
                unlabeled.append(filename)
                continue

            self.dataset_dict['audio'].append(os.path.join(audio_dir, filename))
            self.dataset_dict['label'].append(label)

        if unlabeled:
            warnings.warn(
                f"Skipped {len(unlabeled)} .wav file(s) in {audio_dir} without a label, "
                f"e.g. {unlabeled[0]}"
            )

    def __len__(self):
        """Number of labelled audio files indexed at construction time."""
        return len(self.dataset_dict['audio'])

    def __iter__(self):
        """Yield one feature dict per file with ``input_values``, optional
        ``attention_mask`` and ``labels``."""
        # Decoder configured with the sampling rate the feature extractor reports.
        audio_feature = Audio(sampling_rate=self.feature_extractor.sampling_rate)

        for audio_path, label in zip(self.dataset_dict['audio'], self.dataset_dict['label']):
            audio_stream = audio_feature.decode_example(
                {"path": audio_path, "bytes": None}
            )

            audio_array = audio_stream["array"]
            sampling_rate = audio_stream["sampling_rate"]

            inputs = self.feature_extractor(audio_array, sampling_rate=sampling_rate,
                                            return_tensors="pt", return_attention_mask=True)
            # The extractor returns a leading batch axis of size 1; drop it so that
            # downstream batching can add its own batch axis.
            inputs['input_values'] = inputs['input_values'].squeeze(0)
            if 'attention_mask' in inputs:
                inputs['attention_mask'] = inputs['attention_mask'].squeeze(0)
            inputs["labels"] = torch.tensor(label)

            yield inputs

In [7]:
# Per-split metadata; the 'file' and 'label' columns are used below.
train_metadata = pd.read_csv(f"{DATASET_DIR}/train/metadata.csv")
dev_metadata = pd.read_csv(f"{DATASET_DIR}/dev/metadata.csv")
test_metadata = pd.read_csv(f"{DATASET_DIR}/test/metadata.csv")

# Map file name -> label, the lookup format AudioIterabaleDataset expects
train_labels_dict = dict(zip(train_metadata['file'], train_metadata['label']))
dev_labels_dict = dict(zip(dev_metadata['file'], dev_metadata['label']))
test_labels_dict = dict(zip(test_metadata['file'], test_metadata['label']))

# One lazily-decoding dataset per split, all sharing the same feature extractor
train_dataset = AudioIterabaleDataset(f"{DATASET_DIR}/train", feature_extractor, train_labels_dict,
                                      split="train")
dev_dataset = AudioIterabaleDataset(f"{DATASET_DIR}/dev", feature_extractor, dev_labels_dict,
                                    split="dev")  
test_dataset = AudioIterabaleDataset(f"{DATASET_DIR}/test", feature_extractor, test_labels_dict,
                                     split="test")

In [8]:
# Verify the dataset: decode only the first example and inspect its input tensor
for data in train_dataset:
    print(data['input_values'].shape)
    
    # With do_normalize=True the mean should be close to 0 and the variance close to 1
    print(data['input_values'].mean())
    print(data['input_values'].var())

    # One example is enough for the check
    break

torch.Size([6720000])
tensor(3.6330e-09)
tensor(0.9979)


In [9]:
# Class name -> integer id used in the metadata labels.
LABELS = {"depressed": 1, "not_depressed": 0}

# Lookup tables handed to the model config, both derived from LABELS.
id2label = {idx: name for name, idx in sorted(LABELS.items(), key=lambda item: item[1])}
label2id = dict(LABELS)

In [None]:
# Load the pretrained checkpoint with a 2-class audio classification head and
# the label name mappings defined for this task.
model = AutoModelForAudioClassification.from_pretrained(MODEL_ID, num_labels=2, id2label=id2label,
                                                        label2id=label2id)

In [21]:
# Trainer configuration: evaluate and checkpoint every 100 steps, and reload the
# checkpoint with the best accuracy when training finishes.
training_args = TrainingArguments(
    output_dir=f"{OUTPUT_DIR}/results",
    eval_strategy="steps",
    eval_steps=100,
    save_strategy="steps",
    save_steps=100,
    learning_rate=5e-5,
    per_device_train_batch_size=1,     # one clip per step to keep GPU memory low
    gradient_accumulation_steps=8,      # accumulate 8 steps per optimizer update to offset the batch size of 1
    per_device_eval_batch_size=1,      # same memory constraint during evaluation
    num_train_epochs=10,
    warmup_ratio=0.1,
    logging_steps=50,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",  # key returned by compute_metrics
    fp16=True,                         # half-precision training for memory efficiency
    gradient_checkpointing=True,       # trade extra compute for lower activation memory
    run_name="distilbert_1"
)

In [12]:
def compute_metrics(eval_pred):
    """Compute accuracy, F1, precision and recall for a Trainer evaluation pass.

    ``eval_pred.predictions`` holds per-class scores (argmax over axis 1 gives
    the predicted class) and ``eval_pred.label_ids`` the reference labels.
    Precision, recall and F1 use binary averaging.
    """
    y_pred = np.argmax(eval_pred.predictions, axis=1)
    y_true = eval_pred.label_ids

    precision, recall, f1, _support = precision_recall_fscore_support(
        y_true, y_pred, average='binary'
    )

    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "f1": f1,
        "precision": precision,
        "recall": recall,
    }

In [22]:
# Wire model, arguments, datasets and metrics together. The feature extractor is
# passed in the tokenizer slot.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=dev_dataset,
    tokenizer=feature_extractor,
    compute_metrics=compute_metrics,
)

  self.scaler = torch.cuda.amp.GradScaler(**kwargs)


In [None]:
# Run fine-tuning with the configuration defined in training_args.
trainer.train()

In [None]:
# Save next to the training results; OUTPUT_DIR is the same constant TrainingArguments uses.
trainer.save_model(f"{OUTPUT_DIR}/final_model")

# Feature Extraction

In [3]:
import os
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
import librosa
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, Input,GlobalAveragePooling2D, Conv1D, LayerNormalization, MultiHeadAttention,GlobalAveragePooling1D,MaxPooling1D
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.applications import VGG19
from tensorflow.keras.preprocessing import sequence

from sklearn.model_selection import train_test_split

# Directory layout: <data_dir>/{train,test,dev}, each holding the audio chunks
# and a metadata.csv file.
data_dir = "/kaggle/input/daic-woz-chunked/audio_chunks"
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")
dev_dir = os.path.join(data_dir, "dev")

## MFCC

In [4]:
# Load metadata.csv (file name + label per chunk) for each split.
train_metadata, test_metadata, dev_metadata = (
    pd.read_csv(os.path.join(split_dir, "metadata.csv"))
    for split_dir in (train_dir, test_dir, dev_dir)
)

In [5]:
# Extract MFCC features
def extract_mfcc(audio_file):
    """Return the MFCC matrix of an audio file, transposed so coefficients are on the last axis.

    The file is loaded at its native sampling rate (``sr=None``) and 20
    coefficients are computed.
    """
    waveform, sample_rate = librosa.load(audio_file, sr=None)
    return librosa.feature.mfcc(y=waveform, sr=sample_rate, n_mfcc=20).T

# MFCC matrices for every file listed in each split's metadata
train_X, test_X, dev_X = (
    [extract_mfcc(os.path.join(split_dir, fname)) for fname in metadata['file']]
    for split_dir, metadata in (
        (train_dir, train_metadata),
        (test_dir, test_metadata),
        (dev_dir, dev_metadata),
    )
)

# Matching label arrays, in the same row order as the metadata files
train_y, test_y, dev_y = (
    metadata['label'].values
    for metadata in (train_metadata, test_metadata, dev_metadata)
)

In [6]:
# Concatenate all frames of a split along the first axis: one row per MFCC frame
train_X_stacked, test_X_stacked, dev_X_stacked = (
    np.vstack(split_features) for split_features in (train_X, test_X, dev_X)
)
train_X_stacked.shape

(6457416, 20)

In [7]:
# Fit the per-coefficient standardization on training frames only, then apply
# the same statistics to every clip of every split.
scaler = StandardScaler().fit(train_X_stacked)

train_X_scaled, test_X_scaled, dev_X_scaled = (
    [scaler.transform(mfcc) for mfcc in split_features]
    for split_features in (train_X, test_X, dev_X)
)

In [8]:
# Stack the per-clip matrices into one array per split
train_X_scaled, test_X_scaled, dev_X_scaled = (
    np.array(clips) for clips in (train_X_scaled, test_X_scaled, dev_X_scaled)
)
train_X_scaled.shape, test_X_scaled.shape, dev_X_scaled.shape

((357, 18088, 20), (55, 18088, 20), (61, 18088, 20))

In [34]:
# Append a trailing channel axis for the 2-D convolutional model
train_X_scaled_ = train_X_scaled[:, :, :, np.newaxis]
test_X_scaled_ = test_X_scaled[:, :, :, np.newaxis]
dev_X_scaled_ = dev_X_scaled[:, :, :, np.newaxis]

In [35]:
# Shape check after adding the channel axis
train_X_scaled_.shape

(357, 18088, 20, 1)

In [9]:
# Make sure the labels are NumPy arrays
train_y, test_y, dev_y = (np.array(labels) for labels in (train_y, test_y, dev_y))

### Simple CNN using MFCC

VGG or ResNet cannot be trained directly on the MFCC features because each frame has only 20 coefficients, whereas VGG/ResNet require an input of at least 32x32x3. We therefore train a simple CNN on the MFCC features. However, VGG/ResNet could still be used by resizing the MFCC input to 32x32x3 or larger, although that may not be a good idea.

In [66]:
# Simple 2-D CNN over the (frames, n_mfcc, 1) MFCC "image" of each clip.
model = Sequential([
    # Per-sample shape = everything after the batch axis. The previous value
    # (shape[0], shape[1], 1) used the number of clips as the first dimension.
    Input(shape=train_X_scaled_.shape[1:]),
    Conv2D(64, (3, 3), strides=(2, 2), activation='relu'),
    Dropout(0.3),
    
    Conv2D(128, (3, 3), strides=(2, 2), activation='relu'),
    Dropout(0.3),
    
    Conv2D(256, (3, 3), activation='relu'),
    MaxPooling2D((2,2)),
    Dropout(0.3),
    
    # Collapse the remaining spatial grid to one 256-d vector per clip
    GlobalAveragePooling2D(),
    Dense(64, activation='relu'),
    Dropout(0.5),
    Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])


# Stop after 5 epochs without val_loss improvement and roll back to the best weights
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

In [67]:
# Train the CNN on the 4-D (channel-expanded) MFCC arrays, validating on the dev split;
# early stopping may end training before the 50-epoch limit.
history = model.fit(
    train_X_scaled_, train_y,
    validation_data=(dev_X_scaled_, dev_y),
    epochs=50,
    batch_size=16,
    callbacks=[early_stopping]
)

Epoch 1/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 346ms/step - accuracy: 0.5230 - loss: 0.7099 - val_accuracy: 0.2131 - val_loss: 0.7002
Epoch 2/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 97ms/step - accuracy: 0.5091 - loss: 0.6904 - val_accuracy: 0.2131 - val_loss: 0.7810
Epoch 3/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 97ms/step - accuracy: 0.5797 - loss: 0.6734 - val_accuracy: 0.6721 - val_loss: 0.6795
Epoch 4/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 97ms/step - accuracy: 0.6287 - loss: 0.6637 - val_accuracy: 0.5902 - val_loss: 0.6729
Epoch 5/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 96ms/step - accuracy: 0.6824 - loss: 0.6122 - val_accuracy: 0.6885 - val_loss: 0.6099
Epoch 6/50
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 96ms/step - accuracy: 0.7256 - loss: 0.5461 - val_accuracy: 0.5246 - val_loss: 0.7942
Epoch 7/50
[1m23/23[0m [32m━━

In [68]:
# Evaluate the CNN on the 4-D (channel-expanded) test array
test_loss, test_accuracy = model.evaluate(test_X_scaled_, test_y)
print("Test Accuracy:", test_accuracy)

[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 184ms/step - accuracy: 0.7140 - loss: 0.6578
Test Accuracy: 0.7272727489471436


### Transformer Model with Tensorflow

In [18]:
import tensorflow as tf
# Query the visible GPUs once and reuse the list
gpus = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpus))

# Configure GPU memory growth; TensorFlow raises RuntimeError if the devices
# have already been initialized, in which case the message is just printed.
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("GPU memory growth enabled")
    except RuntimeError as e:
        print(e)

Num GPUs Available:  1
Physical devices cannot be modified after being initialized


In [22]:
# Earlier hyper-parameter set, kept for reference:
# num_classes = 2            # Number of classes for classification
# num_heads = 4              # Number of attention heads
# d_model = 64               # Embedding dimension
# dff = 128                  # Feedforward dimension
# dropout_rate = 0.1         # Dropout rate
# max_len = 18088            # Maximum sequence length (or use dynamic length)
# num_layers = 3             # Number of transformer encoder layers

# Current hyper-parameters
d_model = 128  # Embedding dimension; must be divisible by num_heads
num_heads = 8  # Attention heads (128 / 8 = 16 dimensions per head)
dff = 256  # Hidden width of the feed-forward sub-layer
num_layers = 4  # Number of stacked transformer blocks
dropout_rate = 0.2
num_classes = 2
max_len = 18088  # MFCC frames per clip (second axis of train_X_scaled)

# Per-sample input shape: (seq_len, feature_dim); the batch axis is implicit
input_shape = (max_len, 20)  # 20 MFCC coefficients per frame

In [21]:
# Use full float32 precision for layers created after this point.
tf.keras.mixed_precision.set_global_policy('float32')

In [23]:
class PositionalEncoding(tf.keras.layers.Layer):
    """Adds fixed sinusoidal position information to a (batch, seq_len, d_model) input.

    Args:
        max_len: Number of positions to precompute; inputs must not be longer.
        d_model: Size of the last axis of the inputs.
    """

    def __init__(self, max_len, d_model):
        super().__init__()
        self.pos_encoding = self._get_positional_encoding(max_len, d_model)
        
    def _get_positional_encoding(self, max_len, d_model):
        """Return the (1, max_len, d_model) float32 table: sine on even columns, cosine on odd."""
        pos = np.arange(max_len)[:, np.newaxis]
        i = np.arange(d_model)[np.newaxis, :]
        # Columns 2k and 2k+1 share the frequency 1 / 10000^(2k / d_model)
        angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
        pos_encoding = pos * angle_rates
        pos_encoding[:, 0::2] = np.sin(pos_encoding[:, 0::2])
        pos_encoding[:, 1::2] = np.cos(pos_encoding[:, 1::2])
        return tf.cast(pos_encoding[np.newaxis, ...], dtype=tf.float32)
        
    def call(self, x):
        # Slice to the actual sequence length, then match the input dtype: the table is
        # float32 and adding it to float16 activations (mixed_float16 policy) would fail.
        encoding = self.pos_encoding[:, :tf.shape(x)[1], :]
        return x + tf.cast(encoding, x.dtype)

class TransformerBlock(tf.keras.layers.Layer):
    """Post-norm transformer encoder block: self-attention and a feed-forward
    network, each followed by dropout, a residual connection and layer normalization."""

    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super().__init__()
        # Each head works on an equal slice of the model dimension
        head_dim = d_model // num_heads
        self.mha = MultiHeadAttention(num_heads=num_heads, key_dim=head_dim, value_dim=head_dim)
        # Position-wise feed-forward network: expand to dff, project back to d_model
        self.ffn = tf.keras.Sequential([Dense(dff, activation='relu'), Dense(d_model)])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(dropout)
        self.dropout2 = Dropout(dropout)

    def call(self, x, training=None):
        # Self-attention sub-layer (query, key and value are all the input sequence)
        attended = self.mha(query=x, key=x, value=x, training=training)
        attended = self.dropout1(attended, training=training)
        normed = self.layernorm1(x + attended)

        # Feed-forward sub-layer
        transformed = self.dropout2(self.ffn(normed), training=training)
        return self.layernorm2(normed + transformed)

def build_transformer(input_shape, num_layers, d_model, num_heads, dff, num_classes, dropout_rate):
    """Build the Conv1D + transformer-encoder classifier.

    Two Conv1D/MaxPooling1D stages reduce the sequence length by a factor of 4,
    the result is projected to ``d_model``, position-encoded, passed through
    ``num_layers`` transformer blocks, average-pooled over time and classified
    with a small dense head ending in a ``num_classes``-way softmax.

    Args:
        input_shape: Per-sample shape ``(seq_len, feature_dim)``.
        num_layers: Number of transformer blocks.
        d_model: Model dimension; must be divisible by ``num_heads``.
        num_heads: Attention heads per block.
        dff: Hidden width of each block's feed-forward network.
        num_classes: Size of the softmax output.
        dropout_rate: Dropout rate used in the blocks and the dense head.

    Returns:
        An uncompiled ``tf.keras.Model``.
    """
    inputs = Input(shape=input_shape)

    # Make sure d_model is divisible by num_heads
    assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

    # Convolutional front-end: each stage halves the sequence length
    features = inputs
    for n_filters in (32, 64):
        features = Conv1D(filters=n_filters, kernel_size=3, padding='same', activation='relu')(features)
        features = MaxPooling1D(pool_size=2)(features)

    # Per-timestep projection to the model dimension
    features = keras.layers.TimeDistributed(Dense(d_model))(features)

    # Positions are counted on the 4x-downsampled sequence
    features = PositionalEncoding(input_shape[0] // 4, d_model)(features)

    # Encoder stack
    for _ in range(num_layers):
        features = TransformerBlock(d_model, num_heads, dff, dropout_rate)(features)

    # Average over time to get one vector per clip
    features = GlobalAveragePooling1D()(features)

    # Dense classification head
    for units in (256, 128):
        features = Dense(units, activation='relu')(features)
        features = Dropout(dropout_rate)(features)
    outputs = Dense(num_classes, activation='softmax')(features)

    return tf.keras.Model(inputs=inputs, outputs=outputs)

# Build and compile model
class SparseBinaryAUC(tf.keras.metrics.AUC):
    """AUC for integer class labels scored against a 2-column softmax output.

    The stock AUC metric expects y_true and y_pred to have the same shape, which
    does not hold for sparse labels with per-class probabilities. The probability
    of class 1 is used as the ranking score.
    """

    def update_state(self, y_true, y_pred, sample_weight=None):
        return super().update_state(tf.reshape(y_true, [-1]), y_pred[:, 1], sample_weight)


# SparseBinaryAUC reads column 1 of the softmax, which is only meaningful for two classes.
assert num_classes == 2, "SparseBinaryAUC assumes a 2-class softmax output"

# Use the hyper-parameters from the parameter cell, including its input_shape.
model = build_transformer(
    input_shape=input_shape,
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    num_classes=num_classes,
    dropout_rate=dropout_rate
)

# The global 'mixed_float16' policy was previously switched on here, after the layers
# already existed. A dtype policy only applies to layers created afterwards, so it did
# not change this model and only leaked into later cells. To train in mixed precision,
# set the policy before calling build_transformer.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    loss='sparse_categorical_crossentropy',
    # The explicit name keeps the logged key stable ('auc' / 'val_auc') across re-runs.
    metrics=['accuracy', SparseBinaryAUC(name='auc')],
)

In [None]:
# Training callbacks
callbacks = [
    # Stop after 5 epochs without val_loss improvement and restore the best weights
    tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True
    ),
    # Halve the learning rate after 2 stagnant epochs, down to 1e-6
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=2,
        min_lr=1e-6
    ),
    # Keep the model with the highest validation AUC; this relies on the AUC
    # metric being logged under the name 'val_auc'
    tf.keras.callbacks.ModelCheckpoint(
        'best_model.keras',
        monitor='val_auc',
        mode='max',
        save_best_only=True
    )
]

# Hand-set loss weights: errors on class 1 count twice as much as on class 0
class_weights = {
    0: 1.0,  # Adjust based on your class distribution
    1: 2.0   # Increase weight for minority class (assumed to be depressed class)
}

# Train on the 3-D (clips, frames, n_mfcc) arrays, validating on the dev split
history = model.fit(
    train_X_scaled, 
    train_y,
    validation_data=(dev_X_scaled, dev_y),
    epochs=50,
    batch_size=16,  # Keep batch size consistent
    callbacks=callbacks,
    class_weight=class_weights
)

In [None]:
# The transformer takes (frames, n_mfcc) inputs, so evaluate on the 3-D test_X_scaled
# array rather than the channel-expanded test_X_scaled_ built for the CNN.
# return_dict=True avoids depending on how many metrics the model was compiled with.
test_metrics = model.evaluate(test_X_scaled, test_y, return_dict=True)
test_loss, test_accuracy = test_metrics["loss"], test_metrics["accuracy"]
print("Test Accuracy:", test_accuracy)

### Transformer with pytorch

In [10]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import Dataset, DataLoader
import math

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


In [12]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a (batch, seq_len, d_model) input.

    Args:
        d_model: Size of the last axis of the inputs (even or odd).
        max_len: Number of positions to precompute; inputs must not be longer.
    """

    def __init__(self, d_model, max_len):
        super().__init__()
        position = torch.arange(max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(1, max_len, d_model)
        pe[0, :, 0::2] = torch.sin(angles)
        # For odd d_model there is one cosine column fewer than sine columns;
        # without the slice the assignment fails with a shape mismatch.
        pe[0, :, 1::2] = torch.cos(angles[:, : d_model // 2])

        # Buffer: follows .to(device) and is saved in the state dict, but is not trained
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Slice the table to the actual sequence length of x
        return x + self.pe[:, :x.size(1)]

class TransformerBlock(nn.Module):
    """Post-norm transformer encoder block for batch-first (batch, seq_len, d_model) inputs:
    self-attention and a feed-forward network, each followed by dropout, a residual
    connection and layer normalization."""

    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super().__init__()
        self.mha = nn.MultiheadAttention(d_model, num_heads, dropout=dropout, batch_first=True)
        # Position-wise feed-forward network: expand to dff, project back to d_model
        self.ffn = nn.Sequential(nn.Linear(d_model, dff), nn.ReLU(), nn.Linear(dff, d_model))
        self.layernorm1, self.layernorm2 = nn.LayerNorm(d_model), nn.LayerNorm(d_model)
        self.dropout1, self.dropout2 = nn.Dropout(dropout), nn.Dropout(dropout)

    def forward(self, x):
        # Self-attention sub-layer; the attention weights (second return value) are not used
        attended = self.mha(x, x, x)[0]
        residual = self.layernorm1(x + self.dropout1(attended))

        # Feed-forward sub-layer
        return self.layernorm2(residual + self.dropout2(self.ffn(residual)))

class AudioTransformer(nn.Module):
    """Conv1d front-end + transformer encoder + dense head for sequence classification.

    Expects inputs of shape (batch, seq_len, features) with
    ``input_shape = (seq_len, features)`` and returns per-class
    log-probabilities of shape (batch, num_classes).
    """

    def __init__(self, input_shape, num_layers, d_model, num_heads, dff, num_classes, dropout_rate):
        super().__init__()
        seq_len, n_features = input_shape[0], input_shape[1]

        # Convolutional front-end; the shared pooling layer halves the length after each conv
        self.conv1 = nn.Conv1d(n_features, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(2)

        # Per-timestep projection from the 64 conv channels to d_model
        self.projection = nn.Linear(64, d_model)

        # Positions are counted on the 4x-downsampled sequence
        self.pos_encoding = PositionalEncoding(d_model, seq_len // 4)

        # Encoder stack
        self.transformer_blocks = nn.ModuleList(
            [TransformerBlock(d_model, num_heads, dff, dropout_rate) for _ in range(num_layers)]
        )

        # Classification head (global_pool is defined but forward() averages with .mean())
        self.global_pool = nn.AdaptiveAvgPool1d(1)
        self.fc1 = nn.Linear(d_model, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, num_classes)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        # Conv1d works on (B, C, L), so swap the time and feature axes first
        features = x.transpose(1, 2)
        for conv in (self.conv1, self.conv2):
            features = self.pool(F.relu(conv(features)))

        # Back to (B, L, C), project to d_model and add position information
        tokens = self.pos_encoding(self.projection(features.transpose(1, 2)))

        for block in self.transformer_blocks:
            tokens = block(tokens)

        # Average over the time axis: one vector per clip
        pooled = tokens.mean(dim=1)

        # Dense head with dropout after each hidden layer
        for fc in (self.fc1, self.fc2):
            pooled = self.dropout(F.relu(fc(pooled)))

        return F.log_softmax(self.fc3(pooled), dim=1)

# Custom Dataset
class AudioDataset(Dataset):
    """In-memory map-style dataset of (features, label) pairs.

    Features are stored as a float32 tensor and labels as an int64 tensor.
    """

    def __init__(self, X, y):
        self.X = torch.FloatTensor(X)
        self.y = torch.LongTensor(y)

    def __len__(self):
        # Number of samples = size of the first axis of the feature tensor
        return self.X.size(0)

    def __getitem__(self, idx):
        sample, target = self.X[idx], self.y[idx]
        return sample, target

# Training function
def _run_validation(model, val_loader, criterion, device):
    """Return (mean per-batch loss, accuracy in percent) of the model on val_loader."""
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_X, batch_y in val_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            output = model(batch_X)
            val_loss += criterion(output, batch_y).item()
            pred = output.argmax(dim=1)
            correct += pred.eq(batch_y).sum().item()
            total += len(batch_y)

    # max(..., 1) and the total check keep an empty loader from dividing by zero
    val_loss /= max(len(val_loader), 1)
    accuracy = 100. * correct / total if total else 0.0
    return val_loss, accuracy


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device,
                patience=5, checkpoint_path='best_model.pt'):
    """Train with early stopping and leave the model holding its best weights.

    After every epoch the model is validated. Whenever the validation loss
    improves, the state dict is written to ``checkpoint_path``. Training stops
    after ``patience`` consecutive epochs without improvement. Before returning,
    the best checkpoint written during this call is loaded back into ``model``,
    so later evaluation uses the best weights rather than the last epoch's.

    Args:
        model: Module returning per-class scores of shape (batch, num_classes).
        train_loader: Iterable of (inputs, targets) training batches.
        val_loader: Iterable of (inputs, targets) validation batches.
        criterion: Loss taking (model output, targets).
        optimizer: Optimizer over the model's parameters.
        num_epochs: Maximum number of epochs.
        device: Device the batches are moved to.
        patience: Epochs without validation-loss improvement before stopping.
        checkpoint_path: File the best state dict is saved to.

    Returns:
        dict with per-epoch lists under ``train_loss``, ``val_loss`` and
        ``val_accuracy``.
    """
    history = {"train_loss": [], "val_loss": [], "val_accuracy": []}
    best_val_loss = float('inf')
    patience_counter = 0
    checkpoint_written = False

    for epoch in range(num_epochs):
        # Training
        model.train()
        train_loss = 0.0
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            optimizer.zero_grad()
            output = model(batch_X)
            loss = criterion(output, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss /= max(len(train_loader), 1)

        # Validation
        val_loss, accuracy = _run_validation(model, val_loader, criterion, device)

        history["train_loss"].append(train_loss)
        history["val_loss"].append(val_loss)
        history["val_accuracy"].append(accuracy)

        print(f'Epoch: {epoch+1}')
        print(f'Training Loss: {train_loss:.4f}')
        print(f'Validation Loss: {val_loss:.4f}')
        print(f'Validation Accuracy: {accuracy:.2f}%')

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_written = True
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered")
                break

    # Restore the best weights. Only a checkpoint written by this call is loaded,
    # so a stale file from an earlier run is never picked up.
    if checkpoint_written:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    return history

# Instantiate the model with the same hyper-parameters as the TensorFlow transformer
# (18088 MFCC frames x 20 coefficients per clip) and move it to the selected device
model = AudioTransformer(
    input_shape=(18088, 20),
    num_layers=4,
    d_model=128,
    num_heads=8,
    dff=256,
    num_classes=2,
    dropout_rate=0.2
).to(device)

# Wrap the scaled 3-D MFCC arrays and their labels; note that these names are rebound here
train_dataset = AudioDataset(train_X_scaled, train_y)
val_dataset = AudioDataset(dev_X_scaled, dev_y)
test_dataset = AudioDataset(test_X_scaled, test_y)

# Only the training loader is shuffled
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8)
test_loader = DataLoader(test_dataset, batch_size=8)

# The model outputs log-probabilities (log_softmax), hence NLLLoss;
# errors on class 1 are weighted twice as much as on class 0
class_weights = torch.FloatTensor([1.0, 2.0]).to(device)
criterion = nn.NLLLoss(weight=class_weights)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Train for up to 50 epochs; train_model applies early stopping on the validation loss
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=50, device=device)

In [None]:
# Evaluate on test set
# train_model saves the weights with the lowest validation loss to 'best_model.pt'.
# Load them so the test score reflects the best epoch rather than the last one.
if os.path.exists('best_model.pt'):
    model.load_state_dict(torch.load('best_model.pt', map_location=device))

model.eval()
test_loss = 0
correct = 0
total = 0
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        output = model(batch_X)
        test_loss += criterion(output, batch_y).item()
        pred = output.argmax(dim=1)
        correct += pred.eq(batch_y).sum().item()
        total += len(batch_y)

# Mean per-batch loss; the guards keep an empty loader from dividing by zero
test_loss /= max(len(test_loader), 1)
test_accuracy = 100. * correct / total if total else 0.0
print(f'Test Loss: {test_loss:.4f}')
print(f'Test Accuracy: {test_accuracy:.2f}%')