# In [13]:
import numpy as np
import pandas as pd
from sklearn.metrics import r2_score
import torch
import torch.nn as nn

class Wrapper:
    """Train, test, and score a PyTorch model using settings from ``config``.

    ``config`` must expose ``epochs``, ``batch_size``, ``output_size`` and
    ``device``; those are the only attributes this class reads.
    """

    def __init__(self, config):
        self.config = config

    def _empty_buffer(self):
        """Return an empty ``(0, output_size)`` float32 tensor on ``config.device``."""
        return torch.empty(
            (0, self.config.output_size),
            dtype=torch.float32,
            device=self.config.device,
        )

    def _iter_batches(self, data, ground_truth):
        """Yield consecutive ``(inputs, targets)`` slices of ``batch_size`` rows.

        Only full batches are produced: trailing samples that do not fill a
        whole batch are skipped.
        """
        batch_size = self.config.batch_size
        for step in range(data.shape[0] // batch_size):
            window = slice(step * batch_size, (step + 1) * batch_size)
            yield data[window], ground_truth[window]

    def model_train(self, model, train_data, train_ground_truth, optimizer, criterion):
        """Train ``model`` for ``config.epochs`` epochs over full mini-batches.

        The data is consumed in order, without shuffling, and is not moved to
        ``config.device`` here; the caller is expected to have placed it where
        the model lives.  The mean batch loss is printed after every epoch.

        Args:
            model: Module called as ``model(inputs)``.
            train_data: Tensor sliced along its first axis into batches.
            train_ground_truth: Targets sliced with the same windows as
                ``train_data``; indexed as ``[:, 0, :]``, so it must be 3-D.
            optimizer: Optimizer stepped once per batch.
            criterion: Callable ``criterion(outputs, targets)`` returning a
                scalar loss tensor.

        Returns:
            ``(ground_truth, predicted)``: the ``[:, 0, :]`` slices of the
            targets and of the model outputs collected during the final epoch,
            concatenated along axis 0.  ``predicted`` is detached from the
            autograd graph.  Both are empty ``(0, output_size)`` tensors when
            no full batch is available.
        """
        epochs = self.config.epochs
        steps_per_epoch = train_data.shape[0] // self.config.batch_size
        device = self.config.device

        # model_test() leaves the model in eval mode, so re-enable training
        # behaviour (dropout, batch-norm updates) explicitly.
        model.train()

        # Seeding with an empty buffer keeps the (0, output_size) result when
        # no batch is processed; concatenating once at the end avoids
        # re-copying the accumulated tensor on every step.
        ground_truth_parts = [self._empty_buffer()]
        predicted_parts = [self._empty_buffer()]

        for epoch in range(epochs):
            running_loss = 0.0
            is_last_epoch = epoch == epochs - 1

            for inputs, targets in self._iter_batches(train_data, train_ground_truth):
                optimizer.zero_grad()

                outputs = model(inputs)
                loss = criterion(outputs, targets)

                loss.backward()
                optimizer.step()
                running_loss += loss.item()

                if is_last_epoch:
                    # Detach so the returned predictions do not keep every
                    # batch's autograd graph alive.
                    ground_truth_parts.append(targets[:, 0, :].detach().to(device))
                    predicted_parts.append(outputs[:, 0, :].detach().to(device))

            # Guard against data smaller than one batch (zero steps).
            mean_loss = running_loss / steps_per_epoch if steps_per_epoch else float('nan')
            print(f'Epoch [{epoch+1}/{self.config.epochs}], Loss: {mean_loss}')

        return torch.cat(ground_truth_parts, 0), torch.cat(predicted_parts, 0)

    def model_test(self, model, test_data, test_ground_truth):
        """Run ``model`` over the test data in full batches without gradients.

        The model is switched to evaluation mode and is left in that mode on
        return.

        Args:
            model: Module called as ``model(inputs)``.
            test_data: Tensor sliced along its first axis into batches.
            test_ground_truth: Targets sliced with the same windows as
                ``test_data``; indexed as ``[:, 0, :]``, so it must be 3-D.

        Returns:
            ``(ground_truth, predicted)``: the ``[:, 0, :]`` slices of the
            targets and of the model outputs, concatenated along axis 0.
            Trailing samples that do not fill a whole batch are not evaluated.
        """
        model.eval()
        device = self.config.device

        ground_truth_parts = [self._empty_buffer()]
        predicted_parts = [self._empty_buffer()]

        # Inference only: skip building autograd graphs.
        with torch.no_grad():
            for inputs, targets in self._iter_batches(test_data, test_ground_truth):
                outputs = model(inputs)
                ground_truth_parts.append(targets[:, 0, :].to(device))
                predicted_parts.append(outputs[:, 0, :].to(device))

        return torch.cat(ground_truth_parts, 0), torch.cat(predicted_parts, 0)

    def evaluation_metrics(self, ground_truth, predicted):
        """Compute R², RMSE and MAE between ``ground_truth`` and ``predicted``.

        Args:
            ground_truth: Tensor of reference values.
            predicted: Tensor of model predictions, same shape as
                ``ground_truth``.

        Returns:
            ``(r2, rmse, mae_loss)``: ``r2`` is the value returned by
            ``sklearn.metrics.r2_score``; ``rmse`` and ``mae_loss`` are
            zero-dimensional torch tensors.
        """
        r2 = r2_score(
            ground_truth.detach().cpu().numpy(),
            predicted.detach().cpu().numpy(),
        )
        # Both losses are symmetric in their arguments; the conventional
        # (input, target) order is used.
        rmse = torch.sqrt(nn.MSELoss()(predicted, ground_truth))
        mae_loss = nn.L1Loss()(predicted, ground_truth)
        return r2, rmse, mae_loss