In [None]:
%cd masters-thesis

In [None]:
import torch
import numpy as np
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
from torch import nn, optim
from torch.utils.data import TensorDataset, DataLoader, random_split
from sklearn.preprocessing import Normalizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

from thesisproject.models import FixedFeatureLSTM

In [None]:
# Feature table generated by the create_feature_extractions script. It holds
# filename, subject_id_and_knee, is_right, TKR, visit and all extracted
# features. Missing entries are replaced with 0.

path = "feature_extract.csv"
df = pd.read_csv(path).fillna(0.)


def subject_id_and_knee(row):
    """Return the per-knee key for one row: "<ID>-R" if is_right is truthy, else "<ID>-L"."""
    side = "-R" if row["is_right"] else "-L"
    return str(row["ID"]) + side


df["subject_id_and_knee"] = df.apply(subject_id_and_knee, axis=1)

# One sample per distinct knee key, not per row.
n_samples = len(df["subject_id_and_knee"].unique())
print(f"{n_samples} total samples.")

In [None]:
# Identifier and label columns; every remaining column is treated as a model input feature.
exclude_columns = {"ID", "is_right", "visit", "filename", "subject_id_and_knee", "TKR"}
# Keep the CSV's own column order. Iterating a set of strings depends on the
# per-process hash seed, so list(set(...)) could order the features differently
# after every kernel restart.
include_columns = [col for col in df.columns if col not in exclude_columns]
n_features = len(include_columns)
# Largest number of rows recorded for any single knee key.
max_visits = int(df["subject_id_and_knee"].value_counts().max())

print(f"{n_features} features. {max_visits} maximum visits.")

In [None]:
# Creating dataset: one zero-padded (max_visits, n_features) feature sequence
# and one TKR label per knee.
exclude_columns = {"ID", "is_right", "visit", "filename", "subject_id_and_knee", "TKR"}
# Derive the feature list in CSV column order so the feature axis of X is the
# same on every run (set iteration order for strings varies per process).
include_columns = [col for col in df.columns if col not in exclude_columns]

X = []
y = []
# A single groupby pass replaces re-filtering the whole frame for every knee.
# sort=False keeps knees in order of first appearance, matching .unique().
# NOTE(review): rows of a knee are used in file order; the sequence model sees
# the visits in that order - confirm the CSV is sorted by visit.
for subject_id, subject_rows in df.groupby("subject_id_and_knee", sort=False):
    features = subject_rows[include_columns].to_numpy()

    # Pad short histories with all-zero rows at the end up to max_visits.
    n_visits = features.shape[0]
    if n_visits < max_visits:
        pad = np.zeros((max_visits - n_visits, features.shape[1]), dtype=features.dtype)
        features = np.concatenate([features, pad], axis=0)

    X.append(features)

    # The label is taken from the knee's first row.
    tkr = subject_rows["TKR"].iloc[0]
    y.append(int(tkr))

X = torch.from_numpy(np.array(X).astype(np.float32))
# Class-index targets for nn.CrossEntropyLoss are conventionally int64;
# uint8 targets are rejected by some PyTorch versions.
y = torch.from_numpy(np.array(y).astype(np.int64))

assert X.shape[0] == y.shape[0], "one label is expected per feature sequence"

In [None]:
dataset = TensorDataset(X, y)

# Size the split from the dataset itself so the lengths always sum to len(dataset).
train_n = int(len(dataset) * 0.8)
test_n = len(dataset) - train_n

print(f"{train_n} training samples\n{test_n} testing samples")

# Seed the split so the same knees land in the test set on every run;
# otherwise reported metrics are not comparable between runs.
split_seed = 0
train, test = random_split(
    dataset,
    lengths=[train_n, test_n],
    generator=torch.Generator().manual_seed(split_seed),
)

train_loader = DataLoader(train, batch_size=8, shuffle=True, num_workers=4)
#val_loader = DataLoader(val, batch_size=8, shuffle=True, num_workers=4)
# Evaluation does not benefit from shuffling; a fixed order keeps batches stable.
test_loader = DataLoader(test, batch_size=8, shuffle=False, num_workers=4)

In [None]:
# Run on the first CUDA device when one is available, otherwise on the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device('cuda:0' if use_cuda else 'cpu')

# Two output units: one logit per class for the binary TKR label.
hidden_size = 100
net = FixedFeatureLSTM(n_features, hidden_size, 2)
net.to(device)

# Training

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=5e-5)

n_epochs = 100
log_every = 10  # report the mean loss once per this many mini-batches

# Switch to training mode explicitly: the evaluation cell calls net.eval(),
# so re-running this cell afterwards would otherwise train in eval mode.
net.train()
for epoch in range(n_epochs):  # loop over the dataset multiple times
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].to(device), data[1].to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % log_every == log_every - 1:    # print every `log_every` mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / log_every:.3f}')
            running_loss = 0.0

print('Finished Training')

In [None]:
def calculate_metrics(y_target, y_pred):
    """Compute binary classification metrics for labels encoded as 0 / 1.

    The four confusion-matrix counts are computed directly, so the function
    also works when only one class occurs in the inputs (for example a small
    batch without any positive sample). Unpacking a raveled confusion matrix
    fails in that case because the matrix then has a single cell.

    Args:
        y_target: array-like of ground-truth labels (0 or 1).
        y_pred: array-like of predicted labels (0 or 1), same length.

    Returns:
        [accuracy, precision, recall, specificity]. A small epsilon in each
        denominator makes an undefined ratio come out as 0 instead of raising.

    Raises:
        ValueError: if the two inputs do not hold the same number of labels.
    """
    y_target = np.asarray(y_target).ravel()
    y_pred = np.asarray(y_pred).ravel()
    if y_target.shape != y_pred.shape:
        raise ValueError(
            f"y_target and y_pred must have the same length, "
            f"got {y_target.shape[0]} and {y_pred.shape[0]}"
        )

    target_pos, target_neg = y_target == 1, y_target == 0
    pred_pos, pred_neg = y_pred == 1, y_pred == 0

    tp = np.sum(target_pos & pred_pos)
    tn = np.sum(target_neg & pred_neg)
    fp = np.sum(target_neg & pred_pos)
    fn = np.sum(target_pos & pred_neg)

    eps = 1e-6  # guards against division by zero when a denominator count is 0
    accuracy = (tp + tn) / (tn + fp + fn + tp + eps)
    precision = tp / (tp + fp + eps)
    recall = tp / (tp + fn + eps)
    specificity = tn / (tn + fp + eps)

    return [accuracy, precision, recall, specificity]

In [None]:
val_loss = 0.
n_batches = 0
n_examples = 0
all_targets = []
all_preds = []

net.eval()
with torch.no_grad():
    for i, data in enumerate(test_loader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].to(device), data[1].to(device)

        # forward pass only; no gradients are tracked inside no_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)

        # criterion returns the mean over the batch; weight it by the batch
        # size so a shorter final batch does not count as much as a full one
        batch_size = labels.shape[0]
        val_loss += loss.item() * batch_size
        n_examples += batch_size
        n_batches += 1

        # collect predictions; metrics are computed once over the whole set
        y_pred = torch.argmax(outputs, dim=1)
        all_targets.append(labels.detach().cpu().numpy())
        all_preds.append(y_pred.detach().cpu().numpy())

val_loss /= n_examples
# Precision, recall and specificity are ratios, so averaging per-batch values
# does not give the dataset-level figure (batches without positives contribute
# near-zero values). Score all predictions together instead.
metrics = np.array(
    calculate_metrics(np.concatenate(all_targets), np.concatenate(all_preds)),
    dtype=float,
)

print(f"""
Validation test metrics:
loss:        {val_loss}
accuracy:    {metrics[0]}
precision:   {metrics[1]}
recall:      {metrics[2]}
specificity: {metrics[3]}
""")