In [None]:
import pandas as pd
import os
import numpy as np
import math 
import matplotlib.pyplot as plt
import time 
from sklearn.model_selection import train_test_split # for splitting the data
from sklearn.metrics import mean_squared_error # for calculating the cost function
from sklearn.ensemble import RandomForestRegressor # for building the model

In [None]:
# Limit how much of a DataFrame the notebook renders: at most 32 rows
# and 150 columns before pandas truncates the display.
pd.options.display.max_rows = 32
pd.options.display.max_columns = 150

In [None]:
def smape(a, f):
    """Symmetric mean absolute percentage error, expressed in percent.

    Computes ``mean(2 * |f - a| / (|a| + |f|)) * 100`` element-wise.

    Parameters
    ----------
    a : array-like
        Actual (reference) values.
    f : array-like
        Forecast / predicted values, same length as ``a``.

    Returns
    -------
    float
        sMAPE in percent (0 = perfect, 200 = worst case).

    Raises
    ------
    ValueError
        If the inputs are empty.

    Notes
    -----
    * Inputs are converted to NumPy arrays, so two pandas Series are compared
      position by position instead of being aligned on their index labels.
    * A pair where both values are 0 is a perfect prediction and contributes 0
      instead of producing ``0/0 = NaN`` for the whole metric.
    """
    actual = np.atleast_1d(np.asarray(a, dtype=float))
    forecast = np.atleast_1d(np.asarray(f, dtype=float))
    if actual.size == 0:
        raise ValueError("smape requires at least one observation")

    numerator = 2.0 * np.abs(forecast - actual)
    denominator = np.abs(actual) + np.abs(forecast)

    # Only divide where the denominator is non-zero; 0/0 pairs stay at 0.
    ratio = np.zeros_like(denominator)
    nonzero = denominator != 0
    ratio[nonzero] = numerator[nonzero] / denominator[nonzero]

    return float(np.mean(ratio) * 100)

# Data preparation 

In [None]:
# Load the raw table from a CSV file that must sit in the notebook's working directory
# (the path is relative and hard-coded).
data_raw = pd.read_csv('raw_mwd_explosives_Franziska.csv')
# Bare last expression: the frame is rendered as the cell output.
data_raw

In [None]:
#define which hole types to use
# For every hole type of interest, count the holeID groups that contain no row of that type.
# One loop replaces eight copy-pasted blocks; the printed messages are unchanged.
# NOTE(review): hole type 1 was not checked in the original analysis and is still
# skipped here - confirm that this is intended.
for hole_type in (0, 2, 3, 4, 5, 6, 7, 8):
    # True for each holeID that has at least one row with this hole type
    has_type = data_raw['Hole type'].eq(float(hole_type)).groupby(data_raw['holeID']).any()
    count = int((~has_type).sum())
    print(f"In {count} groups there is no type {hole_type}.")

In [None]:
#use just hole type 0
data_sorted = data_raw.loc[data_raw['Hole type'] == 0.0].copy()

#sort according to location of the hole
# For each location label, count the holeID groups (type-0 rows only) that never have it.
# NOTE(review): "buttom" is kept exactly as in the original check. If the CSV spells the
# label "bottom", this count is always the total number of groups - verify against the data.
for location in ("upper", "buttom", "upper & bottom"):
    has_location = data_sorted['location'].eq(location).groupby(data_sorted['holeID']).any()
    count = int((~has_location).sum())
    print(f"In {count} groups there is no borehole located in {location}.")

#as in only 1700 groups there is no borehole located in upper & bottom, this is chosen
# Build the mask from data_sorted itself. The original used data_raw['location'], which
# only worked because pandas re-aligned the longer mask on the index.
data_sorted = data_sorted.loc[data_sorted['location'] == "upper & bottom"].copy()
data_sorted

In [None]:
#train test split: ten holes per face for train, the next five for test (requires 15 <= unique_holeIDs_per_faceN)

def faces_holes_df(dataframe):
    """Return the distinct (faceN, holeID) pairs found in ``dataframe``.

    The result has exactly the two columns ``faceN`` and ``holeID`` and keeps
    the index label of the first row in which each pair occurs.
    """
    pair_columns = ['faceN', 'holeID']
    # drop_duplicates keeps the first occurrence of every pair
    return dataframe[pair_columns].drop_duplicates().copy()

# Distinct (faceN, holeID) pairs of the filtered data
faces_holes = faces_holes_df(data_sorted)

# Number of different holes available on each face; the smallest value limits
# how many holes per face can be used for the train/test split.
unique_holeIDs_per_faceN = faces_holes.groupby('faceN')['holeID'].agg('nunique')

print(f'minimal number of holes per face {unique_holeIDs_per_faceN.min()}')



In [None]:
#split boreholes in train and test

def train_test_split(dataframe, n_train=10, n_test=5):
    """Split the holes of every face into a train and a test selection.

    For each ``faceN`` group (in sorted face order) the first ``n_train`` hole
    IDs go to the training set and the following ``n_test`` hole IDs to the
    test set. Holes beyond ``n_train + n_test`` are not used.

    NOTE: this function shadows ``sklearn.model_selection.train_test_split``,
    which is imported under the same name at the top of the notebook. After
    this cell runs, the name refers to this helper.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Must contain the columns ``faceN`` and ``holeID`` (one row per pair).
    n_train : int, default 10
        Holes per face used for training.
    n_test : int, default 5
        Holes per face used for testing.

    Returns
    -------
    (train_holes, test_holes) : tuple of pandas.DataFrame
        Both have the columns ``faceN`` and ``holeID`` and a fresh RangeIndex.
    """
    columns = ['faceN', 'holeID']
    train_batches, test_batches = [], []

    for faceN, group in dataframe.groupby('faceN'):
        hole_ids = group['holeID']
        selections = (
            (hole_ids.iloc[:n_train], train_batches),
            (hole_ids.iloc[n_train:n_train + n_test], test_batches),
        )
        for batch, collected in selections:
            # skip empty selections so they cannot influence the dtypes of the result
            if len(batch):
                collected.append(pd.DataFrame({'faceN': faceN, 'holeID': batch}))

    def _combine(batches):
        # Concatenate once at the end instead of growing a frame inside the loop.
        if not batches:
            return pd.DataFrame(columns=columns)
        return pd.concat(batches, ignore_index=True)

    return _combine(train_batches), _combine(test_batches)

# train_test_split here is the notebook's own per-face helper defined above in this cell
# (it shadows the scikit-learn function imported under the same name at the top).
# The empty placeholder frames that used to be created first were overwritten
# immediately and have been removed.
train_holes, test_holes = train_test_split(faces_holes)
print("Trainholes")
print(train_holes)
print("Testholes")
print(test_holes)

# data for LSTM

In [None]:
#prepare data in groups and assign timestamp 0 to first timestep in each group
from datetime import datetime

# Work on a copy of data_sorted that has
#   - an (initially empty) 'timestamp' column for the per-hole time series, and
#   - 'Time' parsed from HH:MM:SS strings into datetimes so that differences can be computed.
data = data_sorted.assign(
    timestamp=None,
    Time=pd.to_datetime(data_sorted['Time'], format='%H:%M:%S'),
)

def assign_timestamp(group):
    """Return a copy of ``group`` with ``timestamp`` = seconds since its first row.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows of one hole; ``Time`` must be a datetime column. The first row is
        taken as time zero, so rows are expected to be in chronological order
        (rows earlier than the first one get negative timestamps).

    Returns
    -------
    pandas.DataFrame
        New frame; the input is left untouched (the original implementation
        wrote into the group object handed over by ``groupby.apply``).
    """
    first_time = group['Time'].iloc[0]
    elapsed_seconds = (group['Time'] - first_time).dt.total_seconds()
    return group.assign(timestamp=elapsed_seconds)

# Compute the per-hole timestamps, then flatten the index and store the
# elapsed seconds as integers.
data_lstm = data.groupby("holeID", group_keys=False).apply(assign_timestamp)
data_lstm = data_lstm.reset_index(drop=True).astype({'timestamp': int})
    

    

In [None]:
#interpolate values so that each series is equally spaced (one row per second) --> needed for LSTM

def interpolate_rows(group):
    """Fill the gaps in one hole's time series so it has one row per timestamp.

    For every integer timestamp between 1 and the timestamp of the last row
    that does not exist yet, a new row is created from a copy of the first row
    (so borehole information and the min/mean/max columns are carried over),
    with ``Time`` set to ``None`` and the nine sensor columns set to NaN. After
    sorting by ``timestamp`` the sensor columns are linearly interpolated.

    ``Time`` stays empty in the inserted rows, which makes it possible to tell
    interpolated rows from measured ones later on.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows of a single hole with an integer ``timestamp`` column. The last
        row is assumed to carry the largest timestamp.

    Returns
    -------
    pandas.DataFrame
        New frame sorted by ``timestamp``; the input is not modified.
    """
    sensor_cols = ['HD mm', 'PR dm/min', 'DP bar', 'RS r/min', 'RP bar',
                   'HP bar', 'FP bar', 'WF l/min', 'WP bar']

    # number of timestamps per group
    timestamps = group['timestamp'].iloc[-1] + 1

    # a set gives O(1) membership tests (the original scanned a list for every timestamp)
    existing_timestamps = set(group['timestamp'].tolist())

    template = group.iloc[0]
    missing_rows = []
    for i in range(1, timestamps):
        if i in existing_timestamps:
            continue
        new_row = template.copy()
        new_row['timestamp'] = i
        new_row['Time'] = None
        for col in sensor_cols:
            new_row[col] = np.nan
        missing_rows.append(new_row)

    # Only concatenate when there is something to add: concatenating with an empty
    # frame is deprecated in pandas and can change column dtypes.
    if missing_rows:
        interpolated_group = pd.concat([group, pd.DataFrame(missing_rows)])
    else:
        interpolated_group = group

    # sort_values returns a new frame, so the caller's group is never written to
    interpolated_group = interpolated_group.sort_values('timestamp')
    interpolated_group[sensor_cols] = interpolated_group[sensor_cols].interpolate(method='linear')

    return interpolated_group


In [None]:
# Fill the timestamp gaps of every hole. group_keys=False keeps 'holeID' as a plain
# column only; otherwise it would also become an index level of the result, which
# makes later groupby('holeID') calls ambiguous until the index is reset.
data_lstm = data_lstm.groupby("holeID", group_keys=False).apply(interpolate_rows)


In [None]:
# Flatten the index, then make (timestamp, holeID) unique. Duplicates can occur when
# several rows of a hole share the same date and time; the first such row is kept.
data_lstm = (
    data_lstm
    .reset_index(drop=True)
    .drop_duplicates(subset=['timestamp', 'holeID'], keep='first')
)



In [None]:
#use just test and train holes for further preparations as holeID 212359400 and 212359390 have an error when calc. timestamp
# train_lstm / test_lstm are created directly by the get_lstm_data(...) calls further down
# in this cell; the empty placeholder frames that used to be created here were dead code.

def get_lstm_data(df, combinations_df):
    """Keep the rows of ``df`` whose (faceN, holeID) pair occurs in ``combinations_df``.

    Implemented as an inner merge on the two key columns, so every row of
    ``df`` is repeated once per matching row in ``combinations_df``.
    """
    return df.merge(combinations_df, on=['faceN', 'holeID'], how='inner')

# Rows of the interpolated time series that belong to the holes selected for training.
train_lstm = get_lstm_data(data_lstm, train_holes)


# Same filter for the holes selected for testing.
test_lstm = get_lstm_data(data_lstm, test_holes)


# data for VAE

In [None]:
#data for VAE:
# Independent copy of the filtered (non-interpolated) data.
data_vae = data_sorted.copy()

#train test split for vae
# train_vae / test_vae are produced directly by get_vae_data(...) below; the empty
# placeholder frames that used to be created here were overwritten unused.

def get_vae_data(df, combinations_df):
    """Return one row per (faceN, holeID) pair listed in ``combinations_df``.

    ``df`` is first restricted to the listed pairs (inner merge), then reduced
    with ``groupby(...).first()``. The result is sorted by the two keys, which
    come first in the column order.

    NOTE(review): ``GroupBy.first`` takes the first non-null value of every
    column, which is not necessarily the first row when that row has missing
    values - confirm this is what is wanted.
    """
    keys = ['faceN', 'holeID']
    matched = df.merge(combinations_df, on=keys, how='inner')
    per_hole = matched.groupby(keys).first()
    return per_hole.reset_index()

#train_holes, test_holes were defined in earlier steps
# One row per training hole (see get_vae_data).
train_vae = get_vae_data(data_vae, train_holes)
# Bare last expression: the frame is rendered as the cell output.
train_vae







In [None]:
# One row per test hole, built the same way as train_vae.
test_vae = get_vae_data(data_vae, test_holes)
# Bare last expression: the frame is rendered as the cell output.
test_vae

# Variational Autoencoder (conditioning not implemented yet)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler


# X.. dataset without the target value, Y.. just the target value ('explosives,kg/m3')

# Columns that are not used as model inputs (identifiers, bookkeeping, the target itself).
# Defined once so the train and test feature sets cannot drift apart.
vae_drop_cols = ['Unnamed: 0','explosives,kg/m3','faceN', 'Time', 'Section number * 1000',
                 'Hole number', 'Hole type', 'Boom', 'Date and time at rockcontact',
                 'location', 'holeID', 'Time_sec_mean', 'Time_sec_min',
                 'Time_sec_max', 'tunnel support']

vaex_test = test_vae.drop(columns=vae_drop_cols).reset_index(drop=True)
vaey_test = test_vae['explosives,kg/m3']

vaex_train = train_vae.drop(columns=vae_drop_cols).reset_index(drop=True)
vaey_train = train_vae['explosives,kg/m3']

#keep names for columns
cols = vaex_test.columns




In [None]:
# A single StandardScaler is fitted on features and target together, so the
# target occupies the last column of everything the scaler sees.
scaler = StandardScaler()

# Training data: join X and y, fit + scale, then cut the target column off again
combined_train = pd.concat([vaex_train, vaey_train], axis=1)
combined_train_scaled = scaler.fit_transform(combined_train)
print(combined_train_scaled.shape)

vaex_train_scaled, vaey_train_scaled = np.hsplit(combined_train_scaled, [vaex_train.shape[1]])

# Test data: only transform with the statistics learned on the training data
combined_test = pd.concat([vaex_test, vaey_test], axis=1)
combined_test_scaled = scaler.transform(combined_test) #not fit_transform!

vaex_test_scaled, vaey_test_scaled = np.hsplit(combined_test_scaled, [vaex_test.shape[1]])

# float32 tensors for PyTorch
X_train_tensor = torch.tensor(vaex_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(vaey_train_scaled, dtype=torch.float32)
X_test_tensor = torch.tensor(vaex_test_scaled, dtype=torch.float32)

In [None]:
# definition of VAE
class VAE(nn.Module):
    """Plain variational autoencoder built from fully connected layers.

    No conditioning input is implemented. The encoder maps ``input_size``
    features to ``2 * latent_size`` values (mean and log-variance of the latent
    distribution); the decoder maps a latent sample back to ``input_size``
    features. Both use one hidden layer of 128 units with ReLU.
    """

    def __init__(self, input_size, latent_size):
        super().__init__()
        hidden_units = 128

        # encoder: features -> (mu, logvar)
        self.encoder = nn.Sequential(
            nn.Linear(input_size, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, 2 * latent_size),
        )

        # decoder: latent sample -> reconstructed features
        self.decoder = nn.Sequential(
            nn.Linear(latent_size, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, input_size),
        )
        self.latent_size = latent_size

    def reparameterize(self, mu, logvar):
        """Draw ``mu + eps * std`` with ``eps ~ N(0, 1)`` and ``std = exp(logvar / 2)``."""
        sigma = (0.5 * logvar).exp()
        noise = torch.randn_like(sigma)
        return mu + noise * sigma

    def forward(self, x):
        """Return ``(reconstruction, mu, logvar)`` for a batch ``x`` of shape (batch, input_size)."""
        # first half of the encoder output is the mean, second half the log-variance
        mu, logvar = self.encoder(x).chunk(2, dim=1)
        latent = self.reparameterize(mu, logvar)
        return self.decoder(latent), mu, logvar

# Dataset preparation
class CustomDataset(Dataset):
    """Pairs every row of ``data`` with the entry of ``target`` at the same position."""

    def __init__(self, data, target):
        self.data, self.target = data, target

    def __len__(self):
        """Number of samples (length of ``data``)."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(x, y)`` pair stored at position ``idx``."""
        return self.data[idx], self.target[idx]

# Hyperparameters
input_size = vaex_train_scaled.shape[1]
latent_size = 10  #if too high --> overfitting
batch_size = 64
num_epochs = 200
learning_rate = 1e-3

# DataLoader over (scaled features, scaled target) pairs; batches are reshuffled every epoch
train_dataset = CustomDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)

# Initialise the model
model = VAE(input_size, latent_size)

# Optimizer and loss function (MSE is used as the reconstruction loss)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.MSELoss()

#train vae
# NOTE: the loop variable `data` overwrites the DataFrame called `data` that was created
# in the LSTM preparation cell; `target` is loaded but not used in the loss below.
for epoch in range(num_epochs):
    for data, target in train_loader:
        # Reset gradients
        optimizer.zero_grad()

        # Forward pass
        recon_batch, mu, logvar = model(data)

        # Loss = reconstruction error + KL divergence between N(mu, exp(logvar)) and N(0, 1).
        # NOTE(review): the MSE term is a mean while the KL term is a sum over the whole
        # batch, so the two terms are on different scales - confirm this weighting is intended.
        loss = criterion(recon_batch, data) + 0.5 * torch.sum(logvar.exp() - logvar - 1 + mu.pow(2))

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    # loss of the last batch of the epoch (not an epoch average)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

# Test VAE: reconstruction error on the test features.
# The forward pass still samples the latent vector, so the output is stochastic.
with torch.no_grad():
    test_output, _, _ = model(X_test_tensor)
    test_loss = criterion(test_output, X_test_tensor)
    print(f"Test Loss: {test_loss.item():.4f}")

# The scaler was fitted on [features..., target], so a matrix with the same column layout
# is needed to undo the scaling: reconstructed features plus one last column for the target.
# NOTE(review): that last column is filled with vaey_test, i.e. the *unscaled ground truth*.
# The VAE only reconstructs the features and never outputs the explosives value, so
# 'estimatedExplosives' below is derived from the true target, not from the model.
# This needs to be redesigned before the metrics that follow can be trusted.
n_features = test_output.shape[1]
combined = pd.concat([pd.DataFrame(test_output.detach().numpy()), vaey_test], axis=1)
predicted_list = scaler.inverse_transform(combined)

#create dataframe to compare results
result_df = pd.DataFrame()
result_df['originalExplosives'] = vaey_test
# Column n_features is the (last) target column. The original code indexed with the
# undefined name `predicted_output`, which raises NameError on a fresh kernel.
predicted_list2 = predicted_list[:, n_features]
result_df['estimatedExplosives'] = predicted_list2

In [None]:
# Plain-text dump of the original vs. estimated explosives values built in the previous cell.
print(result_df)

In [None]:
#calculate MSE and RMSE

# Error metrics between the original and the estimated explosives values.
mse_vae = mean_squared_error(result_df['originalExplosives'], result_df['estimatedExplosives'])
rmse_vae = mse_vae**.5
sym_mape_vae = smape(result_df['originalExplosives'], result_df['estimatedExplosives'])
# "Accuracy" is defined here as 100 minus the sMAPE (in percent); it is not a classification accuracy.
accuracy_vae = 100 - sym_mape_vae

# result list: one formatted entry per metric, two decimals each
result_data_list_vae = [
        "MSE {:.2f}".format(mse_vae),
        "RMSE {:.2f}".format(rmse_vae),
        "Acc {:.2f}".format(accuracy_vae)
        ]

# printed on a single tab-separated line
print("\t".join(result_data_list_vae))


In [None]:
#create a scatter plot
# Original (x) against estimated (y) explosives values; points on the diagonal are perfect estimates.
plt.scatter(result_df['originalExplosives'], result_df['estimatedExplosives'])
plt.title('Scatter Plot')
plt.xlabel('original')
plt.ylabel('predicted')
# same scale on both axes so the diagonal is at 45 degrees
plt.axis('equal')

In [None]:
#create classes and compare results

#assign classes to original as well as predicted values, classes are distributed linearly

num_classes = 9  # (0.4 to 1.3 in 0.1-steps)

# bin edges: num_classes + 1 equally spaced thresholds from 0.4 to 1.3
thresholds = np.linspace(0.4, 1.3, num_classes + 1)

# calculate original classes
# np.digitize returns 1 for values in the first bin, hence the "- 1" to get classes starting at 0.
# Values below 0.4 end up in class -1 and values >= 1.3 in class 9 (outside the 0..8 range).
result_df['orig_class'] = np.digitize(result_df['originalExplosives'], thresholds) - 1

# calculate classes for prediction
result_df['estim_class'] = np.digitize(result_df['estimatedExplosives'], thresholds) - 1

print(result_df)

#create confusion matrix

from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt

# rows = actual class, columns = predicted class; only classes that occur are included
conf_matrix = confusion_matrix(result_df['orig_class'], result_df['estim_class'])
#row 1 represents class 0, 0 times it was predicted to be class 0 and 1, once that it would be class 3, 
#so all in class 0 and 1 were predicted wrong
# NOTE(review): the two comment lines above describe the output of one specific run - verify after re-running.

plt.figure(figsize=(6,6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',cbar=False)
plt.title('Confusion Matrix:')
plt.xlabel('prediction')
plt.ylabel('actual class')
plt.show()


In [None]:
#compare individual results (precision, recall, f1, accuracy) for classes
from sklearn.metrics import classification_report

# Text report with per-class precision / recall / f1, computed on the binned classes
# (orig_class = actual, estim_class = predicted).
classification_report_output = classification_report(result_df['orig_class'], result_df['estim_class']) 

print(classification_report_output)


# Include LSTM in VAE 

In [None]:
#final data preparation
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
import numpy as np

#prepare data for scaling

# Columns that are not fed to the LSTM (identifiers, bookkeeping, the target, the timestamp).
# 'holeID' is intentionally not part of this list: it is still needed for grouping and
# is removed separately. The list is defined once so train, test and the scaler-fit
# frame always use the same feature columns.
lstm_drop_cols = ['Unnamed: 0','explosives,kg/m3','faceN', 'Time', 'Section number * 1000',
                  'Hole number', 'Hole type', 'Boom', 'Date and time at rockcontact',
                  'location', 'Time_sec_mean', 'Time_sec_min',
                  'Time_sec_max', 'tunnel support', 'timestamp']

# drop unwanted columns, use for y just explosives,kg/m3 once for each sequence
#lstmx_test_df is a dataframe grouped by holeID
lstmx_test_df = test_lstm.drop(columns=lstm_drop_cols).reset_index(drop=True).groupby('holeID')

#change df into a list with one 2-D array (timesteps x features) per hole, in sorted holeID order
lstmx_test = [group.drop(columns=['holeID']).values for _, group in lstmx_test_df]

#for y use just target value (first value of every hole, same sorted holeID order as x)
lstmy_test = test_lstm.groupby('holeID')['explosives,kg/m3'].first().reset_index()
lstmy_test = lstmy_test['explosives,kg/m3'].to_numpy() #create array for scaling


#same for train data

lstmx_train_df = train_lstm.drop(columns=lstm_drop_cols).reset_index(drop=True).groupby('holeID')

lstmx_train = [group.drop(columns=['holeID']).values for _, group in lstmx_train_df]

lstmy_train = train_lstm.groupby('holeID')['explosives,kg/m3'].first().reset_index()
lstmy_train = lstmy_train['explosives,kg/m3'].to_numpy()


#scale data (x and y individually)

scaler_x = StandardScaler()
scaler_y = StandardScaler()

#fit x scaler to train data
#generate a dataframe that is not grouped yet and also has no holeID
lstmx_train_fit_df = train_lstm.drop(columns=lstm_drop_cols + ['holeID'])
lstmx_train_fit = lstmx_train_fit_df.to_numpy() #data that will be transformed will also be array --> array without the groups

#fit x scaler on this data (all training timesteps of all holes stacked)
lstmx_train_fit_scaled = scaler_x.fit_transform(lstmx_train_fit)


def scale_timeseries(sequences, scaler):
    """Apply ``scaler.transform`` to every sequence and return the results as a list.

    ``scaler`` must already be fitted; the order of ``sequences`` is preserved.
    """
    scaled_sequences = []
    for sequence in sequences:
        scaled_sequences.append(scaler.transform(sequence))
    return scaled_sequences


# Scale every hole's feature sequence with the scaler fitted on the training timesteps
lstmx_train_scaled = scale_timeseries(lstmx_train, scaler_x)
lstmx_test_scaled = scale_timeseries(lstmx_test, scaler_x)


# Scale the targets: the scaler expects a column vector, so the 1-D target arrays
# get a second axis; fitting happens on the training targets only
lstmy_train_scaled = scaler_y.fit_transform(lstmy_train[:, np.newaxis])
lstmy_test_scaled = scaler_y.transform(lstmy_test[:, np.newaxis])

In [None]:
# Targets as float32 tensors
# (these names replace the tensors of the same name created for the plain VAE above)
y_train_tensor = torch.tensor(lstmy_train_scaled, dtype=torch.float32)
y_test_tensor = torch.tensor(lstmy_test_scaled, dtype=torch.float32)


# Longest time series over train and test: every sequence is padded to this length later
max_train = max(map(len, lstmx_train_scaled))
max_test = max(map(len, lstmx_test_scaled))
max_seq_length = max_train if max_train >= max_test else max_test

print(max_seq_length )

#get train and test groups to same length (length of sequence depends on timesteps in original dataset)
def adapt_sequence_length(group, max_length):
    """Zero-pad a 2-D sequence at the end so that it has ``max_length`` rows.

    Sequences that already have ``max_length`` rows or more are returned
    unchanged (same object, no truncation).
    """
    deficit = max_length - len(group)
    if deficit <= 0:
        return group
    # append `deficit` rows of zeros, leave the feature axis untouched
    return np.pad(group, ((0, deficit), (0, 0)), mode='constant', constant_values=0)

# Bring every sequence to the common length (zero rows are appended at the end)
lstmx_train_padded = [adapt_sequence_length(group, max_seq_length) for group in lstmx_train_scaled]
lstmx_test_padded = [adapt_sequence_length(group, max_seq_length) for group in lstmx_test_scaled]



#expected form from model for input = [batch_size, seq_length, features]
# Stack into one ndarray first: building a tensor directly from a list of ndarrays is
# very slow and triggers a PyTorch UserWarning. The resulting tensors are identical.
X_train_tensor = torch.tensor(np.stack(lstmx_train_padded)).float()
X_test_tensor = torch.tensor(np.stack(lstmx_test_padded)).float()

In [None]:
#set up the model

class VAELSTM(nn.Module):
    """VAE whose encoder is fed by an LSTM and whose output is a single value per sequence.

    Pipeline: 2-layer LSTM -> hidden state of the last layer -> dense encoder
    (mu, logvar) -> latent sample -> dense decoder producing ``seq_length *
    input_size`` values -> linear layer reducing them to ``output_size`` values.

    Input is expected as ``[batch_size, seq_length, features]``.
    """

    def __init__(self, input_size, seq_length, latent_size, hidden_size=128, output_size=1):
        super().__init__()

        self.seq_length = seq_length
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.latent_size = latent_size
        self.output_size = output_size

        dense_units = 128
        flat_size = seq_length * input_size

        # sequence encoder; batch_first because the data is [batch, seq, features]
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=2, batch_first=True)

        # hidden state -> (mu, logvar)
        self.encoder = nn.Sequential(
            nn.Linear(hidden_size, dense_units),
            nn.ReLU(),
            nn.Linear(dense_units, 2 * latent_size),
        )

        # latent sample -> flattened time series
        self.decoder = nn.Sequential(
            nn.Linear(latent_size, dense_units),
            nn.ReLU(),
            nn.Linear(dense_units, flat_size),
        )

        # flattened time series -> one output (per output_size) for the whole sequence
        self.to_final_output = nn.Linear(flat_size, output_size)

    def reparameterize(self, mu, logvar):
        """Draw ``mu + eps * std`` with ``eps ~ N(0, 1)`` and ``std = exp(logvar / 2)``."""
        sigma = (0.5 * logvar).exp()
        noise = torch.randn_like(sigma)
        return mu + noise * sigma

    def forward(self, x):
        """Return ``(final_out, mu, logvar)``; ``final_out`` has shape (batch, output_size)."""
        # encoder: only the final hidden state of the top LSTM layer is used
        _, (h_n, _) = self.lstm(x)
        top_hidden = h_n[-1]
        mu, logvar = self.encoder(top_hidden).chunk(2, dim=1)
        z = self.reparameterize(mu, logvar)

        # decoder: reshape to [batch, seq_length, features], then flatten again for the output layer
        recon = self.decoder(z).view(-1, self.seq_length, self.input_size)
        final_out = self.to_final_output(recon.view(-1, self.seq_length * self.input_size))

        return final_out, mu, logvar


In [None]:

# Hyperparameters
# X_train_tensor is [batch, seq_length, features]
seq_length = X_train_tensor.shape[1]
input_size = X_train_tensor.shape[2]  # number of features in each step
latent_size = 10
hidden_size = 128
output_size = 1
batch_size = 64
num_epochs = 200
learning_rate = 0.001

# DataLoader 

class CustomDataset(Dataset):
    """Pairs every sample in ``data`` with the label at the same position.

    NOTE: this redefines the ``CustomDataset`` class of the plain-VAE section;
    the only difference is the attribute name (``labels`` instead of ``target``).
    """

    def __init__(self, data, labels):
        self.data, self.labels = data, labels

    def __len__(self):
        """Number of samples (length of ``data``)."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(sample, label)`` pair stored at position ``index``."""
        sample = self.data[index]
        label = self.labels[index]
        return sample, label

# Padded sequences with their scaled target value; batches are reshuffled every epoch
train_dataset = CustomDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)


# NOTE: `model`, `optimizer` and `criterion` replace the objects of the plain-VAE section.
model = VAELSTM(input_size, seq_length, latent_size, hidden_size, output_size)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.MSELoss()

# Training
for epoch in range(num_epochs):
    for data, target in train_loader:
        # Reset gradients
        optimizer.zero_grad()

        # Forward pass; here the model output is the predicted (scaled) target, not a reconstruction
        recon_batch, mu, logvar = model(data)

        # Loss = prediction error against the target + KL divergence of the latent distribution.
        # NOTE(review): the MSE term is a mean while the KL term is a sum over the whole
        # batch, so the two terms are on different scales - confirm this weighting is intended.
        loss = criterion(recon_batch, target) + 0.5 * torch.sum(logvar.exp() - logvar - 1 + mu.pow(2))

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    # loss of the last batch of the epoch (not an epoch average)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

# Test LSTM: prediction error on the scaled test targets.
# The forward pass still samples the latent vector, so the output is stochastic.
with torch.no_grad():
    test_output, _, _ = model(X_test_tensor)

    test_loss = criterion(test_output, y_test_tensor)
    print(f"Test Loss: {test_loss.item():.4f}")



In [None]:
#inverse scaling
# Convert the tensor to a NumPy array explicitly before handing it to scikit-learn.
# Passing the tensor itself relies on implicit conversion, which fails for tensors
# that live on a GPU or still require gradients.
output_unscaled = scaler_y.inverse_transform(test_output.detach().cpu().numpy())
# one 'predicted' column per model output (output_size is 1 here)
output_unscaled_df = pd.DataFrame(output_unscaled, columns=['predicted'] * output_unscaled.shape[1])

# unscaled ground truth, one value per test hole (same order as the model input)
original_df = pd.DataFrame(lstmy_test, columns=['original'])


# NOTE: this replaces the result_df of the plain-VAE section
result_df = pd.concat([original_df, output_unscaled_df], axis=1)

result_df




In [None]:
#calculate MSE and RMSE

# Error metrics between the original and the predicted explosives values (unscaled).
mse_lstm = mean_squared_error(result_df['original'], result_df['predicted'])
rmse_lstm = mse_lstm**.5
sym_mape_lstm = smape(result_df['original'], result_df['predicted'])
# "Accuracy" is defined here as 100 minus the sMAPE (in percent); it is not a classification accuracy.
accuracy_lstm = 100 - sym_mape_lstm

# results: one formatted entry per metric, two decimals each
result_data_list_lstm = [
        "MSE {:.2f}".format(mse_lstm),
        "RMSE {:.2f}".format(rmse_lstm),
        "Acc {:.2f}".format(accuracy_lstm)
        ]

# printed on a single tab-separated line
print("\t".join(result_data_list_lstm))


In [None]:
#check with classification again

num_classes = 9  # (0.4 to 1.3 in 0.1-steps)

# bin edges: num_classes + 1 equally spaced thresholds from 0.4 to 1.3
thresholds = np.linspace(0.4, 1.3, num_classes + 1)

#calculate classes
# np.digitize returns 1 for values in the first bin, hence the "- 1" to get classes starting at 0.
# Values below 0.4 end up in class -1 and values >= 1.3 in class 9 (outside the 0..8 range).
result_df['orig_class'] = np.digitize(result_df['original'], thresholds) - 1
result_df['estim_class'] = np.digitize(result_df['predicted'], thresholds) - 1

print(result_df)

#create confusion matrix

from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt

# rows = actual class, columns = predicted class; only classes that occur are included
conf_matrix = confusion_matrix(result_df['orig_class'], result_df['estim_class'])

plt.figure(figsize=(6,6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',cbar=False)
plt.title('Confusion Matrix:')
plt.xlabel('prediction')
plt.ylabel('actual class')
plt.show()