## Required Libraries

In [None]:
!pip install torch
!pip install pandas
!pip install huggingface_hub
!pip install transformers
!pip install sentencepiece sacremoses
!pip install tensorflow
!pip install sklearn

## Importing packages

In [None]:
from sklearn.model_selection import StratifiedKFold
import torch
import torch.nn as nn
import pandas as pd
from torch.utils.data import DataLoader, TensorDataset, random_split
from torch.nn import functional as F
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
# Load pre-trained BERT model and tokenizer
from torch.hub import load

## Loading Base Model and Data

In [None]:
# Pre-trained BERT encoder (moved to the GPU) and its matching tokenizer,
# both fetched through torch.hub.
bert = load('huggingface/pytorch-transformers', 'model', 'bert-base-uncased').to('cuda')
tokenizer = load('huggingface/pytorch-transformers', 'tokenizer', 'bert-base-uncased')


# Load data
data = pd.read_csv('path_to_data.csv')
texts = data['data_noPause'].values.tolist()
old_labels = data['category'].tolist()

# Tokenization
# Each transcript is encoded as a whole string. The previous
# `" ".join(map(str, text))` iterated over the *characters* of the string and
# re-joined them with spaces, so the tokenizer saw single letters, not words.
# NOTE(review): assumes every cell of 'data_noPause' is a string - confirm.
tokens = [
    tokenizer.encode(text, max_length=256, padding="max_length", truncation=True)
    for text in texts
]

# Map the category names to binary targets; any other value is passed through
# unchanged (e.g. labels that are already numeric).
labels = [0 if item == 'Control' else 1 if item == 'dementia' else item for item in old_labels]
num_labels = 1  # a single logit for binary classification
reconstruct_embedding = 264 # 256+8

# Convert to tensors
input_ids = torch.tensor(tokens)
# Mask out padding positions; use the tokenizer's own pad id rather than a
# hard-coded 0 so the mask stays correct if the tokenizer is swapped.
attention_mask = (input_ids != tokenizer.pad_token_id).float()
pause_count = torch.tensor(data[['pause1', 'pause2', 'pause3_noLoPause']].values.tolist())
labels = torch.tensor(labels)
labels = labels.to(dtype=torch.long)


# Specify the number of folds
num_folds = 20
kf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=42)

# Lists to store results for each fold
f1_scores = []
accuracies = []

## Constructing the Model Architecture

In [None]:
def reset_all_weights(model: nn.Module) -> None:
    """Re-initialise, in place, every submodule of ``model`` that can reset itself.

    A submodule is reset when it exposes a callable ``reset_parameters``
    attribute; every other submodule is left untouched. Nothing is returned.

    refs:
        - https://discuss.pytorch.org/t/how-to-re-set-alll-parameters-in-a-network/20819/6
        - https://stackoverflow.com/questions/63627997/reset-parameters-of-a-neural-network-in-pytorch
        - https://pytorch.org/docs/stable/generated/torch.nn.Module.html
    """
    def _reinitialise(module: nn.Module) -> None:
        resetter = getattr(module, "reset_parameters", None)
        if not callable(resetter):
            # Containers, activations, etc. have nothing to reset.
            return
        # Gradient tracking is disabled while the parameters are overwritten.
        with torch.no_grad():
            module.reset_parameters()

    # nn.Module.apply calls the function recursively on every submodule.
    model.apply(_reinitialise)

# Model
class BertClassifier(nn.Module):
    """Wrap an encoder with dropout and a two-layer linear head.

    The second element of the encoder's output (768 features) goes through
    dropout, is projected to a 256-dimensional embedding, and that embedding
    is mapped to ``num_labels`` outputs. No activation sits between the two
    linear layers.
    """

    def __init__(self, bert_model, num_labels):
        super().__init__()
        encoder_width, embedding_width = 768, 256
        self.bert = bert_model
        self.dropout = nn.Dropout(p=0.3)
        self.linear1 = nn.Linear(encoder_width, embedding_width)
        self.linear2 = nn.Linear(embedding_width, num_labels)

    def forward(self, input_ids, attention_mask):
        """Return ``(outputs, embedding)`` for a batch of token ids and masks."""
        encoder_result = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # Index 1 of the encoder result is used as the pooled representation.
        regularised = self.dropout(encoder_result[1])
        projected = self.linear1(regularised)
        return self.linear2(projected), projected


class EncoderClassifier(nn.Module):
    """Small fully connected autoencoder with an 8-dimensional bottleneck.

    The encoder maps ``input_dim`` features to 8 through a 256-unit ReLU
    layer; the decoder mirrors it back to ``input_dim``.
    """

    def __init__(self, input_dim):
        super().__init__()
        hidden_units = 256
        bottleneck_units = 8
        # Compress: input_dim -> 256 -> 8
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, bottleneck_units),
        )
        # Reconstruct: 8 -> 256 -> input_dim
        self.decoder = nn.Sequential(
            nn.Linear(bottleneck_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, input_dim),
        )

    def forward(self, x):
        """Return ``(encoded, decoded)``: the bottleneck code and its reconstruction."""
        code = self.encoder(x)
        return code, self.decoder(code)



class Classifier(nn.Module):
    """Fuse a text embedding with an encoded pause vector and classify.

    ``bert_model`` must return ``(outputs, embedding)`` and ``autoencoder``
    must return ``(encoded, decoded)``. The embedding and the encoded pause
    vector are concatenated along dim 1 and fed to one linear layer with
    ``reconstruct_embedding`` inputs and ``num_labels`` outputs.
    """

    def __init__(self, bert_model, autoencoder, num_labels, reconstruct_embedding):
        super().__init__()
        self.bert = bert_model
        self.autoencoder = autoencoder
        self.fusion = nn.Linear(reconstruct_embedding, num_labels)

    def forward(self, input_ids, attention_mask,pause_count):
        """Return ``(out, embedding, decoded_pause)`` for one batch."""
        # The text branch's own prediction is not used, only its embedding.
        _, text_embedding = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        # The pause branch yields the bottleneck code and the reconstruction.
        pause_code, pause_reconstruction = self.autoencoder(pause_count)
        fused = torch.cat((text_embedding, pause_code), dim=1)
        return self.fusion(fused), text_embedding, pause_reconstruction

def test(classifier,val_dataloader):
    """Evaluate ``classifier`` on ``val_dataloader`` with binary metrics.

    The classifier is switched to eval mode (and left in it). Its first
    output is treated as a logit: it is passed through a sigmoid and the
    resulting probability is thresholded at 0.5 to obtain the predicted class.

    Args:
        classifier: callable as ``classifier(input_ids, attention_mask,
            pause_count)`` returning a 3-tuple whose first element holds one
            logit per sample.
        val_dataloader: iterable of ``(input_ids, attention_mask,
            pause_count, labels)`` batches.

    Returns:
        Tuple ``(f1, precision, recall, accuracy)`` - note that F1 comes first.
    """
    classifier.eval()
    threshold = 0.5  # decision threshold on the predicted probability

    # Run on whatever device the model lives on; a model without parameters
    # falls back to CUDA, which is what this function always used before.
    first_param = next(iter(classifier.parameters()), None)
    device = first_param.device if first_param is not None else torch.device('cuda')

    all_labels1 = []
    all_predictions1 = []
    with torch.no_grad():
        for input_ids_val, attention_mask_val, pause_count_val, labels_val in val_dataloader:
            outputs, _, _ = classifier(
                input_ids_val.to(device),
                attention_mask_val.to(device),
                pause_count_val.float().to(device),
            )
            # The model is trained with BCEWithLogitsLoss, so `outputs` are raw
            # logits: convert them to probabilities before thresholding.
            # reshape(-1) (instead of squeeze) also works for a batch of one.
            probabilities = torch.sigmoid(outputs).reshape(-1)
            predictions = (probabilities > threshold).float()

            all_labels1.extend(labels_val.float().reshape(-1).tolist())
            all_predictions1.extend(predictions.cpu().tolist())

    # Convert lists to NumPy arrays
    all_labels1 = np.array(all_labels1)
    all_predictions1 = np.array(all_predictions1)

    accuracy = accuracy_score(all_labels1, all_predictions1)
    precision = precision_score(all_labels1, all_predictions1)
    recall = recall_score(all_labels1, all_predictions1)
    f1 = f1_score(all_labels1, all_predictions1)

    return f1,precision,recall,accuracy

## Training and Inference

In [None]:
# Build the full model once; every fold restarts from this initial state.
bert_model = BertClassifier(bert, num_labels).to('cuda')
encoder_classifier = EncoderClassifier(input_dim=3).to('cuda')
classifier = Classifier(bert_model, encoder_classifier, num_labels, reconstruct_embedding).to('cuda')

mse = nn.MSELoss()  # reconstruction loss for the pause autoencoder
loss_fn = nn.BCEWithLogitsLoss() # contains sigmoid activation for binary classification

# Snapshot of the initial weights, reloaded at the start of every fold.
path_initial = '/content/drive/MyDrive/Pause_Encoding/path_initial'
torch.save(classifier.state_dict(), path_initial)
print('len(input_ids)',len(input_ids))

lam = 0.75  # weight of the reconstruction term in the total loss
num_epochs = 10
batch_size = 8

# Cross-validation loop
for fold, (train_idx, val_idx) in enumerate(kf.split(range(len(input_ids)), labels.cpu().numpy())):
    print(f"Fold {fold + 1}/{num_folds}")
    classifier.load_state_dict(torch.load(path_initial))

    # A fresh optimizer per fold: reusing one instance would carry AdamW's
    # moment estimates and step counts from the previous fold into this one,
    # even though the weights themselves are reset.
    optimizer = torch.optim.AdamW(classifier.parameters(), lr=2e-5)

    best_acc = 0
    path_best_model = None  # set once an epoch beats best_acc
    fold_checkpoint_path = f'/content/drive/MyDrive/Pause_Encoding/best_model_fold_{fold}.pth'

    # Split every input tensor with the fold's index arrays.
    train_data = TensorDataset(
        input_ids[train_idx], attention_mask[train_idx], pause_count[train_idx], labels[train_idx]
    )
    val_data = TensorDataset(
        input_ids[val_idx], attention_mask[val_idx], pause_count[val_idx], labels[val_idx]
    )
    train_dataloader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
    val_dataloader = DataLoader(val_data, batch_size=batch_size)

    # Training loop for the current fold
    for epoch in range(num_epochs):
        classifier.train()  # test() leaves the model in eval mode
        for batch_ids, batch_mask, batch_pauses, batch_labels in train_dataloader:
            batch_ids = batch_ids.to('cuda')
            batch_mask = batch_mask.to('cuda')
            batch_pauses = batch_pauses.float().to('cuda')
            batch_labels = batch_labels.float().unsqueeze(1).to('cuda')

            optimizer.zero_grad()
            outputs, _, reconstructed_pauses = classifier(batch_ids, batch_mask, batch_pauses)
            # Total loss = classification loss + lam * pause reconstruction loss.
            loss_cross = loss_fn(outputs, batch_labels)
            reg_loss = mse(reconstructed_pauses, batch_pauses)
            loss = loss_cross + lam * reg_loss
            loss.backward()
            optimizer.step()

        # Evaluate on the validation set. test() returns
        # (f1, precision, recall, accuracy); checkpoint selection is on
        # accuracy, so take the last element (the first one is F1).
        _, _, _, val_acc = test(classifier, val_dataloader)
        if val_acc > best_acc:
            best_acc = val_acc
            path_best_model = fold_checkpoint_path
            torch.save(classifier.state_dict(), path_best_model)

    # Evaluation
    # NOTE(review): the checkpoint is selected and scored on the same
    # validation fold, so the reported metrics are optimistic.
    if path_best_model is not None:
        classifier.load_state_dict(torch.load(path_best_model))
    else:
        print("No best model found. Training may not have improved accuracy.")
    print(f"Fold {fold + 1} Report")
    f11, precision1, recall1, accuracy1 = test(classifier, val_dataloader)
    print(f'Accuracy: {accuracy1:.4f}')
    print(f'Precision: {precision1:.4f}')
    print(f'Recall: {recall1:.4f}')
    print(f'F1 Score: {f11:.4f}')

    f1_scores.append(f11)
    accuracies.append(accuracy1)

# Calculate mean and standard deviation across folds
mean_f1 = np.mean(f1_scores)
std_dev_f1 = np.std(f1_scores)

mean_acc = np.mean(accuracies)
std_dev_acc = np.std(accuracies)

print(f'Mean F1 Score: {mean_f1:.4f}, Std. Dev. F1 Score: {std_dev_f1:.4f}')
print(f'Mean Accuracy: {mean_acc:.4f}, Std. Dev. Accuracy: {std_dev_acc:.4f}')



