In [1]:
import nn_data
import torch
import torch.nn as nn
import numpy as np

In [2]:
# Input file locations (relative to the notebook's working directory).
# All three paths are handed to nn_data.load_data when the dataset is loaded.
HPO_PATH = 'data/hp.obo'
LABEVENTS_HPO_PATH = 'data/OUT_LABEVENTS_HPO.csv'
DIAGNOSES_HPO_PATH = 'data/DIAGNOSE_ICD_hpo.csv'


## Dataset loading

In [3]:
# Load the raw data from the three configured input files via the project's nn_data helper.
data = nn_data.load_data(HPO_PATH, LABEVENTS_HPO_PATH, DIAGNOSES_HPO_PATH)


In [4]:
# Wrap the loaded data in a DatasetCreator, which later provides the encoded inputs/targets.
# NOTE(review): the exact effect of the two disabled flags is defined in nn_data - confirm there.
creator = nn_data.DatasetCreator(data, enable_parent_nodes=False, enable_input_in_output=False)


## Model Creation

### Dataset Creation

In [5]:
# Load the encoded inputs and targets from the creator as nested Python lists.
input_data: list[list[int]] = creator.input_data_one_hot()
target_data: list[list[int]] = creator.target_data_one_hot()

# Every input row needs exactly one target row; fail early with a clear message otherwise.
assert len(input_data) == len(target_data), "input and target sample counts differ"

input_tensor = torch.tensor(input_data, dtype=torch.float32)
target_tensor = torch.tensor(target_data, dtype=torch.float32)

# Split the samples into three sets: 70 % training, 20 % validation, remainder for testing.
# The test size is computed as the remainder so the three sizes always add up to the total.
train_size = int(len(input_data)*0.7)
val_size = int(len(input_data)*0.2)
test_size = len(input_data)-(train_size + val_size)

dataset = torch.utils.data.TensorDataset(input_tensor, target_tensor)

# Seed the split so "Restart & Run All" always yields the same train/val/test membership.
SPLIT_SEED = 42
train_set, val_set, test_set = torch.utils.data.random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

### Model generation

In [6]:
# Model parameters
# Layer widths are derived from the encoded data: one input unit per input column,
# one output unit per target column (both read from the first sample).
enlarging_factor = 1.2  # hidden layers are this many times wider than the input layer
input_size = len(input_data[0])
output_size = len(target_data[0])
hidden_size = int(input_size*enlarging_factor)

NameError: name 'feature_list' is not defined

In [None]:
def _xavier_linear(in_features, out_features):
    """Create an nn.Linear layer and re-initialise its weight matrix with Xavier-uniform."""
    layer = nn.Linear(in_features, out_features)
    nn.init.xavier_uniform_(layer.weight)
    return layer


# Device the network is trained on: CUDA when available, CPU otherwise.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Three fully connected layers, created (and initialised) in order: input -> hidden -> hidden -> output.
l1 = _xavier_linear(input_size, hidden_size)
l2 = _xavier_linear(hidden_size, hidden_size)
l3 = _xavier_linear(hidden_size, output_size)

# Assemble the network (Tanh between the layers, Sigmoid on the output) and move it to the device.
model = nn.Sequential(l1, nn.Tanh(), l2, nn.Tanh(), l3, nn.Sigmoid())
model.to(device)

In [None]:
# Training parameters

batch_size = 8  # samples per batch in the training and validation loaders
learning_rate = 1e-4        # step size passed to the optimizer
num_epochs = 20  # number of passes over the training set
log_rhythm = 5  # print a running training loss/accuracy every this many iterations

In [None]:
# Choice of optimizer: Adam over all model parameters, with the settings collected in one place.
adam_settings = dict(lr=learning_rate, betas=(0.9, 0.999))
optimizer = torch.optim.Adam(model.parameters(), **adam_settings)

In [None]:
# definition of loss function
# The network ends in nn.Sigmoid, i.e. it emits one independent probability per output unit,
# and the targets are multi-hot vectors. Binary cross-entropy is the loss that matches this setup;
# nn.CrossEntropyLoss would apply a log-softmax on top of the sigmoid outputs and treat each
# row as a single categorical distribution.
loss_func = nn.BCELoss()

In [None]:
# definition of accuracy function
def calc_accuracy(output, target)->float:
    """Return a soft per-sample hit rate averaged over the batch.

    For every row, the square roots of the outputs at the positions where
    ``target`` is non-zero are summed and divided by the row's target sum.
    The batch result is the mean of these per-row ratios.

    Args:
        output: 2-D NumPy array of model outputs (rows = samples). Values must be
            non-negative because their square root is taken.
        target: 2-D NumPy array of the same shape holding the target values.

    Returns:
        The mean ratio as a Python float. Rows whose target sum is zero
        contribute 0 instead of raising a division error.
    """
    number_of_features = target.sum(axis=1)
    correctly_identified = (target * np.sqrt(output)).sum(axis=1)
    # The small epsilon keeps the division defined for rows without any positive target.
    return float(np.mean(correctly_identified / (number_of_features + .00001)))

In [None]:
# definition of real effect function
def real_effect(outputs, targets, threshold=0.5):
    """Print hit / false-positive / false-negative counts for the first sample of a batch.

    Only row 0 of ``outputs`` and ``targets`` is inspected. An output counts as a
    positive prediction when it is ``>= threshold``; the same cut-off is used for
    all three counters so that every position falls into exactly one category.

    Args:
        outputs: 2-D batch of model outputs (torch tensor or NumPy array).
        targets: 2-D batch of target values of the same shape, holding 0/1 entries.
        threshold: Decision cut-off for a positive prediction (default 0.5).

    Returns:
        None. The three counts are written to stdout.
    """
    def _first_row(batch):
        # Torch tensors may live on the GPU and carry gradients; bring them to NumPy first.
        if hasattr(batch, "detach"):
            batch = batch.detach().cpu().numpy()
        return np.asarray(batch)[0]

    predicted_positive = _first_row(outputs) >= threshold
    target_row = _first_row(targets)
    is_positive = target_row == 1
    is_negative = target_row == 0

    correct_diagnosed = int(np.sum(predicted_positive & is_positive))
    false_negative = int(np.sum(~predicted_positive & is_positive))
    false_positive = int(np.sum(predicted_positive & is_negative))
    # Plain int so the report reads "2/3" rather than "2/tensor(3.)".
    total_to_diagnose = int(np.sum(is_positive))

    print("Correct diagnoses:" f'{correct_diagnosed}/{total_to_diagnose}')
    print("False positives:" f'{false_positive}')
    print("False negatives:" f'{false_negative}\n')
    

### Creation of the data pipeline

In [None]:
# Creation of the data loaders
# Training batches are reshuffled every epoch, validation keeps a fixed order,
# and the test loader yields single samples in shuffled order.
DataLoader = torch.utils.data.DataLoader

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_set, batch_size=1, shuffle=True)

### Training

In [None]:
# Per-epoch validation metrics, kept for later inspection.
val_loss_history = []
val_acc_history = []
for epoch in range(num_epochs):
    print(f'[Epoch {epoch+1}/{num_epochs}]')

    # Per-iteration training metrics of the current epoch.
    train_loss_history = []
    train_acc_history = []

    # Make sure the model is in training mode at the start of every epoch.
    model.train()
    for i, (inputs, targets) in enumerate(train_loader, 1):
        inputs, targets = inputs.to(device), targets.to(device)

        # Standard optimisation step: reset gradients, forward, loss, backward, update.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_func(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss_history.append(loss.item())
        train_acc_history.append(
            calc_accuracy(outputs.detach().cpu().numpy(), targets.detach().cpu().numpy())
        )

        # Every log_rhythm iterations, report the mean over the most recent log_rhythm iterations.
        if i % log_rhythm == 0:
            train_loss = np.mean(train_loss_history[-log_rhythm:])
            train_acc = np.mean(train_acc_history[-log_rhythm:])
            print(f'[Iteration {i}]\tTRAIN      loss/acc: {train_loss:.3f}\t{train_acc:.3f}')

    # Loss and accuracy averaged over the whole epoch.
    train_loss = np.mean(train_loss_history)
    train_acc = np.mean(train_acc_history)
    print(f'for this epoch:\tTRAIN      loss/acc: {train_loss:.3f}\t{train_acc:.3f}')

    # Validation after each epoch: evaluation mode and no gradient tracking,
    # so no autograd graph is built for batches that are never back-propagated.
    val_losses = []
    val_accs = []
    model.eval()
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)
            loss = loss_func(outputs, targets)
            val_losses.append(loss.item())
            val_accs.append(calc_accuracy(outputs.cpu().numpy(), targets.cpu().numpy()))

    # Back to training mode for the next epoch.
    model.train()

    val_loss = np.mean(val_losses)
    val_acc = np.mean(val_accs)

    # Record and report the validation metrics of this epoch.
    val_loss_history.append(val_loss)
    val_acc_history.append(val_acc)
    print(f'\t\tVALIDATION loss/acc: {val_loss:.3f}\t{val_acc:.3f}')
    # Spot check on the first sample of the last validation batch.
    real_effect(outputs, targets)
    print("\n")
    

### Testing

In [None]:
# Evaluate on the held-out test set. The training loop leaves the model in training mode,
# so switch to evaluation mode explicitly and disable gradient tracking for inference.
test_accuracies = []
model.eval()
with torch.no_grad():
    for inputs, targets in test_loader:

        inputs, targets = inputs.to(device), targets.to(device)

        outputs = model(inputs)
        test_accuracies.append(calc_accuracy(outputs.cpu().numpy(), targets.cpu().numpy()))
        # Per-sample report (the test loader yields one sample per batch).
        real_effect(outputs, targets)


test_acc = np.mean(test_accuracies)

print(f'Test Accuracy: {test_acc:.3f}')

# Detailed view of the last test sample: feature name, predicted value, and an "X" where the target is set.
for i in range(len(outputs[0])):
    print(f'{creator.feature_list[i]}\t{outputs[0,i]:.2f}\t{"X" if targets[0,i] > 0 else " "}')