# In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import sys

import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import torch.optim as optim
from torchvision import models
from torchsummary import summary
from io import StringIO


# Pick the compute device: CUDA when PyTorch reports a usable GPU, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')



class VGG16Regression(nn.Module):
    """VGG16 convolutional backbone followed by one linear regression layer.

    The training data is stored on the instance at construction time and
    ``train_model`` runs full-batch optimisation on it.

    Args:
        X_scaled: Array-like of training images. It is permuted with
            ``(0, 3, 1, 2)``, i.e. it is expected channels-last
            (samples, height, width, channels).
        y_train: Training targets; must expose ``.values`` (for example a
            pandas Series or DataFrame).
        parameters: Mapping providing ``'output_size'``, ``'batch_size'``,
            ``'learning_rate'`` and ``'num_epochs'``.
    """

    # The first VGG16 convolution takes 3 input channels; used by testSample
    # to recognise channels-last input.
    _IN_CHANNELS = 3

    def __init__(self, X_scaled, y_train, parameters):
        super(VGG16Regression, self).__init__()
        # Move the channel axis from last to second position (NHWC -> NCHW).
        self.X_train = torch.tensor(X_scaled, dtype=torch.float32).permute(0, 3, 1, 2)
        self.y_train = torch.tensor(y_train.values, dtype=torch.float32)
        self.output_size = parameters['output_size']
        # NOTE(review): batch_size is stored but train_model feeds the whole
        # training set in a single batch.
        self.batch_size = parameters['batch_size']
        self.learning_rate = parameters['learning_rate']
        self.num_epochs = parameters['num_epochs']
        self.construct()

    def construct(self):
        """Create the backbone, the flatten layer and the regression head."""
        # Load VGG16 and drop its last child module (the classification head).
        # NOTE(review): ``pretrained=True`` is kept for compatibility with
        # older torchvision; newer releases prefer the ``weights=`` argument.
        self.base_model = models.vgg16(pretrained=True)
        self.base_model = nn.Sequential(*list(self.base_model.children())[:-1])

        # Flatten the output from the VGG16 layers
        self.flatten = nn.Flatten()

        # New fully connected layer for regression on the 512 x 7 x 7 features
        self.fc1 = nn.Linear(512 * 7 * 7, self.output_size)

    def forward(self, x):
        """Run ``x`` through the backbone, flatten it and apply the linear head."""
        out = self.base_model(x)
        out = self.flatten(out)
        out = self.fc1(out)
        return out

    def rmse_score(self, y_true, y_pred):
        """Return the root mean squared error between two NumPy arrays."""
        rmse = np.sqrt(np.mean((y_true - y_pred)**2))
        return rmse

    def evs_score(self, y_true, y_pred):
        """Return the explained variance score: 1 - Var(y_true - y_pred) / Var(y_true)."""
        evs = 1 - (np.var(y_true - y_pred) / np.var(y_true))
        return evs

    def r2_score(self, y_true, y_pred):
        """Return R-squared: 1 - SS_res / SS_total."""
        y_mean = y_true.mean()
        ss_total = ((y_true - y_mean)**2).sum()
        ss_res = ((y_true - y_pred)**2).sum()
        r2 = 1 - (ss_res / ss_total)
        return r2

    def mape_score(self, y_true, y_pred):
        """Return the mean absolute percentage error, in percent."""
        # 1e-10 in the denominator avoids dividing by an exact zero target.
        mape = np.mean(np.abs((y_true - y_pred) / (y_true.astype(float) + 1e-10))) * 100
        return mape

    def _match_target_shape(self, outputs):
        """Give ``outputs`` the shape of ``self.y_train`` when element counts agree.

        A plain ``squeeze()`` turns (N, 1) predictions into (N,); against an
        (N, 1) target that broadcasts to (N, N) inside the loss. Reshaping to
        the target's shape avoids that while leaving the usual (N,) case
        unchanged.
        """
        if outputs.numel() == self.y_train.numel():
            return outputs.reshape(self.y_train.shape)
        return outputs.squeeze()

    def train_model(self):
        """Train on the stored data, print a summary, plot the loss and save a checkpoint.

        Every 100th epoch the loss, R-squared, MAPE, RMSE and explained
        variance on the training data are printed. After training the
        torchsummary report is printed, the loss curve is shown, and the
        state dict is written to ``process_data/model.ckpt``.

        Returns:
            The model itself.
        """
        import os  # local import: only needed to create the checkpoint directory

        # Define the loss function (MSE) and optimizer
        criterion = nn.MSELoss()
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate, weight_decay=0.00001)

        targets_np = self.y_train.numpy()
        loss_values = []  # Loss value of every epoch, for the plot below
        for epoch in range(self.num_epochs):
            # Forward pass
            outputs = self._match_target_shape(self(self.X_train))
            loss = criterion(outputs, self.y_train)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss_values.append(loss.item())
            if (epoch+1) % 100 == 0:
                # Convert the predictions once and reuse them for every metric.
                preds_np = outputs.detach().numpy()
                r2 = self.r2_score(targets_np, preds_np)
                mape = self.mape_score(targets_np, preds_np)
                rmse = self.rmse_score(targets_np, preds_np)
                evs = self.evs_score(targets_np, preds_np)
                print(f'Epoch [{epoch+1}/{self.num_epochs}], Loss: {loss.item():.4f}, R-squared: {r2:.4f}, MAPE: {mape:.4f}%, RMSE: {rmse:.4f}, Explained Variance Score: {evs:.4f}')

        # Print the summary of the neural network. The per-sample input shape
        # comes from the stored training tensor, and the summary device follows
        # the model's parameters so the dummy input is created where the
        # weights live.
        on_cuda = next(self.parameters()).device.type == 'cuda'
        summary(self, input_size=tuple(self.X_train.shape[1:]), device='cuda' if on_cuda else 'cpu')

        # Plot the loss values
        plt.plot(loss_values)
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title('Training Loss')
        plt.show()

        # Save the model checkpoint, creating the target directory if needed
        checkpoint_path = 'process_data/model.ckpt'
        os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
        torch.save(self.state_dict(), checkpoint_path)
        return self

    def testSample(self,scaled_X_test, X_test):
        """Predict for ``scaled_X_test`` and store the result in ``X_test``.

        Args:
            scaled_X_test: Array-like of inputs. A 4-D channels-last batch is
                permuted to channels-first, as ``__init__`` does for the
                training data; channels-first input is used unchanged.
            X_test: DataFrame-like object; it is modified in place by
                assigning the predictions to the ``'Set_voltage_kde'`` column.

        Returns:
            The same ``X_test`` object, with the prediction column set.
        """
        smp_pred = torch.tensor(scaled_X_test, dtype=torch.float32)
        channels_last = (
            smp_pred.dim() == 4
            and smp_pred.shape[1] != self._IN_CHANNELS
            and smp_pred.shape[-1] == self._IN_CHANNELS
        )
        if channels_last:
            smp_pred = smp_pred.permute(0, 3, 1, 2)
        with torch.no_grad():  # No gradient computation is needed during inference
            y_pred = self(smp_pred)
        y_pred_numpy = y_pred.numpy().squeeze()
        X_test['Set_voltage_kde'] = y_pred_numpy
        return X_test



def save_summary_as_image(model, file_name, file_format='pdf', input_size=(6,)):
    """Capture the torchsummary report for ``model`` and save it as a figure.

    The text printed by ``summary`` is captured, drawn onto an axis-free
    matplotlib figure and written with ``savefig``.

    Args:
        model: Model handed to ``summary``.
        file_name: Destination passed to ``savefig``.
        file_format: Output format passed to ``savefig`` (default ``'pdf'``).
        input_size: Per-sample input shape handed to ``summary``. Defaults to
            ``(6,)``, the value that used to be hard-coded.
    """
    from contextlib import redirect_stdout  # local import: not in the file header

    # Build the summary's dummy input on the device that holds the weights.
    first_param = next(model.parameters(), None)
    on_cuda = first_param is not None and first_param.device.type == 'cuda'

    # redirect_stdout restores sys.stdout even if summary() raises.
    buffer = StringIO()
    with redirect_stdout(buffer):
        summary(model, input_size=input_size, device='cuda' if on_cuda else 'cpu')

    # Save the summary text as a PDF or PNG
    summary_text = buffer.getvalue()

    # Figure height scales with the number of text lines.
    fig = plt.figure(figsize=(10, len(summary_text.split('\n')) * 0.3))
    try:
        plt.gca().axis('off')
        plt.text(0, 1, summary_text, fontsize=12, va='top')
        plt.savefig(file_name, format=file_format, bbox_inches='tight')
    finally:
        # Always release the figure, even when saving fails.
        plt.close(fig)

    

