In [None]:
# creating datasets: synthetic classification data (1000 samples, 20 features)
from sklearn.datasets import make_classification
X, y = make_classification(
    random_state=7,
    n_samples=1000,
    n_features=20,
    n_informative=15,
    n_redundant=5,
)

### Pre-Processing Techniques
1. Data Cleaning
    - 1.1 Missing Values Treatment
    - 1.2 Noise Smoothing
    - 1.3 Outlier Treatment
2. Data Integration
3. Data Transformation
    - 3.1 Normalization
    - 3.2 Noise Removal (Binning)
4. Data Reduction

### Handling Null Values
1. Null Value Threshold for a column
2. Null Value treatment for remaining Null Values

In [2]:
import numpy as np 
import pandas as pd

# Import SparkSession
from pyspark.sql import SparkSession

# Create (or reuse, via getOrCreate) a local SparkSession with one worker thread
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)


22/07/22 01:27:23 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.1.11 instead (on interface wlp3s0)
22/07/22 01:27:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/07/22 01:27:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [46]:
# NA Thresholding for Columns and Rows

# python
def _min_non_null(limit, n):
    '''
    Smallest integer k with k / n >= limit: the minimum number of non-null
    values out of n needed to reach the fraction `limit`. Returns 0 when n == 0.
    '''
    k = int(limit * n)
    # int() truncates (limit=0.5, n=3 -> 1, yet 1/3 < 0.5), so bump once when short.
    # Comparing k / n with limit instead of using ceil() avoids float noise such as
    # 0.7 * 10 == 7.000000000000001 being rounded up to 8.
    if n and k / n < limit:
        k += 1
    return k


def null_threshold(data, limit, ax):
    '''
    Drop rows or columns that do not have enough non-null values.

    input: data -> pandas dataframe
            limit -> float in [0.0, 1.0]: minimum fraction of non-null values a
                     row/column needs to be kept
                     (0.0 keeps everything, 1.0 keeps only fully populated ones)
            ax -> int (0 for row based thresholding, 1 for column based)
    Output: pd.DataFrame (a new frame; `data` is not modified)
    '''
    # A column holds len(data) values; a row holds len(data.columns) values.
    n_values = len(data) if ax == 1 else len(data.columns)
    return data.dropna(axis=ax, thresh=_min_non_null(limit, n_values))


# pyspark
# add if else for axis
from pyspark.sql.functions import count, when, col
def null_threshold_pyspark(data, limit, ax):
    '''
    Drop rows or columns of a Spark DataFrame that do not have enough non-null values.

    input: data -> spark dataframe
            limit -> float in [0.0, 1.0]: minimum fraction of non-null values a
                     row/column needs to be kept
                     (0.0 keeps everything, 1.0 keeps only fully populated ones)
            ax -> int (0 for row based thresholding, 1 for column based)
    Output: Spark DataFrame
    '''
    if ax == 1:
        # One count job, instead of calling data.count() once per column inside
        # the select list as before.
        n_rows = data.count()
        if n_rows == 0:
            # No rows: there is no fraction to compare against; keep all columns.
            return data
        # count(when(isNull)) gives the null count of every column in one pass.
        null_counts = data.select(
            [count(when(col(c).isNull(), c)) for c in data.columns]
        ).first()
        sparse_cols = [
            name for name, n_null in zip(data.columns, null_counts)
            if (n_rows - n_null) / n_rows < limit
        ]
        return data.drop(*sparse_cols)

    # Row based: dropna(thresh=k) keeps rows with at least k non-null values.
    n_cols = len(data.columns)
    thresh = int(limit * n_cols)
    # int() truncates, so round up when the truncated count falls short of limit.
    if n_cols and thresh / n_cols < limit:
        thresh += 1
    return data.dropna(thresh=thresh)


# corner case: print(null_threshold(df, 0.7, 1)) null_threshold_pyspark(df_ps, 0.7, 1).show()

In [10]:
# Dropping rows if missing values are present in specific columns

# python
def null_unique_columns(data, unique_cols):
    '''
    Drop every row that has a missing value in any of `unique_cols`.

    input: data -> pd.DataFrame
            unique_cols -> column label(s), e.g. key columns, that must not be null
    output: pd.DataFrame (a new frame; `data` is not modified)
    '''
    cleaned = data.dropna(axis='index', subset=unique_cols)
    return cleaned

#pyspark
from functools import reduce
def null_unique_columns_pyspark(data, unique_cols):
    '''
    Drop every row of a Spark DataFrame with a null in any of `unique_cols`.

    input: data -> Spark DataFrame
            unique_cols -> list of column name(s), e.g. key columns, that must not be null
    output: Spark DataFrame
    '''
    return data.dropna(how='any', subset=unique_cols)


### Imputation
1. Statistical Imputation
    - 1.1 Mean, mode and median strategies
    - 1.2 Constant strategy


In [11]:
# Fill NA

# python
def fill_na(data):
    '''
    Description: Fills NA of quantitative columns with the median and of
    qualitative (boolean or non-numeric) columns with the mode.
    input: data -> pd.DataFrame (its columns are overwritten; the same frame is returned)
    output: pd.DataFrame
    '''
    for column in data.columns:
        series = data[column]
        if pd.api.types.is_bool_dtype(series) or not pd.api.types.is_numeric_dtype(series):
            # mode() ignores NaN; it is empty when the column has no values at all
            modes = series.mode()
            if modes.empty:
                continue
            fill_value = modes.iloc[0]
        else:
            # median() is already a scalar (indexing it with [0] raised before)
            fill_value = series.median()
        # Assign back rather than Series.fillna(inplace=True): the chained in-place
        # call does not update `data` under pandas copy-on-write.
        data[column] = series.fillna(fill_value)
    return data

# pyspark
def fill_na_pyspark(data, relative_error=0.01):
    '''
    Description: Fills NA of numeric columns with the (approximate) median and of
    string columns with the mode.
    input: data -> spark dataframe
           relative_error -> float, precision passed to approxQuantile for the median
                             (0.0 computes the exact median, at a higher cost)
    output: spark dataframe
    '''
    numeric_types = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')
    # data.dtypes is evaluated once, so reassigning `data` below is safe.
    for name, dtype in data.dtypes:
        if dtype == 'string':
            # Most frequent non-null value; nulls are filtered so they cannot win.
            top = (
                data.filter(data[name].isNotNull())
                .groupBy(name).count()
                .orderBy('count', ascending=False)
                .first()
            )
            if top is not None:
                data = data.na.fill(value=top[0], subset=[name])
        elif dtype in numeric_types:
            # approxQuantile returns a list, which is empty for an all-null column.
            quantiles = data.approxQuantile(name, [0.5], relative_error)
            if quantiles:
                data = data.na.fill(value=quantiles[0], subset=[name])
    # Return only after every column has been processed (previously the loop
    # returned after the first matching column).
    return data

In [None]:
# possible alternative

# python
from os import XATTR_REPLACE
from numpy import isnan
from sklearn.impute import SimpleImputer
def fill_na_imp(X, method):
    '''
    Impute the missing values of X with sklearn's SimpleImputer.

    Input: X (pd.DataFrame), method = ['mean', 'median', 'most_frequent', 'constant']
    Output: whatever SimpleImputer.transform returns (a NumPy array by default)
    '''
    return SimpleImputer(strategy=method).fit_transform(X)


# pyspark

from pyspark.ml.feature import Imputer

def fill_na_imp_pyspark(data, cols, method='mean'):
    '''
    Impute missing values of the numeric columns `cols` with pyspark's Imputer.

    Input: data -> spark dataframe
           cols -> list of numeric column names to impute
           method -> Imputer strategy, e.g. 'mean' or 'median'
                     (see the Imputer docs for what your Spark version supports)
    Output: spark dataframe with an extra '<col>_imputed' column per input column
    '''
    imputer_model = Imputer(
        inputCols=cols,
        outputCols=["{}_imputed".format(c) for c in cols],
    ).setStrategy(method).fit(data)
    return imputer_model.transform(data)


# Example configuration for a dataset with these columns (built but not fitted here).
imputer = Imputer(
    inputCols = ['Age of Employee', 'Experience (in years)', 'Salary (per month - $)'],
    outputCols = ["{}_imputed".format(a) for a in ['Age of Employee', 'Experience (in years)', 'Salary (per month - $)']]
).setStrategy("mean")

In [None]:
# KNNImputer

# python

# ques: more methods
from sklearn.impute import KNNImputer
def KNN_imputer(X, k, method):
    '''
    Impute missing values from the k nearest neighbours (uniform weights).

    Input: X -> np matrix; every missing value must be encoded as NaN
           k -> number of neighbours
           method -> distance metric passed to KNNImputer, e.g. 'nan_euclidean'
    Output: the matrix returned by KNNImputer.transform
    '''
    knn = KNNImputer(metric=method, weights='uniform', n_neighbors=k)
    return knn.fit_transform(X)

# pyspark


In [16]:
# Load the same CSV into pandas and into Spark (header row, inferred column types)
df = pd.read_csv(filepath_or_buffer='test.csv')
df_ps = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('test.csv')
)

In [None]:
# variance thresholding

# Ques. range of variance limit in both cases

# python
from sklearn.feature_selection import VarianceThreshold
def variance_thresholding(data, limit):
    '''
    Drop low-variance features with sklearn's VarianceThreshold.

    The last column of `data` is treated as the target and is not thresholded.

    Input: pd.DataFrame (features in all but the last column, target in the last),
           limit -> float >= 0; features whose variance does not exceed it are removed
                    (0 removes only constant features; variance has no upper bound,
                    so 1 does not mean "no features")
    Output: X_sel (numpy matrix of the kept features), y (numpy array of the target)
    '''
    values = data.values
    # Slice the NumPy array; slicing the DataFrame itself with [:, :-1] raises.
    X = values[:, :-1]
    y = values[:, -1]
    X_sel = VarianceThreshold(threshold=limit).fit_transform(X)
    return (X_sel, y)

# pyspark (check code)
from pyspark.ml.feature import VarianceThresholdSelector
def variance_thresholding_pyspark(data, limit, features_col='features'):
    '''
    Fit a VarianceThresholdSelector on the vector column `features_col`.

    Input: data -> Spark Dataframe containing a vector column of features
                   (e.g. assembled with VectorAssembler)
           limit -> float variance threshold (not an int); low-variance features are dropped
           features_col -> name of that vector column (default 'features')
    Output: the fitted selector model; call model.transform(data) to obtain the
            'selected_features' column
    '''
    selector = VarianceThresholdSelector(
        varianceThreshold=limit,
        featuresCol=features_col,
        outputCol='selected_features',
    )
    return selector.fit(data)

In [None]:
# Duplicate Data
# questions: subsets?

# python
def drop_duplicates(data, subset=None):
    '''
    Drop duplicate rows, keeping the first occurrence.

    Input: pd.DataFrame
           subset -> optional column label(s) to compare (default: all columns)
    output: pd.DataFrame
    '''
    # Call the method; previously the bound method itself was returned.
    return data.drop_duplicates(subset=subset)

# pyspark
def drop_duplicates_pyspark(data, subset=None):
    '''
    Drop duplicate rows of a Spark DataFrame.

    Input: Spark DataFrame
           subset -> optional list of column names to compare (default: all columns)
    Output: Spark DataFrame
    '''
    # Previously a stub that returned an empty tuple.
    return data.dropDuplicates(subset)

### Changing Column Type

In [None]:
# to numerical

# python

# pyspark
from pyspark.sql import functions as f 
from pyspark.sql.types import IntegerType
def to_numerical_pyspark (data, cols):
    '''
    Cast the given columns to integers.

    Input: data -> Spark DataFrame, cols -> iterable of column names
    Output: Spark DataFrame with those columns cast to IntegerType
    '''
    for column in cols:
        # cast() needs a DataType instance; passing the class IntegerType raises.
        data = data.withColumn(column, f.col(column).cast(IntegerType()))
    return (data)

# list of numerical columns

# python

# pyspark
def numerical_cols_pyspark (data, type_names=('int',)):
    '''
    Names of the columns of `data` whose Spark type name is in `type_names`.

    Input: data -> Spark DataFrame (only its .dtypes list of (name, type) pairs is read)
           type_names -> type strings to match; the default ('int',) keeps only
                         IntegerType columns, pass e.g. ('int', 'bigint', 'double')
                         to include more numeric types
    Output: list of column names, in column order
    '''
    # Read the `data` argument; previously the global `df` was used instead.
    return [name for name, dtype in data.dtypes if dtype in type_names]



In [None]:

# to datetime

# encoding

### Outlier Treatment

In [None]:
# 

# python
def outlier_threshold (data, method, limit):
    '''
    Keep rows in which at most a fraction `limit` of the numeric columns hold outliers.

    Input: pd.DataFrame
           method -> 'inter_quartile' (outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]) or
                     'standard_deviation' (outside mean +/- 3 std)
           limit -> float in [0, 1] (0 keeps only outlier-free rows, 1 keeps every row)
    Output: pd.DataFrame with the surviving rows (all original columns)
    Raises: ValueError for an unknown method
    NOTE(review): pandas quantiles interpolate linearly, so the IQR bounds may differ
    slightly from the Spark approxQuantile-based find_outliers_pyspark.
    '''
    numeric = data.select_dtypes(include='number')
    if method == 'inter_quartile':
        q1 = numeric.quantile(0.25)
        q3 = numeric.quantile(0.75)
        iqr = q3 - q1
        lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    elif method == 'standard_deviation':
        mean, std = numeric.mean(), numeric.std()
        lower, upper = mean - 3 * std, mean + 3 * std
    else:
        raise ValueError(
            "method must be 'inter_quartile' or 'standard_deviation', got {!r}".format(method)
        )
    # NaN compares False on both sides, so missing values never count as outliers.
    is_outlier = numeric.lt(lower, axis='columns') | numeric.gt(upper, axis='columns')
    total_outliers = is_outlier.sum(axis=1)
    return data[total_outliers <= limit * numeric.shape[1]]




In [None]:
# pyspark

def find_outliers_pyspark(df):
    '''
    Count, per row, how many numeric columns hold an IQR outlier.

    A value is an outlier when it lies below Q1 - 1.5*IQR or above Q3 + 1.5*IQR,
    with Q1/Q3 from approxQuantile(relativeError=0).
    Input: Spark DF
    Output: Spark DF with a new integer column 'total_outliers' holding the number
            of outlier values in the record
    '''
    # Columns selected by numerical_cols_pyspark
    numeric_columns = numerical_cols_pyspark(df)

    outlier_flags = []
    for column in numeric_columns:
        # Both quartiles from a single approxQuantile call
        quartiles = df.approxQuantile(column, [0.25, 0.75], relativeError=0)
        if len(quartiles) < 2:
            # approxQuantile returns [] for an all-null column: nothing to flag
            continue
        q1, q3 = quartiles
        iqr = q3 - q1
        # 1.5 is the conventional IQR fence multiplier
        lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
        # 1 for an outlier, 0 otherwise (a null comparison falls to otherwise(0))
        outlier_flags.append(
            f.when((df[column] > upper) | (df[column] < lower), 1).otherwise(0)
        )

    # Sum the flag expressions directly instead of materialising temporary
    # is_outlier_* columns (which also dropped any input column with that prefix).
    # Starting from f.lit(0) keeps the result a Column even with no numeric columns.
    total_outliers = sum(outlier_flags, f.lit(0))
    return df.withColumn('total_outliers', total_outliers)

def outlier_threshold_pyspark (df, limit):
    '''
    Keep rows in which at most a fraction `limit` of the numeric columns are outliers.

    Input: df -> Spark DataFrame
           limit -> float in [0, 1] (0 keeps only outlier-free rows, 1 keeps every row)
    Output: Spark DataFrame (with the 'total_outliers' column added by find_outliers_pyspark)
    '''
    # Count numeric columns before find_outliers_pyspark adds the integer
    # 'total_outliers' column, which is not a feature.
    len_numeric = len(numerical_cols_pyspark(df))
    df = find_outliers_pyspark(df)
    # `<=` so that limit=0 keeps outlier-free rows (`<` dropped every row).
    return (df.filter(df['total_outliers'] <= limit * len_numeric))

In [None]:
# automatic outlier treatment

# python
from sklearn.model_selection import train_test_split
from sklearn.neighbors import LocalOutlierFactor
def automatic_outlier(X_train, y_train):
    '''
    Remove training rows that LocalOutlierFactor (default settings) flags as outliers.

    Input: numpy matrix of training features X_train and the matching labels y_train
    Output: (X_train, y_train) restricted to the rows not labelled -1
    '''
    labels = LocalOutlierFactor().fit_predict(X_train)
    # fit_predict labels outliers with -1; every other row is kept
    keep = labels != -1
    return (X_train[keep, :], y_train[keep])


    

### Feature Engineering

### Scaling