In [164]:
import numpy as np # linear algebra 
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from sklearn.metrics import accuracy_score, classification_report
import torch
import matplotlib.pyplot as plt
import os

In [165]:
dataset_name = "2D_Helm/"

#Configurpave to be dynamically adjusted
download_path = "../data/" #In the .gitignore list an

#n the rest of the code.
path_to_datasets = download_path + "/" + dataset_name 

In [166]:
# This cell now makes use of the downloadfolder for the datasets.
df_train= pd.read_csv(path_to_datasets + "/" + 'helm_train.csv')
df_test=pd.read_csv(path_to_datasets + "/" +  'helm_test.csv')
#print("Dataframes MITBIH correctly read into workspace")

#split target and value
train_target=df_train['p(x,y)']
test_target=df_test['p(x,y)']
train=df_train.drop('p(x,y)',axis=1)
test=df_test.drop('p(x,y)',axis=1)

In [167]:
df_train.sample(1)

Unnamed: 0,x,y,"p(x,y)"
85,0.888889,0.555556,0.939693


In [168]:
#Switches to decide the dataset sampling method and which models should be run
class Config_Sampling:
    oversample = False #equals to B_SMOTE
    undersample = False
    sample_name = "UNDEFINED_SAMPLE"
    
Train_Simple_ANN = True #Trains the simple ANN

 

## **Simple Artificial Neural Network**
ANN without convolutional layers. Only Dense layers are used. No Pooling, Flattening or Dropping out. Base model for later comparison.

In [169]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

Implement Torch Dataset object

In [170]:
class ECG_Dataset(Dataset):
    def __init__(self, csv_file, transform=None, target_transform=None):
        self.dataframe = csv_file.values
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.dataframe)
        #return self.dataframe.shape[0] # Alternative notation

    def __getitem__(self, idx):
        inputs = torch.tensor(self.dataframe[idx,:-1], requires_grad=True).to(torch.float32)
        label = torch.tensor(self.dataframe[idx,-1]).to(torch.float32)

        return inputs, label

Custom function for preprocessing (to elaborate later, currently just returns the input itself)

In [171]:
class Lambda(nn.Module):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def forward(self, x):
        return self.func(x)


def preprocess(x):
    return x * torch.Tensor([1.0])

In [172]:
# Define the ANN model
class SimpleANN(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()
        self.fc0 = nn.Sequential(Lambda(preprocess))
        self.fc1 = nn.Linear(input_size, 24)
        self.fc2 = nn.Linear(24, 48)  
        self.fc3 = nn.Linear(48, 24) 
        self.fc4 = nn.Linear(24, 12)  
        self.fc5 = nn.Linear(12, output_size)  # Hidden to output layer
        self.fc6 = nn.Linear(6, output_size)  # Hidden to output layer
        self.relu = nn.LeakyReLU(negative_slope=0.001)    # Activation function
        self.dropout = nn.Dropout(p=0.1)
        self.sigmoid = nn.Sigmoid() 

    def forward(self, x):
        #x = self.fc0(x)
        x = self.fc1(torch.cos(x))
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        x = self.relu(x)
        x = self.fc4(x)
        x = self.relu(x)
        x = self.fc5(x)
        return x

In [173]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


In [174]:
kappa = 4 * torch.pi

In [175]:
def target_func(x,y, kappa): 
    pres = torch.cos(kappa*x) + torch.cos(kappa*y)
    return pres

In [176]:
class DirichletBC(nn.Module):
    def __init__(self):
        super(DirichletBC, self).__init__()

    def forward(self, inputs, outputs, kappa):
        """
        Impose Dirichlet BC on the boundary.
        Args:
        Returns:
            torch.Tensor: Computed loss (scalar).
        """
        # Determine 

        for i, (x,y) in enumerate(inputs):
            if x==0 or x==1 or y==0 or y==1:
                outputs[i] = target_func(x, y, kappa)
        
        return outputs

In [177]:
def get_data(train_ds, valid_ds, bs, shuffle):
    return (
        DataLoader(train_ds, batch_size=bs, shuffle=shuffle),
        DataLoader(valid_ds, batch_size=bs),
    )

In [178]:
train_ds = ECG_Dataset(df_train)
test_ds = ECG_Dataset(df_test)
train_dl, test_dl = get_data(train_ds, test_ds, 64, shuffle=False)

In [179]:
def batch_loss_train(outputs, labels, inputs, kappa, loss_fn, optimizer):
    loss = loss_fn(outputs, labels, inputs, kappa)
    
    loss = loss.sum()

    with torch.no_grad():
        # We can still compute loss and gradients for model parameters
        loss.backward(retain_graph=True)  # Retain graph to keep gradients for model parameters
    
    optimizer.step()
    optimizer.zero_grad()

    return loss.item()

In [180]:
def batch_loss_test(outputs, labels, loss_fn):
    loss = loss_fn(outputs, labels)    
    return loss.item()

In [181]:
def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, mse, mape = 0, 0, 0
    epsilon = 1e-8

    # Evaluating the model with torch.no_grad()    
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            # test_loss += loss_fn(pred, y, X,kappa) / len(X)
            mse += torch.mean((y - pred) ** 2).item() 
            y = y + epsilon
            mape += torch.mean(torch.abs((y - pred) / y)) 

    
    print(f"Test set => Accuracy: {(mape):>0.4f}, Avg loss: {test_loss:>8f} \n")

In [182]:
def train_loop(dataloader, model, loss_fn, optimizer):
    model.train()
    optimizer.zero_grad()
    train_loss = 0.0
    #all_outputs = torch.empty(0, 1, requires_grad=True)
    Dirichlet = DirichletBC()
    
    for inputs, labels in dataloader:
        # forward pass. Better without shuffle to keep the coordinates sorted
        
        outputs = model(inputs)
        outputs = Dirichlet(inputs,outputs, kappa)
        train_loss += batch_loss_train(outputs,labels,inputs,kappa,loss_fn, optimizer)
    
    
    print(f'Train loss: {train_loss}')
    

In [183]:
class HelmholtzLoss(nn.Module):
    def __init__(self):
        super(HelmholtzLoss, self).__init__()

    def forward(self, preds, targets, inputs, kappa):
        """
        Compute the Calculate the helmholtz eq. for predictions.
        labels (targets) should satisfy zero anyway
        """
        grad_xy = torch.autograd.grad(outputs=preds, inputs=inputs, grad_outputs=torch.ones_like(preds), create_graph=True, allow_unused=True)[0]
                              
        grad_x = grad_xy[:,0].unsqueeze(1)
        grad_y = grad_xy[:,1].unsqueeze(1)

        laplace_x = torch.autograd.grad(outputs=grad_x, inputs=inputs, grad_outputs=torch.ones_like(grad_x), create_graph=True, allow_unused=True)[0]
        grad_xx = laplace_x[:,0].unsqueeze(1)

        laplace_y = torch.autograd.grad(outputs=grad_y, inputs=inputs, grad_outputs=torch.ones_like(grad_y), create_graph=True, allow_unused=True)[0]
        grad_yy = laplace_y[:,1].unsqueeze(1)

        #print(preds.shape, grad_xx.shape)

        loss = grad_xx + grad_yy + kappa**2 * preds

        #loss = grad_xx + grad_yy + kappa**2 * target_func(inputs[:,0],inputs[:,1],kappa)
        
        return loss


In [184]:
def fit(epochs, model, loss_fn, opt, train_dl, valid_dl):
    for t in range(epochs):  
        print(f"Epoch {t+1}   -------------------------------")
        train_loop(train_dl, model, loss_fn, optimizer)
        test_loop(test_dl, model, loss_fn)


In [185]:
# Define the model
torch.set_printoptions(precision=6)

input_size = 2  # Number of input features
output_size = 1  # Output size (e.g., regression or binary classification)
model = SimpleANN(input_size, output_size)

# Define loss and optimizer
criterion =  HelmholtzLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [186]:
fit(30, model, criterion, optimizer, train_dl, test_dl)

Epoch 1   -------------------------------
Train loss: -991.8505249023438
Test set => Accuracy: 2.9293, Avg loss: 0.000000 

Epoch 2   -------------------------------
Train loss: -1155.8004150390625
Test set => Accuracy: 3.0963, Avg loss: 0.000000 

Epoch 3   -------------------------------
Train loss: -1310.9017028808594
Test set => Accuracy: 3.2871, Avg loss: 0.000000 

Epoch 4   -------------------------------
Train loss: -1472.034423828125
Test set => Accuracy: 3.5093, Avg loss: 0.000000 

Epoch 5   -------------------------------
Train loss: -1649.3028564453125
Test set => Accuracy: 3.7475, Avg loss: 0.000000 

Epoch 6   -------------------------------
Train loss: -1841.5528564453125
Test set => Accuracy: 4.0108, Avg loss: 0.000000 

Epoch 7   -------------------------------
Train loss: -2051.4385375976562
Test set => Accuracy: 4.2941, Avg loss: 0.000000 

Epoch 8   -------------------------------
Train loss: -2273.4134521484375
Test set => Accuracy: 4.5945, Avg loss: 0.000000 

Ep