In [1]:
#pip install seaborn --user

Note: you may need to restart the kernel to use updated packages.


In [4]:
import pandas as pd
import os
from os.path import dirname, abspath
import numpy as np
import sklearn
import warnings
import matplotlib.pyplot as plt
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler
from matplotlib import pyplot as plt
import seaborn as sns
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2
from sklearn.compose import make_column_transformer

In [5]:
# Load the pipeline configuration dictionary from the `config` module and
# echo it so the folder names, file names and post-processing switches used
# by the rest of the notebook are visible in the output.
from config import config_variables
print(config_variables)

{'PIPELINE_SWITCH': {'PIPELINE_TEST': False}, 'FEATURE_SET_VARS': {'IMPUTATION': 'Linear', 'MIN_THRESHOLD': 5}, 'FOLDERS': {'DATA': 'data', 'RAW_FEATURESET': 'raw_featureset', 'TEMPORAL': 'temporal_data', 'POST_PROCESSED_TRAIN_SET': 'post_processed_featureset', 'POST_PROCESSED_TEST_SET': 'post_processed_testset'}, 'FEATURE_SET': {'RAW_FEATURESET_EXCEL': 'Datos.xlsx', 'SPLIT_LABEL_NAME': 'HeartDisease', 'FEATURESET_EXCEL': 'Data_featureset.xlsx', 'LABELS_EXCEL': 'Data_labelset.xlsx', 'FEATURES_TRAIN': 'Train_featureset.xlsx', 'FEATURES_TEST': 'Test_featureset.xlsx', 'LABELS_TRAIN': 'Train_labelset.xlsx', 'LABELS_TEST': 'Test_labelset.xlsx', 'POST_PROCESSED_TRAIN_EXCEL': 'Post_processed_train_featureset.xlsx', 'POST_PROCESSED_TEST_EXCEL': 'Post_processed_test_featureset.xlsx', 'POSTPROCESSING_STEPS': {'STANDARDIZATION': True, 'NORMALIZATION': False, 'ONE_HOT_ENCODING': True}}}


In [16]:
class HeartDataFile(object):
    """File-based preparation pipeline for the heart-disease dataset.

    An instance resolves every input/output path from ``config_variables``
    (relative to the current working directory), loads the raw Excel workbook,
    and offers methods to:

    * split the raw table into a feature workbook and a label workbook,
    * create a stratified train/test split of those workbooks,
    * plot the numeric and categorical training features,
    * build the post-processed (scaled / one-hot encoded) train and test
      featuresets.
    """

    def __init__(self, config_variables):
        """Resolve all paths from the configuration and load the raw workbook.

        Args:
            config_variables: Nested dict with a ``FOLDERS`` section (folder
                names) and a ``FEATURE_SET`` section (file names, label column
                name and the ``POSTPROCESSING_STEPS`` switches).

        Raises:
            KeyError: If a required configuration key is missing.
            Exception: Whatever ``pandas.read_excel`` raises when the raw
                workbook cannot be read.
        """
        self.config = config_variables
        self.base_path = abspath(os.getcwd())

        folders = config_variables['FOLDERS']
        feature_set = config_variables['FEATURE_SET']

        # Raw input workbook (read eagerly, as the split methods depend on it).
        self.data_folder = folders['DATA']
        self.raw_data_folder = folders['RAW_FEATURESET']
        self.raw_excel_file = feature_set['RAW_FEATURESET_EXCEL']
        self.path_input_excel = os.path.join(
            self.base_path, self.data_folder, self.raw_data_folder, self.raw_excel_file
        )
        self.raw_heart_df = pd.read_excel(self.path_input_excel)

        self.temporal_data_folder = folders['TEMPORAL']
        # Both names refer to the same configured column; kept for callers
        # that use either attribute.
        self.label_name = feature_set['SPLIT_LABEL_NAME']
        self.target_name = feature_set['SPLIT_LABEL_NAME']

        # Full feature / label workbooks (before the train/test split).
        self.feature_file = feature_set['FEATURESET_EXCEL']
        self.output_feature_file = self._temporal_path(self.feature_file)
        self.label_file = feature_set['LABELS_EXCEL']
        self.output_label_file = self._temporal_path(self.label_file)

        # Train / test workbooks produced by split_train_test.
        self.train_features = feature_set['FEATURES_TRAIN']
        self.output_train_features = self._temporal_path(self.train_features)
        self.test_features = feature_set['FEATURES_TEST']
        self.output_test_features = self._temporal_path(self.test_features)
        self.train_label = feature_set['LABELS_TRAIN']
        self.output_train_label = self._temporal_path(self.train_label)
        self.test_label = feature_set['LABELS_TEST']
        self.output_test_label = self._temporal_path(self.test_label)

        # Post-processed outputs produced by build_featureset_definitive.
        self.post_train_data_folder = folders['POST_PROCESSED_TRAIN_SET']
        self.post_test_data_folder = folders['POST_PROCESSED_TEST_SET']
        self.post_processed_train_set_excel_file = feature_set['POST_PROCESSED_TRAIN_EXCEL']
        self.post_processed_test_set_excel_file = feature_set['POST_PROCESSED_TEST_EXCEL']
        self.train_set_output_excel = os.path.join(
            self.base_path, self.data_folder, self.post_train_data_folder,
            self.post_processed_train_set_excel_file,
        )
        self.test_set_output_excel = os.path.join(
            self.base_path, self.data_folder, self.post_test_data_folder,
            self.post_processed_test_set_excel_file,
        )

    def _temporal_path(self, file_name):
        """Return the absolute path of ``file_name`` inside the temporal folder."""
        return os.path.join(
            self.base_path, self.data_folder, self.temporal_data_folder, file_name
        )

    @staticmethod
    def _fit_on_train_apply_to_both(step_name, estimator, columns, train_df, test_df):
        """Fit ``estimator`` on the train columns and transform train and test.

        Returns two DataFrames (train, test) whose columns are named by
        ``ColumnTransformer.get_feature_names_out``.
        """
        # sparse_threshold=0 forces a dense array; otherwise a sparse matrix
        # may be returned (e.g. for wide one-hot output), which cannot be
        # wrapped in a DataFrame with named columns.
        transformer = ColumnTransformer([(step_name, estimator, columns)], sparse_threshold=0)
        train_values = transformer.fit_transform(train_df)
        test_values = transformer.transform(test_df)
        names = transformer.get_feature_names_out()
        return (
            pd.DataFrame(train_values, columns=names),
            pd.DataFrame(test_values, columns=names),
        )

    @staticmethod
    def _join_features(accumulated, new_block, rsuffix=''):
        """Append ``new_block`` column-wise to ``accumulated`` (which may be None)."""
        if accumulated is None:
            return new_block
        return accumulated.join(new_block, rsuffix=rsuffix)

    def split_labels_target(self, config_variables):
        """Write the feature workbook and the label workbook to the temporal folder.

        The label column (``self.label_name``) is separated from the raw
        table. A workbook that already exists on disk is left untouched.

        Args:
            config_variables: Unused; kept for call compatibility.
        """
        temporal_dir = os.path.join(self.base_path, self.data_folder, self.temporal_data_folder)
        if os.path.exists(temporal_dir):
            print('Folder already exists')
        else:
            # makedirs also creates a missing parent data folder.
            os.makedirs(temporal_dir)

        if os.path.exists(self.output_feature_file):
            print('Feature set file already exists')
        else:
            keep_columns = self.raw_heart_df.columns != self.label_name
            self.feature_set = self.raw_heart_df.loc[:, keep_columns]
            self.feature_set.to_excel(self.output_feature_file, index=False)

        if os.path.exists(self.output_label_file):
            print('Label set file already exists')
        else:
            self.label_set = self.raw_heart_df[self.label_name]
            self.label_set.to_excel(self.output_label_file, index=False)

    def split_train_test(self, config_variables, test_size=0.2, random_state=100):
        """Create a stratified train/test split and write the missing workbooks.

        Args:
            config_variables: Unused; kept for call compatibility.
            test_size: Fraction of rows assigned to the test split.
            random_state: Seed passed to ``train_test_split``.

        Workbooks that already exist are not overwritten. The split is
        seeded, so re-running on unchanged inputs reproduces the same rows.
        """
        self.feature = pd.read_excel(self.output_feature_file)
        self.label = pd.read_excel(self.output_label_file)

        X_train, X_test, y_train, y_test = train_test_split(
            self.feature, self.label,
            test_size=test_size, random_state=random_state, stratify=self.label,
        )

        outputs = (
            (X_train, self.output_train_features, 'Features trainset already exists'),
            (X_test, self.output_test_features, 'Features testset already exists'),
            (y_train, self.output_train_label, 'Label trainset already exists'),
            (y_test, self.output_test_label, 'Label testset already exists'),
        )
        for frame, path, exists_message in outputs:
            if os.path.exists(path):
                print(exists_message)
            else:
                frame.to_excel(path, index=False)

    def visualization_num(self, config_variables):
        """Plot a correlation heatmap and per-feature boxplots of the numeric training features.

        The figures are saved as ``plot.png`` and ``boxplot.png`` in the
        current working directory.

        Args:
            config_variables: Unused; kept for call compatibility.

        Raises:
            ValueError: If no numeric column is left after dropping the first one.
        """
        # Only the training features are visualised.
        self.feature_heart_df = pd.read_excel(self.output_train_features)

        numeric = self.feature_heart_df.select_dtypes(include=[np.number])
        # The first numeric column is dropped; per the original author's note
        # it is a patient counter (TODO confirm against the workbook).
        numeric_heart = numeric.iloc[:, 1:]
        if numeric_heart.shape[1] == 0:
            raise ValueError('No numeric feature columns left to plot')

        heatmap_fig, heatmap_ax = plt.subplots(figsize=(16, 6))
        sns.heatmap(numeric_heart.corr(), cmap='coolwarm', vmin=-1, vmax=1,
                    annot=True, ax=heatmap_ax)
        heatmap_ax.set_title('Correlation plot', fontdict={'fontsize': 12}, pad=12)
        heatmap_fig.savefig('plot.png', bbox_inches='tight')
        plt.show()

        red_circle = dict(markerfacecolor='red', marker='o', markeredgecolor='white')
        # squeeze=False keeps `axs` a 2-D array even for a single column, so
        # `.flat` is always available.
        box_fig, axs = plt.subplots(1, numeric_heart.shape[1], figsize=(20, 10), squeeze=False)
        for i, ax in enumerate(axs.flat):
            # Missing values would otherwise produce an empty box.
            ax.boxplot(numeric_heart.iloc[:, i].dropna(), flierprops=red_circle)
            ax.set_title(numeric_heart.columns[i], fontsize=20, fontweight='bold')
            ax.tick_params(axis='y', labelsize=14)
        box_fig.tight_layout()
        box_fig.savefig('boxplot.png')

    def visualization_cat(self, config_variables):
        """Show one count plot per non-numeric training feature.

        Args:
            config_variables: Unused; kept for call compatibility.
        """
        # Only the training features are visualised.
        self.feature_heart_df = pd.read_excel(self.output_train_features)

        categorical_heart = self.feature_heart_df.select_dtypes(exclude=[np.number])
        for column_name in categorical_heart.columns.to_list():
            plt.figure()
            sns.countplot(y=column_name, data=categorical_heart, palette="husl")
            plt.show()

    def standardization(self, train_df, test_df, integer_features):
        """Standardise ``integer_features`` with a scaler fitted on the train set.

        Returns:
            Tuple ``(std_train_df, std_test_df)`` with ``num__``-prefixed columns.
        """
        return self._fit_on_train_apply_to_both(
            "num", StandardScaler(), integer_features, train_df, test_df
        )

    def normalization(self, train_df, test_df, integer_features):
        """Min-max scale ``integer_features`` with a scaler fitted on the train set.

        Returns:
            Tuple ``(norm_train_df, norm_test_df)`` with ``num__``-prefixed columns.
        """
        return self._fit_on_train_apply_to_both(
            "num", MinMaxScaler(), integer_features, train_df, test_df
        )

    def one_hot_encoding(self, train_df, test_df, categorical_features):
        """One-hot encode ``categorical_features`` with an encoder fitted on the train set.

        Categories that appear only in the test set are encoded as all zeros
        instead of raising.

        Returns:
            Tuple ``(ohe_train_df, ohe_test_df)`` with ``cat__``-prefixed columns.
        """
        return self._fit_on_train_apply_to_both(
            "cat", OneHotEncoder(handle_unknown="ignore"), categorical_features,
            train_df, test_df,
        )

    def univariate_selection(self, data=None, k=10):
        """Rank features with a chi-squared univariate test and print the top ``k``.

        Args:
            data: DataFrame to analyse. Columns 0-17 are used as features and
                column 19 as the target, exactly as in the original code.
                NOTE(review): these positions do not obviously match the
                heart dataset - confirm before relying on the result.
            k: Number of best features to select and print.

        Returns:
            DataFrame with columns ``Specs`` and ``Score`` holding the ``k``
            highest-scoring features.

        Raises:
            ValueError: If ``data`` is not supplied.
        """
        if data is None:
            raise ValueError('univariate_selection requires a DataFrame via the `data` argument')

        X = data.iloc[:, 0:18]  # independent columns
        y = data.iloc[:, 19]    # target column
        selector = SelectKBest(score_func=chi2, k=k).fit(X, y)

        # Put names and scores side by side for readability.
        featureScores = pd.concat(
            [pd.DataFrame(X.columns), pd.DataFrame(selector.scores_)], axis=1
        )
        featureScores.columns = ['Specs', 'Score']
        best = featureScores.nlargest(k, 'Score')
        print(best)
        return best

    def build_featureset_definitive(self, df):
        """Build and save the post-processed train and test featuresets.

        The steps enabled under ``FEATURE_SET -> POSTPROCESSING_STEPS``
        (standardisation, normalisation, one-hot encoding) are fitted on the
        train features, applied to both splits and joined column-wise. Nothing
        is done when both output workbooks already exist.

        Args:
            df: Unused; kept for call compatibility.
        """
        outputs_exist = (
            os.path.exists(self.train_set_output_excel)
            and os.path.exists(self.test_set_output_excel)
        )
        if outputs_exist:
            print('No need to build train and file featureset, file already exists')
            return

        for folder in (self.post_train_data_folder, self.post_test_data_folder):
            os.makedirs(os.path.join(self.base_path, self.data_folder, folder), exist_ok=True)

        train_df = pd.read_excel(self.output_train_features)
        test_df = pd.read_excel(self.output_test_features)

        # Column roles are derived from the train set only.
        integer_features = train_df.select_dtypes(exclude="object").columns
        categorical_features = train_df.select_dtypes(include="object").columns

        steps = self.config['FEATURE_SET']['POSTPROCESSING_STEPS']
        final_train_df = None
        final_test_df = None

        if steps['STANDARDIZATION']:
            final_train_df, final_test_df = self.standardization(
                train_df, test_df, integer_features
            )

        if steps['NORMALIZATION']:
            norm_train_df, norm_test_df = self.normalization(
                train_df, test_df, integer_features
            )
            # Both scalers emit `num__<column>` names; the suffix keeps the
            # join from failing when both steps are enabled.
            final_train_df = self._join_features(final_train_df, norm_train_df, rsuffix='_minmax')
            final_test_df = self._join_features(final_test_df, norm_test_df, rsuffix='_minmax')

        if steps['ONE_HOT_ENCODING']:
            ohe_train_df, ohe_test_df = self.one_hot_encoding(
                train_df, test_df, categorical_features
            )
            final_train_df = self._join_features(final_train_df, ohe_train_df)
            final_test_df = self._join_features(final_test_df, ohe_test_df)

        if final_train_df is None:
            # No step enabled: keep writing empty workbooks, as before.
            final_train_df, final_test_df = pd.DataFrame(), pd.DataFrame()

        # Train goes to the train workbook, test to the test workbook.
        final_train_df.to_excel(self.train_set_output_excel, index=False)
        final_test_df.to_excel(self.test_set_output_excel, index=False)

    @staticmethod
    def classification_report(y_test, y_pred, output_path="results.xlsx"):
        """Save sklearn's classification report as an Excel sheet and return it.

        Args:
            y_test: True labels.
            y_pred: Predicted labels.
            output_path: Destination workbook.

        Returns:
            DataFrame with one row per class / average and one column per metric.
        """
        report = metrics.classification_report(y_test, y_pred, output_dict=True)
        # NOTE(review): the original referenced `sort_values` without calling
        # it and without a sort key; rows are kept in sklearn's order.
        df_classification_report = pd.DataFrame(report).transpose()
        df_classification_report.to_excel(output_path)
        return df_classification_report

In [17]:
# Create the dataset helper; the constructor resolves all paths from the
# configuration and reads the raw Excel workbook.
heartdata = HeartDataFile(config_variables)


In [14]:
# Separate the label column from the features and write both workbooks to the
# temporal folder (existing files are reported and left untouched).
heartdata.split_labels_target(config_variables)

Folder already exists
Feature set file already exists
Label set file already exists


In [21]:
# Create the stratified 80/20 train/test split and write the four workbooks
# that do not exist yet.
heartdata.split_train_test(config_variables)


Features trainset already exists
Features testset already exists
Label trainset already exists
Label testset already exists


In [None]:
# Show a count plot for every non-numeric feature of the training set.
heartdata.visualization_cat(config_variables)

In [None]:
# Plot the correlation heatmap and boxplots of the numeric training features
# (saved as plot.png and boxplot.png in the working directory).
heartdata.visualization_num(config_variables)

In [20]:
# Build the post-processed train/test featuresets.
# NOTE(review): the method never reads its argument (it uses the configuration
# stored on the instance), so passing config_variables here has no effect.
heartdata.build_featureset_definitive(config_variables)

No need to build featureset, file already exists
