This class represents a dataset of training samples. It provides methods to gather samples from a specified folder, convert the data into tensors, and calculate R-squared scores for model predictions. It also provides functions to perform training and testing of models using the gathered samples.


In [13]:
import import_ipynb
import os
from scipy.stats import pearsonr
import numpy as np
import pandas as pd
import torch
import pickle
from sklearn.model_selection import train_test_split
import model_creator
from training_sample import training_sample
import locations
import torch.nn as nn
from sklearn.preprocessing import StandardScaler

class training_dataset:
    """A folder of ``training_sample`` objects plus training/scoring helpers.

    Two data layouts are offered:

    * row-wise (``return_as_tensors``, ``return_as_tensors_split``): the
      ``data`` frames of all samples are stacked, one tensor row per data row;
    * flattened (``return_as_flat_tensors``,
      ``return_as_flat_standardized_tensors``): whatever
      ``sample.return_as_flat_df`` yields per sample is stacked.

    All "R2" values reported by this class are squared Pearson correlation
    coefficients between predictions and targets.
    """

    def __init__(self, dataset_folder):
        """Initialize the dataset by gathering samples from ``dataset_folder``."""
        self.samples = self.gather_samples(dataset_folder)

    def gather_samples(self, dataset_folder):
        """Create one ``training_sample`` per regular file in ``dataset_folder``.

        Sub-directories are ignored. Files are visited in ``os.listdir`` order.
        """
        samples = []
        for entry in os.listdir(dataset_folder):
            path = os.path.join(dataset_folder, entry)
            if os.path.isfile(path):
                samples.append(training_sample(path))
        return samples

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_float_tensor(values):
        """Return ``values`` (DataFrame or ndarray) as a float32 torch tensor."""
        # np.array always copies, so the tensor owns writable memory.
        return torch.from_numpy(np.array(values, dtype=np.float32))

    @staticmethod
    def _squared_pearson(predicted, expected):
        """Return the squared Pearson correlation of two equal-length sequences."""
        return float(pearsonr(predicted, expected)[0] ** 2)

    @staticmethod
    def _default_model_name(prefix, target_columns, hidden_layers):
        """Build ``<prefix><str(target_columns)>_<w1>_<w2>_....pth``."""
        layer_code = "_".join(str(width) for width in hidden_layers)
        return prefix + str(target_columns) + "_" + layer_code + ".pth"

    def _stack_rows(self, columns):
        """Stack ``sample.data[columns]`` of every sample into one DataFrame.

        Raises:
            ValueError: if the dataset holds no samples.
        """
        if not self.samples:
            raise ValueError("dataset contains no samples")
        # One concat over all frames instead of growing a frame inside a loop.
        frames = [sample.data[columns] for sample in self.samples]
        return pd.concat(frames, ignore_index=True)

    def _gather_flat_frames(self, feature_columns, target_columns):
        """Stack the per-sample ``return_as_flat_df`` results.

        Returns:
            ``(feature_frame, target_frame)``.

        Raises:
            ValueError: if the dataset holds no samples.
        """
        if not self.samples:
            raise ValueError("dataset contains no samples")
        feature_frames = []
        target_frames = []
        for sample in self.samples:
            # presumably both return values are DataFrames - confirm against
            # training_sample.return_as_flat_df
            features, targets = sample.return_as_flat_df(feature_columns, target_columns)
            feature_frames.append(features)
            target_frames.append(targets)
        return (
            pd.concat(feature_frames, ignore_index=True),
            pd.concat(target_frames, ignore_index=True),
        )

    def _per_sample_r2(self, predictor_name, model, features, targets):
        """Score one aggregated prediction per sample against its target.

        ``predictor_name`` names the ``training_sample`` method to call
        (``"predict_median"`` or ``"predict_mean"``). The reference value of a
        sample is the first row of its first target column.
        """
        predicted = []
        expected = []
        for sample in self.samples:
            predict = getattr(sample, predictor_name)
            predicted.append(predict(model, features, len(targets)))
            expected.append(float(sample.data[targets].iloc[0, 0]))
        return self._squared_pearson(predicted, expected)

    def _fit_mlp(self, x_train, y_train, x_test, y_test, hidden_layers, loss, optimizer_kwargs, save_folder, model_name):
        """Create an MLP sized to the data, train it and save it to ``save_folder``."""
        # Layer sizes follow the tensors so the model always fits the selected
        # feature/target columns.
        model = model_creator.MLP(input_size=x_train.shape[1], output_size=y_train.shape[1], hidden_layers=hidden_layers)
        optimizer = torch.optim.Adam(model.parameters(), **optimizer_kwargs)
        save_path = os.path.join(save_folder, model_name)
        os.makedirs(save_folder, exist_ok=True)
        # NOTE(review): `training_class` is not imported in the visible part of
        # this file - confirm it is in scope where this class is used.
        # NOTE(review): train_model is assumed to take
        # (x_train, y_train, x_test, y_test), which is the order the flattened
        # trainers have always passed - confirm against its definition.
        training_class.train_model(model, loss, optimizer, x_train, y_train, x_test, y_test, save_path=save_path, batch_size=0)

    # ------------------------------------------------------------------
    # Tensor conversion
    # ------------------------------------------------------------------

    def return_as_tensors_split(self, feature_columns=['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], target_columns = ['T']):
        """Stack all sample rows and split them 80/20 into train and test tensors.

        Returns:
            ``[x_train, y_train, x_test, y_test]`` as float32 tensors. Note
            this order differs from the flattened variants.

        Raises:
            ValueError: if the dataset holds no samples.
        """
        feature_columns = list(feature_columns)
        target_columns = list(target_columns)
        df = self._stack_rows(feature_columns + target_columns)

        x_train, x_test, y_train, y_test = train_test_split(df[feature_columns], df[target_columns], test_size=0.2, random_state=42)

        return [
            self._to_float_tensor(x_train),
            self._to_float_tensor(y_train),
            self._to_float_tensor(x_test),
            self._to_float_tensor(y_test),
        ]

    def return_as_tensors(self, columns=['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], target_columns = ['T']):
        """Stack all sample rows and return them unsplit.

        Returns:
            ``[features, targets]`` as float32 tensors.

        Raises:
            ValueError: if the dataset holds no samples.
        """
        columns = list(columns)
        target_columns = list(target_columns)
        df = self._stack_rows(columns + target_columns)
        return [self._to_float_tensor(df[columns]), self._to_float_tensor(df[target_columns])]

    def return_as_flat_tensors(self, feature_columns=['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], target_columns = ['T']):
        """Flattened per-sample data, split 80/20, without standardization.

        Unscaled counterpart of ``return_as_flat_standardized_tensors``.

        Returns:
            ``[x_train, x_test, y_train, y_test]`` as float32 tensors.
        """
        feature_df, target_df = self._gather_flat_frames(feature_columns, target_columns)
        x_train, x_test, y_train, y_test = train_test_split(feature_df, target_df, test_size=0.2, random_state=42)
        return [
            self._to_float_tensor(x_train),
            self._to_float_tensor(x_test),
            self._to_float_tensor(y_train),
            self._to_float_tensor(y_test),
        ]

    def return_as_flat_standardized_tensors(self, feature_columns=['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], target_columns = ['T'], scalerName = "standardScaler"):
        """Flattened per-sample data, standardized and split 80/20.

        Side effect: the fitted scalers are pickled to ``<scalerName>Y.pkl``
        and ``<scalerName>X.pkl`` in the current working directory,
        overwriting existing files.

        Returns:
            ``[x_train, x_test, y_train, y_test]`` as float32 tensors.
        """
        feature_df, target_df = self._gather_flat_frames(feature_columns, target_columns)

        x_scaler = StandardScaler()
        y_scaler = StandardScaler()

        # NOTE(review): the scalers are fitted on the full data before the
        # split, so test rows influence the scaling statistics. Left unchanged
        # to stay compatible with already-saved scalers and models.
        x_scaled = x_scaler.fit_transform(feature_df)
        y_scaled = y_scaler.fit_transform(target_df)

        with open(scalerName + "Y" + ".pkl", "wb") as f:
            pickle.dump(y_scaler, f)

        with open(scalerName + "X" + ".pkl", "wb") as f:
            pickle.dump(x_scaler, f)

        x_train, x_test, y_train, y_test = train_test_split(x_scaled, y_scaled, test_size=0.2, random_state=42)

        return [
            self._to_float_tensor(x_train),
            self._to_float_tensor(x_test),
            self._to_float_tensor(y_train),
            self._to_float_tensor(y_test),
        ]

    # ------------------------------------------------------------------
    # Scoring
    # ------------------------------------------------------------------

    def get_total_r2_score(self, model, features = ['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], targets = ['T']):
        """Squared Pearson correlation over every row of the whole dataset.

        ``model`` is handed to ``model_creator.MLP.create_and_load``.
        Predictions and targets are flattened before correlating.
        """
        network = model_creator.MLP.create_and_load(model, input_size=len(features), output_size=len(targets))
        network.eval()
        inputs, expected = self.return_as_tensors(features, targets)
        with torch.no_grad():
            predicted = network(inputs).flatten().tolist()

        return self._squared_pearson(predicted, expected.flatten().tolist())

    def get_median_r2_score(self, model, features = ['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], targets = ['T']):
        """Squared Pearson correlation using one median prediction per sample.

        Each sample contributes ``sample.predict_median(...)`` and the first
        row of its first target column.
        """
        return self._per_sample_r2("predict_median", model, features, targets)

    def get_mean_r2_score(self, model, features = ['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], targets = ['T']):
        """Squared Pearson correlation using one mean prediction per sample.

        Each sample contributes ``sample.predict_mean(...)`` and the first row
        of its first target column.
        """
        return self._per_sample_r2("predict_mean", model, features, targets)

    # ------------------------------------------------------------------
    # Training
    # ------------------------------------------------------------------

    def train_flattened(self,model_name = "default", feature_columns=['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], target_columns = ['T'], hidden_layers = [256, 128, 64, 32], loss = nn.MSELoss(), save_folder = "models"):
        """Train an MLP on the unscaled flattened data and save it.

        With ``model_name == "default"`` the file name becomes
        ``model<str(target_columns)>_<layer widths>.pth``.
        """
        x_train, x_test, y_train, y_test = self.return_as_flat_tensors(feature_columns, target_columns)

        if model_name == "default":
            model_name = self._default_model_name("model", target_columns, hidden_layers)

        self._fit_mlp(x_train, y_train, x_test, y_test, hidden_layers, loss, {"lr": 0.01}, save_folder, model_name)

    def train_standardized_flattened(self,model_name = "defaultStandarized", feature_columns=['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], target_columns = ['T'], hidden_layers = [512, 256, 128, 64], loss = nn.MSELoss(), save_folder = "models"):
        """Train an MLP on the standardized flattened data and save it.

        With ``model_name == "defaultStandarized"`` the file name becomes
        ``modelStanderd<str(target_columns)>_<layer widths>.pth``. Also writes
        the scaler pickles (see ``return_as_flat_standardized_tensors``).
        """
        x_train, x_test, y_train, y_test = self.return_as_flat_standardized_tensors(feature_columns, target_columns)

        if model_name == "defaultStandarized":
            model_name = self._default_model_name("modelStanderd", target_columns, hidden_layers)

        self._fit_mlp(x_train, y_train, x_test, y_test, hidden_layers, loss, {"lr": 0.00005, "weight_decay": 0.001}, save_folder, model_name)

    def train(self, model_name = "default", feature_columns=['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], target_columns = ['T'], hidden_layers = [64, 32, 32, 16], loss = nn.MSELoss(), save_folder = "models"):
        """Train an MLP on the row-wise data and save it.

        With ``model_name == "default"`` the file name becomes
        ``model<str(target_columns)>_<layer widths>.pth``.
        """
        # return_as_tensors_split orders its result x_train, y_train, x_test, y_test.
        x_train, y_train, x_test, y_test = self.return_as_tensors_split(feature_columns, target_columns)

        if model_name == "default":
            model_name = self._default_model_name("model", target_columns, hidden_layers)

        self._fit_mlp(x_train, y_train, x_test, y_test, hidden_layers, loss, {"lr": 0.01}, save_folder, model_name)

    # ------------------------------------------------------------------
    # Evaluation of flattened models
    # ------------------------------------------------------------------

    def test_r2_flattened(self, x_Scaler_name, y_Scaler_name, model_name = "default", feature_columns=['wavelength', 'psi65', 'del65', 'psi70', 'del70', 'psi75', 'del75'], target_columns = ['T', 'A', 'B', 'C']):
        """Print train and test R2 per target column for a saved flattened model.

        The data comes from ``return_as_flat_standardized_tensors``, which
        re-fits the scalers and rewrites the default scaler pickles.
        ``x_Scaler_name`` and ``y_Scaler_name`` are accepted for compatibility
        but are currently unused (loading saved scalers is not implemented).
        """
        x_train, x_test, y_train, y_test = self.return_as_flat_standardized_tensors(feature_columns, target_columns)

        # Size the network from the data rather than assuming 497 inputs / 4 outputs.
        model = model_creator.MLP.create_and_load(model_name, input_size=x_train.shape[1], output_size=y_train.shape[1])
        model.eval()

        with torch.no_grad():
            predicted_train = model(x_train)
            predicted_test = model(x_test)

        # Compute every score before printing so a failure prints nothing partial.
        report = []
        for split_label, predicted, expected in (("Train", predicted_train, y_train), ("Test", predicted_test, y_test)):
            for column, target_name in enumerate(target_columns):
                score = pearsonr(predicted[:, column].tolist(), expected[:, column].tolist())[0] ** 2
                report.append((str(target_name) + " " + split_label + " R2: ", score))

        for label, score in report:
            print(label, score)





T Train R2:  0.9993265501816238
A Train R2:  0.9751036437304422
B Train R2:  0.9660701704860761
C Train R2:  0.4921649139042205
T Test R2:  0.9992700699598391
A Test R2:  0.9550196826709711
B Test R2:  0.9463011802250868
C Test R2:  0.0011005490648434936


'\nfolder = os.getcwd()\nparent_folder = os.path.dirname(folder)\nfolder_path = os.path.join(parent_folder,"code_data_models","datasets", "new_Si_jaw_delta", "")\nmodels_dir = locations.models_dir\ndataset = training_dataset(folder_path)\nmodeldir = os.path.join(models_dir, "modelA_64_32_32_16.pth")\nprint(modeldir)\nprint(dataset.get_total_r2_score(modeldir, [\'wavelength\', \'psi65\', \'del65\', \'psi70\', \'del70\', \'psi75\', \'del75\'], [\'A\']))\nprint(dataset.get_median_r2_score(modeldir, [\'wavelength\', \'psi65\', \'del65\', \'psi70\', \'del70\', \'psi75\', \'del75\'], [\'A\']))\nprint(dataset.get_mean_r2_score(modeldir, [\'wavelength\', \'psi65\', \'del65\', \'psi70\', \'del70\', \'psi75\', \'del75\'], [\'A\']))\n'