In [16]:
from torchtext.vocab import GloVe

# Load pre-trained GloVe word vectors (the '6B' set, 100 dimensions per token).
# GloVe is its own embedding method, not Word2Vec.
word_embeddings = GloVe(name='6B', dim=100)

In [None]:
import sys
sys.path.append('../')  # Add the parent folder to the system path
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from artificial_nn import ANN
import util.preprocess as preprocess

# Option A: if ./nacc_processed.csv (written a few cells further down) already exists,
# uncomment the next line to load it instead of the raw export.
# extracted_df = pd.read_csv("./nacc_processed.csv").astype("float")


# Option B (active): start from the raw, unfiltered export; the following cells build the filtered frame.

data = pd.read_csv("./investigator_ftldlbd_nacc65.csv")

In [None]:
# Build a sortable visit-date key: day and month are zero-padded to two characters,
# then year + month + day are concatenated as text.
for date_part in ('VISITDAY', 'VISITMO'):
    data[date_part] = data[date_part].astype(str).str.zfill(2)
data['VISITYR'] = data['VISITYR'].astype(str)
data['VISITDT'] = data['VISITYR'].str.cat([data['VISITMO'], data['VISITDAY']])

# Patient id, visit date, the score columns, and the diagnosis label (last).
columns_to_use = [
    'NACCID', 'VISITDT',
    'NACCMOCA', 'CRAFTDRE', 'COMMUN', 'NACCMMSE', 'HOMEHOBB', 'JUDGMENT',
    'LOGIMEM', 'CDRSUM', 'MEMORY', 'BOSTON', 'MINTTOTS', 'ANIMALS',
    'MEMUNITS', 'TRAILB',
    'NACCUDSD',
]

# Keep only those columns, ordered per patient by visit date (ascending is the default).
extracted_df = data[columns_to_use].sort_values(by=["NACCID", "VISITDT"])

# Persist the filtered frame so later runs can skip the raw export.
new_csv = './nacc_processed.csv'
extracted_df.to_csv(new_csv, index=False)

In [None]:
# Every column of columns_to_use except the first two (NACCID, VISITDT) and the last one (NACCUDSD).
features_to_impute = columns_to_use[2:-1]

def forward_and_backward_impute(group, features=None):
    """Fill missing scores in one group of rows from the neighbouring rows.

    For each feature column the placeholder codes are turned into NaN, then the
    column is forward-filled and finally backward-filled, so a gap takes the
    closest earlier value and leading gaps take the first later value. A column
    with no real value at all stays NaN.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows to impute (the notebook passes one patient's visits, ordered by
        visit date). The frame is not modified.
    features : list of str, optional
        Columns to impute. Defaults to the notebook-level ``features_to_impute``.

    Returns
    -------
    pandas.DataFrame
        A copy of ``group`` with the feature columns imputed; the index is unchanged.
    """
    if features is None:
        features = features_to_impute

    # Functions given to groupby.apply must not mutate the group they receive.
    imputed = group.copy()

    for feature in features:
        if feature == "NACCMOCA" or feature == "NACCMMSE":
            placeholder_codes = [-4, 88, 99]
        else:
            # NOTE(review): 99 is treated as missing for every other feature, although the
            # later missing_value_pairs table lists only -4 for several of them (e.g. TRAILB).
            # Confirm 99 is never a legitimate score in those columns.
            placeholder_codes = [-4, 99]

        # np.nan (not pd.NA) keeps the column numeric instead of turning it into object dtype.
        cleaned = imputed[feature].replace(placeholder_codes, np.nan)
        imputed[feature] = cleaned.ffill().bfill()

    return imputed

# Impute each patient's visits independently.
# - Only the feature columns are passed to the helper, so the grouping column is never part
#   of the group (operating on it is deprecated in recent pandas).
# - group_keys=False keeps the original row index, so the result can be assigned straight
#   back and extracted_df does not end up with NACCID as both an index level and a column.
imputed_features = (
    extracted_df
    .groupby('NACCID', group_keys=False)[features_to_impute]
    .apply(forward_and_backward_impute)
)
extracted_df[features_to_impute] = imputed_features

# A feature that a patient never had measured at any visit is still missing here;
# put the -4 sentinel back so later cells can recognise it.
for feature in features_to_impute:
    extracted_df[feature] = extracted_df[feature].fillna(-4)

# Snapshot of the forward/backward-imputed frame.
new_csv = './fb_imputed.csv'
extracted_df.to_csv(new_csv, index=False)

In [None]:
# Special codes (called "severity" values in this notebook) per score column.
# NOTE(review): the meaning of each individual code is not stated here - confirm against the data dictionary.
severity_value_pairs  = {
    "CRAFTDRE" : [95,96,97,98],
    "LOGIMEM" : [95,96,97,98],
    "NACCMMSE" : [95,96,97,98],
    "BOSTON" : [95,96,97,98],
    "MINTTOTS" : [95,96,97,98],
    "ANIMALS" : [95,96,97,98],
    "MEMUNITS" : [95,96,97,98],
    "TRAILB" : [995,996,997,998],
}

include_severity_vals = list(severity_value_pairs.keys())

# One 0/1 indicator column per (score column, code) pair, named "<column>_<code>".
for col in include_severity_vals:
    for val in severity_value_pairs[col]:
        extracted_df[f"{col}_{val}"] = (extracted_df[col] == val).astype(int)

# imputation_values: column name -> list with the mean of that column for labels 1..4
# (NaN when a label has no valid value). Kept in the same shape as before.
imputation_values = {}
diagnosis_labels = extracted_df["NACCUDSD"]

for col, severity_values in severity_value_pairs.items():
    # Numeric view of the column with the severity codes blanked out.
    scores = pd.to_numeric(extracted_df[col], errors="coerce").replace(severity_values, np.nan)

    # The -4 "missing" sentinel is still present in these columns at this point, so it must
    # not take part in the mean; otherwise every imputed value is dragged downwards.
    valid_scores = scores.where(scores != -4)
    label_means = valid_scores.groupby(diagnosis_labels).mean()
    imputation_values[col] = [label_means.get(label, np.nan) for label in range(1, 5)]

    # Fill each blank with the mean of the row's own diagnosis label (vectorized, no row loop).
    # -4 entries are left untouched for the dedicated missing-value step.
    extracted_df[col] = scores.fillna(diagnosis_labels.map(label_means))

main_df = extracted_df

# NOTE(review): the return value is discarded, so this presumably reorders main_df in place - verify in util.preprocess.
preprocess.put_the_column_at_end(main_df, "NACCUDSD")

In [None]:
# Placeholder codes that stand for "missing" in each score column.
missing_value_pairs  = {
    "NACCMOCA" : [-4,88,99],
    "CRAFTDRE" : [-4],
    "LOGIMEM" : [-4],
    "NACCMMSE" : [-4,88],
    "CDRSUM" : [99],
    "BOSTON" : [-4],
    "MINTTOTS" : [-4],
    "ANIMALS" : [-4],
    "MEMUNITS" : [-4],
    "TRAILB" : [-4],
}

# imputation_values: column name -> list with the mean of that column for labels 1..4
# (NaN when a label has no observed value).
imputation_values = {}
diagnosis_labels = main_df["NACCUDSD"]

for col, missing_values in missing_value_pairs.items():
    # Turn the placeholder codes into NaN so they are ignored by the mean.
    scores = pd.to_numeric(main_df[col], errors="coerce").replace(missing_values, np.nan)

    label_means = scores.groupby(diagnosis_labels).mean()
    imputation_values[col] = [label_means.get(label, np.nan) for label in range(1, 5)]

    # Fill each gap with the mean of the row's own diagnosis label. This is vectorized
    # and does not depend on the label being an integer in 1..4 for list indexing.
    main_df[col] = scores.fillna(diagnosis_labels.map(label_means))


# Forward/backward filling can leave several visits of one patient with identical values.
# Keep only the first of such rows (everything except the visit date is compared) to avoid bias.
columns_to_look_for = main_df.columns.difference(["VISITDT"])
print(main_df.shape[0])
main_df = main_df.drop_duplicates(subset=columns_to_look_for, keep='first')
print(main_df.shape[0])

# Dump the cleaned frame for a manual sanity check.
new_csv = './visualize_main_df.csv'
main_df.to_csv(new_csv, index=False)

# Replace placeholder values with NaN and count missing values
#for column, missing_values in missing_value_pairs.items():
#    main_df[column] = main_df[column].replace(missing_values, pd.NA)
#    missing_count = main_df[column].isna().sum()
#    print(f"{column} has {missing_count} missing values")

In [None]:
from imblearn.under_sampling import RandomUnderSampler
from collections import Counter

# Split the frame into features and the NACCUDSD label column.
# NOTE(review): assumes preprocess.sep_column returns (features, label) in that order - confirm in util.preprocess.
X, y = preprocess.sep_column(main_df, "NACCUDSD")
print(f"Original class distribution: {Counter(y)}")
# Randomly drop rows to balance the classes; random_state pins the sampler so reruns pick the same rows.
undersample = RandomUnderSampler(sampling_strategy='auto', random_state=42)
X_resampled, y_resampled = undersample.fit_resample(X, y)
print(f"Class distribution after random undersampling: {Counter(y_resampled)}")

# Re-attach the label as a column and persist the balanced frame; the next cell reloads it from this file.
main_df = pd.concat([X_resampled, y_resampled], axis=1)
new_csv = './undersampled_df.csv'
main_df.to_csv(new_csv, index=False)

In [None]:
import itertools
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import matplotlib.pyplot as plt

# Shape the dataset before feeding it to the networks.
main_df = pd.read_csv("./undersampled_df.csv")
main_df = main_df.drop(["NACCID","VISITDT"], axis=1)

# Columns whose numeric rating is later replaced by the embedding of a descriptive word.
suitable_word_embedded_columns = ["COMMUN", "HOMEHOBB", "JUDGMENT", "MEMORY"]

# Drop label 2 and renumber the remaining labels to 1, 2, 3.
# .copy() makes the filtered frame independent, so the assignment below does not write
# into a slice of the original (SettingWithCopyWarning).
main_df = main_df[main_df['NACCUDSD'] != 2].copy()
main_df['NACCUDSD'] = main_df['NACCUDSD'].replace({3: 2, 4: 3})

# Delete rows with a missing rating in any word-embedded column. Besides 99, the imputation
# step writes -4 for values it could not fill, so both codes are treated as missing here;
# otherwise such rows would map to no word at all below.
missing_rating_codes = [99, -4]
has_missing_rating = main_df[suitable_word_embedded_columns].isin(missing_rating_codes).any(axis=1)
main_df = main_df[~has_missing_rating]

# Rating value -> word used for the embedding lookup.
string_mapping = {
    0.0: 'none',
    0.5: 'ambiguous',
    1.0: 'mild',
    2.0: 'moderate',
    3.0: 'severe',
    # 99 : 'default'
}

dataframe = main_df.copy(deep = True)

for col in suitable_word_embedded_columns:
    dataframe[col] = dataframe[col].map(string_mapping)

# Features : X , Labels : y
X, y = preprocess.sep_column(dataframe, "NACCUDSD")
y = y.astype(int)

# Frame 1: only the word-valued columns.
word_embedded_data_frame = X[suitable_word_embedded_columns]

# Frame 2: all remaining (numeric) columns.
without_word_embedded_data_frame = X.drop(columns=suitable_word_embedded_columns)




In [None]:
X_train, X_test, y_train, y_test = train_test_split(without_word_embedded_data_frame, y, test_size=0.2, random_state=462)
print(y_train.unique())

# CrossEntropyLoss expects class indices starting at 0. The labels were renumbered to
# 1..3 earlier, so shift them to the range [0, 2].
y_train = y_train - 1
y_test = y_test - 1

X_train_tensor = torch.tensor(X_train.values.astype(np.float32))
X_test_tensor = torch.tensor(X_test.values.astype(np.float32))
y_train_tensor = torch.tensor(y_train.values)
y_test_tensor = torch.tensor(y_test.values)

# Seed torch so weight initialisation and batch shuffling are reproducible (same seed as the split).
torch.manual_seed(462)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=512, shuffle=True)

input_size = X_train_tensor.shape[1]
hidden_sizes = [512,256,128,64]               # hidden layer size is hyperparameter
output_size = len(y.unique())

model = ANN.ArtificialNeuralNetwork(input_size, hidden_sizes, output_size)

criterion = nn.CrossEntropyLoss()                       # cross entropy value is used as loss function
optimizer = optim.Adam(model.parameters(), lr=0.0001)    # learning rate is hyperparameter

num_epochs = 500
loss_list = []

for epoch in range(num_epochs):
    loss_of_epoch = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()               # clear gradients left over from the previous batch
        outputs = model(inputs)             # call the module itself (runs forward plus any registered hooks)
        loss = criterion(outputs, labels)   # loss of the predictions for this batch
        loss.backward()                     # back prop: gradients w.r.t. model params
        optimizer.step()                    # update the weights according to LR and gradients

        loss_of_epoch += loss.item()

    # Report the same per-epoch mean that is plotted, not just the last batch's loss.
    mean_epoch_loss = loss_of_epoch / len(train_loader)
    loss_list.append(mean_epoch_loss)

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {mean_epoch_loss:.4f}")

plt.plot(range(1, num_epochs + 1), loss_list, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title(f'Training Loss Curve for continues values')
plt.legend()
plt.show()

# evaluation of model in test set
with torch.no_grad():
    model.eval()
    outputs = model(X_test_tensor)
    # index of the largest output per row is the predicted class
    _, without_word_embedding_predicted = torch.max(outputs, 1)
    without_word_embedding_accuracy = (without_word_embedding_predicted == y_test_tensor).sum().item() / y_test_tensor.size(0)
print("\nClassification Report for without word embedding part:")
report1_str = classification_report(y_test_tensor.cpu(), without_word_embedding_predicted.cpu(), digits=4,target_names=['Normal cognition', 'MCI', 'Dementia'])
print(report1_str)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(word_embedded_data_frame, y, test_size=0.2, random_state=462)

# CrossEntropyLoss expects class indices starting at 0. The labels were renumbered to
# 1..3 earlier, so shift them to the range [0, 2].
y_train = y_train - 1
y_test = y_test - 1

# Swap each rating word for its embedding values.
# NOTE(review): presumably expands every listed column into the components of its word vector - confirm in util.preprocess.
X_train = preprocess.replace_with_word_embeddings(X_train, word_embeddings, suitable_word_embedded_columns)
X_test = preprocess.replace_with_word_embeddings(X_test, word_embeddings, suitable_word_embedded_columns)

X_train_tensor = torch.tensor(X_train.values.astype(np.float32))
X_test_tensor = torch.tensor(X_test.values.astype(np.float32))
y_train_tensor = torch.tensor(y_train.values)
y_test_tensor = torch.tensor(y_test.values)

# Seed torch so weight initialisation and batch shuffling are reproducible (same seed as the split).
torch.manual_seed(462)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

input_size = X_train_tensor.shape[1]
hidden_sizes = [500,400,300,200,100,50]               # hidden layer size is hyperparameter
output_size = len(y.unique())

model = ANN.ArtificialNeuralNetwork(input_size, hidden_sizes, output_size)

criterion = nn.CrossEntropyLoss()                       # cross entropy value is used as loss function
optimizer = optim.Adam(model.parameters(), lr=0.0001)    # learning rate is hyperparameter

num_epochs = 500
loss_list = []

for epoch in range(num_epochs):
    loss_of_epoch = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()               # clear gradients left over from the previous batch
        outputs = model(inputs)             # call the module itself (runs forward plus any registered hooks)
        loss = criterion(outputs, labels)   # loss of the predictions for this batch
        loss.backward()                     # back prop: gradients w.r.t. model params
        optimizer.step()                    # update the weights according to LR and gradients

        loss_of_epoch += loss.item()

    # Report the same per-epoch mean that is plotted, not just the last batch's loss.
    mean_epoch_loss = loss_of_epoch / len(train_loader)
    loss_list.append(mean_epoch_loss)

    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {mean_epoch_loss:.4f}")

plt.plot(range(1, num_epochs + 1), loss_list, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title(f'Training Loss Curve for replacing {" , ".join(suitable_word_embedded_columns)} attribute with word embedding vector')
plt.legend()
plt.show()

# evaluation of model in test set
with torch.no_grad():
    model.eval()
    outputs = model(X_test_tensor)
    # index of the largest output per row is the predicted class
    _, word_embedding_predicted = torch.max(outputs, 1)
    with_word_embedding_accuracy = (word_embedding_predicted == y_test_tensor).sum().item() / y_test_tensor.size(0)
print("\nClassification Report for word embedding part:")
report2_str = classification_report(y_test_tensor.cpu(), word_embedding_predicted.cpu(), digits=4, target_names=['Normal cognition', 'MCI', 'Dementia'])