In [1]:
import seaborn as sns
from scipy import stats
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from imblearn import FunctionSampler
import numpy as np
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import OneHotEncoder 


In [1]:
def show_hist_qq(df,x,hist_title=""):
    '''
    Plot a histogram (with KDE) and a Q-Q plot of one column side by side,
    then print the column's skew.

    df: DataFrame which contains the variable to plot as a column
    x: Name of column to plot
    hist_title: Title of the histogram
    return: None.
    '''
    fig, (hist_ax, qq_ax) = plt.subplots(1, 2)
    fig.set_size_inches(18, 5)

    _ = sns.histplot(data=df, x=x, kde=True, ax=hist_ax).set(title=hist_title)

    # probplot fits a line through the ordered values; missing values would
    # poison that fit, so only the observed values are passed to it.
    observed = df[x].dropna()
    _ = stats.probplot(observed, plot=qq_ax)

    print(f"Skew: {df[x].skew():.2f}")

In [None]:
def log_and_return(df,col,drop=False,eps=0.001):
    '''
    Add a column `log_<col>` holding numpy.log1p(df[col] + eps) and return the result.

    The input DataFrame is left untouched; a new DataFrame is returned, so the
    function can be re-run on the same input without side effects.

    df: Dataframe
    col: Name of column to log
    drop: Boolean, to drop original column from the returned DataFrame
    eps: Small offset added to the column before log1p (default 0.001)
    return: New Dataframe with the logged column appended
    '''
    # Work on a copy so the caller's frame does not silently gain a column.
    result = df.copy()
    result[f'log_{col}'] = np.log1p(df[col] + eps)
    if drop:
        result = result.drop(columns=col)
    return result

In [None]:
def correlation_map(df,title="",size=(18,18)):
    '''
    Show a correlation heatmap
    df: Dataframe to perform correlation
    title: Title of heatmap
    size: Size of chart, default (18,18)
    '''
    # Honour the caller's size (previously the default was always used).
    # Note this sets the global matplotlib default figure size.
    plt.rcParams['figure.figsize'] = size
    _ = sns.heatmap(df.corr(), square=True).set(title=title)

In [None]:
def single_correlation_map(df,col,size=(1,10),corr_threshold=0.5):
    '''
    Show a correlation map for a single variable
    df: Dataframe to correlate
    col: name of column to show
    size: Size of chart, default (1,10)
    corr_threshold: min level of +/- correlation to include in plot
    '''
    plt.rcParams['figure.figsize'] = size

    # Remove the column's correlation with itself by label. Dropping "the first
    # row after sorting" can remove a different, perfectly correlated column
    # when it ties with the self-correlation.
    col_corr = df.corr()[[col]].drop(index=col)
    col_corr = col_corr.sort_values(by=col, ascending=False)

    # Keep only correlations at least as strong as the threshold, either sign.
    col_corr = col_corr[col_corr[col].abs() >= corr_threshold]

    _ = sns.heatmap(col_corr, annot=True, vmin=-1, vmax=1).set(title=f"Correlation for {col}")

In [None]:
def ordinal_to_rank(df):
    '''
    Replace the ordinal text categories of a fixed set of columns with integer ranks.

    Each listed column is overwritten in place with `Series.map` of its lookup
    table, so any value that is not a key of the table becomes NaN.
    Every listed column must be present in df.

    df: Dataframe holding the ordinal columns
    return: the same Dataframe object, with the columns mapped
    '''
    # Scales shared by several columns.
    quality = {"Po":1,"Fa":2,"TA":3,"Gd":4,"Ex":5}
    quality_or_none = {"None":0, **quality}
    basement_finish = {"None":0,"Unf":1,"LwQ":2,"Rec":3,"BLQ":4,"ALQ":5,"GLQ":6}

    # Column name -> lookup table, in the order the columns are processed.
    rank_tables = {
        'Lot Shape': {"IR3":1,"IR2":2,"IR1":3,"Reg":4},
        'Utilities': {"ELO":1,"NoSeWa":2,"NoSewr":3,"AllPub":4},
        'Land Slope': {"Sev":1,"Mod":2,"Gtl":3},
        'Exter Qual': quality,
        'Exter Cond': quality,
        'Bsmt Qual': quality_or_none,
        'Bsmt Cond': quality_or_none,
        'Bsmt Exposure': {"None":0,"No":1,"Mn":2,"Av":3,"Gd":4},
        'BsmtFin Type 1': basement_finish,
        'BsmtFin Type 2': basement_finish,
        'Heating QC': quality,
        'Electrical': {"Mix":1,"FuseP":2,"FuseF":3,"FuseA":4,"SBrkr":5},
        'Kitchen Qual': quality,
        'Functional': {"Sal":1,"Sev":2,"Maj2":3,"Maj1":4,"Mod":5,"Min2":6,
                       "Min1":7,"Typ":8},
        'Fireplace Qu': quality_or_none,
        'Garage Finish': {"None":0,"Unf":1,"RFn":2,"Fin":3},
        'Garage Qual': quality_or_none,
        'Garage Cond': quality_or_none,
        'Paved Drive': {"N":1,"P":2,"Y":3},
        'Pool QC': {"None":0,"Fa":1,"TA":2,"Gd":3,"Ex":4},
        'Fence': {"None":0,"MnWw":1,"GdWo":2,"MnPrv":3,"GdPrv":4},
        'Central Air': {"N":0,"Y":1},
    }

    for column, table in rank_tables.items():
        df[column] = df[column].map(table)

    return df

In [None]:
#Custom Transformer that extracts columns passed as argument to its constructor 
class FeatureSelector(BaseEstimator, TransformerMixin):
    '''
    Transformer that returns a subset of the columns of a DataFrame, chosen
    either by explicit name or by dtype family.
    '''
    #Class Constructor 
    def __init__(self, feature_names=None,by=None):
        '''
        feature_names: list of features to pick. If feature_names is not None, 'by' will be ignored.
        by: 'categorical' or 'numerical'. The type of features to pick out. Only used if feature_names is None.
        '''
        self.feature_names = feature_names
        self.by=by
    
    #Return self nothing else to do here    
    def fit(self, X, y = None):
        '''
        No-op: nothing is learned from the data.
        return: self
        '''
        return self 
    
    #Method that describes what we need this transformer to do
    def transform(self, X, y = None):
        '''
        X: DataFrame to select columns from
        return: X[feature_names] when feature_names is set; otherwise the
                non-numeric columns (by='categorical') or the numeric columns
                (by='numerical') of X.
        raises: ValueError if feature_names is None and 'by' is neither
                'categorical' nor 'numerical'.
        '''
        if self.feature_names is not None:
            return X[self.feature_names]

        if self.by=="categorical":
            return X.select_dtypes(exclude='number')
        if self.by=="numerical":
            return X.select_dtypes(include='number')

        # Fail at the point of misconfiguration rather than handing None to
        # the next pipeline step.
        raise ValueError(f"Expected 'categorical' or 'numerical', but got '{self.by}'.")

In [None]:
#Custom Transformer that imputes with None or 0
class StandardImpute(BaseEstimator, TransformerMixin):
    '''
    Transformer that fills missing values with the string "None" or with 0
    for the configured columns.
    '''
    #Class Constructor 
    def __init__(self, none=None, zero=None):
        '''
        zero: list of cols to impute to 0 (None means no columns)
        none: list of cols to impute to the string "None" (None means no columns)
        '''
        # Defaults are None rather than [] so instances never share one mutable list.
        self.zero = zero
        self.none = none
    
    #Return self nothing else to do here    
    def fit(self, X, y = None):
        '''
        No-op: nothing is learned from the data.
        return: self
        '''
        return self 
    
    #Method that describes what we need this transformer to do
    def transform(self, X, y = None):
        '''
        X: DataFrame to impute
        return: a copy of X in which the configured columns that exist in X have
                their missing values filled; configured columns absent from X
                are skipped.
        '''
        # Work on a copy so the caller's frame is not modified.
        X = X.copy()
        X_cols = list(X.columns)

        none_cols = () if self.none is None else self.none
        zero_cols = () if self.zero is None else self.zero

        for name in none_cols:
            if name in X_cols:
                X[name] = X[name].fillna("None")

        for name in zero_cols:
            if name in X_cols:
                X[name] = X[name].fillna(0)

        return X 

In [None]:
#Custom Transformer that imputes Lot Frontage
class LotFrontageImpute(BaseEstimator, TransformerMixin):
    '''
    Transformer that fills missing 'Lot Frontage' values with the median
    'Lot Frontage' of the row's 'Neighborhood'.
    '''
    #Class Constructor 
    def __init__(self):
        pass
    
    #Return self nothing else to do here    
    def fit(self, X, y = None):
        '''
        No-op: nothing is learned from the data.
        return: self
        '''
        return self 
    
    #Method that describes what we need this transformer to do
    def transform(self, X, y = None):
        '''
        X: DataFrame with 'Neighborhood' and 'Lot Frontage' columns
        return: a copy of X with 'Lot Frontage' imputed. When a neighborhood has
                no observed 'Lot Frontage' at all, the median over the whole of
                X is used instead.
        '''
        # Work on a copy so the caller's frame is not modified.
        X = X.copy()

        # Fallback comes from the frame being transformed, not from a global
        # `df` that may not exist (or may be a different frame) at call time.
        overall_median = X['Lot Frontage'].median()

        def _fill_group(values):
            group_median = values.median()
            if np.isnan(group_median):
                group_median = overall_median
            return values.fillna(group_median)

        X['Lot Frontage'] = X.groupby(by="Neighborhood")['Lot Frontage'].transform(_fill_group)

        return X

In [None]:
#Custom Transformer that transforms ordinal features to numeric
class OrdinalToNumeric(BaseEstimator, TransformerMixin):
    '''
    Transformer wrapper around ordinal_to_rank, for use as a pipeline step.
    '''
    #Class Constructor 
    def __init__(self):
        pass
    
    #Return self nothing else to do here    
    def fit(self, X, y = None):
        '''
        No-op: nothing is learned from the data.
        return: self
        '''
        return self 
    
    #Method that describes what we need this transformer to do
    def transform(self, X, y = None):
        '''
        X: DataFrame holding the ordinal columns expected by ordinal_to_rank
        return: a copy of X with those columns mapped to numeric ranks
        '''
        # ordinal_to_rank overwrites columns in place; running it a second time
        # on already-mapped values would turn them into NaN. Passing a copy keeps
        # the caller's frame intact so the step can be re-run safely.
        return ordinal_to_rank(X.copy())

In [None]:
# Categorical branch: keep the non-numeric columns, then one-hot encode them.
# NOTE(review): newer scikit-learn releases appear to rename OneHotEncoder's
# `sparse` argument to `sparse_output` -- confirm against the installed version.
categorical_pipeline = Pipeline(
    steps=[
        ('categorical_selector', FeatureSelector(by="categorical")),
        ('one_hot_encoder', OneHotEncoder(sparse=False, drop="first")),
    ]
)

# Numerical branch: keep the numeric columns unchanged.
numerical_pipeline = Pipeline(
    steps=[
        ('numerical_selector', FeatureSelector(by="numerical")),
    ]
)

In [None]:
# Combine the two sub-pipelines in a FeatureUnion, categorical one listed first.
Categorical_Numerical_Split = FeatureUnion(
    transformer_list=[
        ('categorical_pipeline', categorical_pipeline),
        ('numerical_pipeline', numerical_pipeline),
    ]
)

In [None]:
def onehot_encode_categorical_features(df):
    '''
    One-hot encode the non-numeric columns of a DataFrame.

    df: Dataframe with numeric and/or non-numeric columns
    return: Dataframe made of the numeric columns followed by the dummy
            columns of the non-numeric ones (first level of each dropped)
    '''
    numeric_part = df.select_dtypes(include='number')
    dummy_part = pd.get_dummies(df.select_dtypes(exclude='number'), drop_first=True)
    return pd.concat([numeric_part, dummy_part], axis=1)

In [None]:
#Custom Transformer that one hot encode categorical features
class OneHotEncode(BaseEstimator, TransformerMixin):
    '''
    Transformer that one-hot encodes the non-numeric columns of a DataFrame
    and keeps the numeric columns as they are.
    '''
    #Class Constructor 
    def __init__(self):
        pass
    
    #Return self nothing else to do here    
    def fit(self, X, y = None):
        '''
        No-op: nothing is learned from the data.
        return: self
        '''
        return self 
    
    #Method that describes what we need this transformer to do
    def transform(self, X, y = None):
        '''
        X: DataFrame to encode
        return: DataFrame made of the numeric columns of X followed by the dummy
                columns of its non-numeric columns (first level of each dropped)
        '''
        numeric_part = X.select_dtypes(include='number')

        # NOTE(review): the dummy columns are derived from the frame passed in
        # here, so two frames with different category values yield different
        # output columns -- confirm callers align train/test columns themselves.
        dummy_part = pd.get_dummies(X.select_dtypes(exclude='number'), drop_first=True)

        return pd.concat([numeric_part, dummy_part], axis=1)

In [None]:
#Custom Transformer that displays the output at this point
class DispStep(BaseEstimator, TransformerMixin):
    '''
    Pass-through pipeline step: transform returns its input unchanged.
    (The display call it was written around is currently disabled.)
    '''
    #Class Constructor 
    def __init__(self):
        pass
    
    #Return self nothing else to do here    
    def fit(self, X, y = None):
        '''
        No-op: nothing is learned from the data.
        return: self
        '''
        return self 
    
    #Method that describes what we need this transformer to do
    def transform(self, X, y = None):
        '''
        X: data flowing through the pipeline
        return: X, unchanged
        '''
        return X

In [None]:
class Tester():
    '''
    This class is to help with reading and submission of Kaggle test file
    '''
    def __init__(self,path):
        '''
        Instantiates the Tester object, reads the test file, and saves the ID sequence
        path: path of the test .csv; it must contain an 'Id' column
        '''
        self.df=pd.read_csv(path)
        self.df_id=self.df[['Id']]
    
    def get_test_df(self):
        '''
        return: Test DataFrame
        '''
        return self.df
    
    def get_length(self):
        '''
        return: Length of test file (number of rows)
        '''
        return self.df_id.shape[0]
    
    def write_submission(self,pred,output):
        '''
        Takes a prediction and output path, and writes a Kaggle format csv to file
        pred: predictions (list, array, Series, ...) whose length must be equal to
              the length of the test file; they are paired with the IDs by position
        output: output path of .csv (expected to end with ".csv"; not checked here)
        return: 1 if the length of pred does not match the test file (nothing is
                written), otherwise None
        '''
        
        #check if length of pred is correct
        if len(pred) != self.get_length():
            print("Length mismatch")
            return 1
        
        #merge the ID and predicted SalePrice
        # concat(axis=1) aligns rows on index labels, so both sides get a fresh
        # 0..n-1 index first; otherwise a Series/DataFrame prediction carrying
        # its own index would be misaligned and padded with NaN.
        ids = self.df_id.reset_index(drop=True)
        predictions = pd.DataFrame(pred).reset_index(drop=True)
        result = pd.concat([ids, predictions], axis=1)

        #rename the columns
        result = result.rename(columns={result.columns[1]: "SalePrice"})
        self.df_result = result
        
        #save to csv
        self.df_result.to_csv(output,index=False)
        
        print(f"Output saved to {output}")