# Milestone 3 - PySpark

<div style="font-size: 14px;">
By:

- Mohamed Ayman Mohamed Mohamed abo Tammaa
    - 52-20136
    - mohamed.abotammaa@student.guc.edu.eg
    - P02
    
</div>

## Objectives:
1. Loading the dataset (5%)
2. Perform some simple cleaning (30%)
    - Column renaming: 10%
    - Detect missing: 35%
    - Handle missing: 35%
    - Check missing : 20%
3. Perform some analysis on the dataset (30%)
4. Add new columns with feature engineering (15%)
5. Encode categorical columns (10%) 
6. Create a lookup table for encoding only (5%)
7. Saving the cleaned dataset and lookup table (5%)
8. **BONUS**: Saving the output into a Postgres database (5%)

**Note that:** You may not need to run the Spark containers, since PySpark already
creates a mini server by default.

## Requirements:

### Part 0: Libraries & Setup

In [169]:
from pyspark.sql import functions as fn
from pyspark.sql import Window

In [170]:
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkContext

# Create (or reuse) the Spark session; `spark` is needed below to read the parquet file
spark : SparkSession = SparkSession.builder.appName("m3_spark").getOrCreate()
sc : SparkContext = spark.sparkContext

In [171]:
data_dir = "../../Datasets/"
ORIGINAL_DATAFILE = "fintech_data_38_52_20136.parquet"

### Part 1: Loading the dataset:

Simply load the dataset from the parquet file provided on the Google Drive above:
- Load the dataset.
- Preview first 20 rows.
- How many partitions is this dataframe split into?
- Change partitions to be equal to the number of your logical cores

In [172]:
fintech_df_raw : DataFrame = spark.read.parquet(data_dir + ORIGINAL_DATAFILE)
fintech_df_raw.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGI4Llx4YzFce...|             ARX L

Number of partitions originally:

In [173]:
print(f"The dataset is partitioned into '''{fintech_df_raw.rdd.getNumPartitions()}''' partition(s)")

The dataset is partitioned into '''1''' partition(s)


My logical cores:

In [174]:
import psutil

logical_cores = psutil.cpu_count(logical=True)
print(f"Number of logical cores in my PC: {logical_cores}")

Number of logical cores in my PC: 16
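
As an aside, the logical core count can also be read without the third-party `psutil` package. The following is a minimal sketch using only the standard library. The variable name `logical_cores_stdlib` is hypothetical, and because `os.cpu_count()` may return `None` on some platforms, a fallback of 1 is assumed here.

```python
import os

# os.cpu_count() reports the number of logical cores; it may return None,
# so fall back to 1 (an assumed default, not something the notebook prescribes)
logical_cores_stdlib = os.cpu_count() or 1
print(f"Number of logical cores (stdlib): {logical_cores_stdlib}")
```

The resulting integer could be passed to `repartition` in the same way as `logical_cores`.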


Repartitioning:

In [175]:
fintech_df_raw = fintech_df_raw.repartition(logical_cores)
print(f"The dataset is partitioned into '''{fintech_df_raw.rdd.getNumPartitions()}''' partition(s)")

The dataset is partitioned into '''16''' partition(s)


### Part 2: Cleaning

#### - Rename all columns (replacing a space with an underscore, and making it lowercase)

In [176]:
def clean_column_names(df : DataFrame) -> DataFrame:
    df_cpy = df
    for col in df.columns:
        df_cpy = df_cpy.withColumnRenamed(col, col.replace(" ", "_").lower())
    return df_cpy

fintech_df = clean_column_names(fintech_df_raw)
fintech_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- annual_inc_joint: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- avg_cur_bal: double (nullable = true)
 |-- tot_cur_bal: double (nullable = true)
 |-- loan_id: long (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- loan_amount: double (nullable = true)
 |-- state: string (nullable = true)
 |-- funded_amount: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- grade: long (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- pymnt_plan: boolean (nullable = true)
 |-- type: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- description: string (nullable = true)

#### - Detect missing
   - Create a function that takes in the df and returns any data structure of your choice (df, dict, list, tuple, etc.) which has the name of the column and the percentage of missing entries from the whole dataset.
   - Tip: storing the missing info as a dict where the key is the column name and the value is the percentage would be the easiest.
#### - Print out the missing info

In [177]:
def get_missing_values(df : DataFrame) -> dict:
    # count the rows once instead of once per column
    total_rows = df.count()
    missing_values = {}
    for col in df.columns:
        # fraction (0.0 to 1.0) of null entries in this column
        missing_values[col] = df.filter(fn.col(col).isNull()).count() / total_rows
    # sort the dictionary by values (missing fraction) in descending order
    missing_values = dict(sorted(missing_values.items(), key=lambda x: x[1], reverse=True))
    return missing_values

missing_values = get_missing_values(fintech_df)
print(missing_values)    

{'annual_inc_joint': 0.9298187199408066, 'emp_title': 0.08812430632630411, 'emp_length': 0.06977432482426933, 'int_rate': 0.04679985201627821, 'description': 0.009174990751017388, 'customer_id': 0.0, 'home_ownership': 0.0, 'annual_inc': 0.0, 'verification_status': 0.0, 'zip_code': 0.0, 'addr_state': 0.0, 'avg_cur_bal': 0.0, 'tot_cur_bal': 0.0, 'loan_id': 0.0, 'loan_status': 0.0, 'loan_amount': 0.0, 'state': 0.0, 'funded_amount': 0.0, 'term': 0.0, 'grade': 0.0, 'issue_date': 0.0, 'pymnt_plan': 0.0, 'type': 0.0, 'purpose': 0.0}


#### - Handle missing
- For numerical features replace with 0.
- For categorical/strings replace with mode
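
The rule above can be illustrated on a tiny in-memory example. This is a plain-Python sketch rather than Spark code. The helper `impute_column` and the sample values are hypothetical and only show what "0 for numerical, mode for strings" means for a single column.

```python
from collections import Counter

def impute_column(values: list) -> list:
    """Fill None with 0 for numeric columns, or with the most frequent value otherwise."""
    present = [v for v in values if v is not None]
    if not present:
        return list(values)  # nothing to infer a fill value from
    is_numeric = all(isinstance(v, (int, float)) and not isinstance(v, bool) for v in present)
    # numeric columns get 0; everything else gets the mode of the non-missing values
    fill = 0 if is_numeric else Counter(present).most_common(1)[0][0]
    return [fill if v is None else v for v in values]

numeric_filled = impute_column([12.5, None, 9.0])
string_filled = impute_column(["Teacher", None, "Nurse", "Teacher"])
print(numeric_filled)
print(string_filled)
```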

In [178]:
def get_imp_value(df : DataFrame, col : str, strategy : str = "mode") -> object:
    if strategy == "mean":
        imp_value = df.select(fn.mean(col)).collect()[0][0]
    else:
        # if strategy == "mode":
        imp_value = df.filter(fn.col(col).isNotNull()).groupBy(col).count()\
            .orderBy(fn.desc("count")).limit(1).select(col).collect()[0][0]
    return imp_value
    

In [179]:
from pyspark.sql.types import DoubleType, FloatType, IntegerType, LongType

def handle_missing_values(df : DataFrame) -> DataFrame:
    # keep only the columns that actually contain missing values
    missing_vals = dict(filter(lambda x: x[1] > 0, get_missing_values(df).items()))

    schema = df.schema
    for col in missing_vals.keys():
        if isinstance(schema[col].dataType, (IntegerType, LongType, FloatType, DoubleType)):
            # numerical features: replace missing values with 0
            df = df.fillna(subset=[col], value=0)
        else:
            # categorical/string features: replace missing values with the mode
            df = df.fillna(subset=[col], value=get_imp_value(df, col, "mode"))
    return df

fintech_df = handle_missing_values(fintech_df)

#### - Check missing
- Afterwards, check that there are no missing values

In [180]:
print("The number of columns with missing values after handling missing values: "+ \
    str(len(dict(filter(lambda x: x[1] > 0.0, get_missing_values(fintech_df).items())))))

The number of columns with missing values after handling missing values: 0


### Part 3: Encoding

Encode only the following categorical values
- Emp Length: Change to numerical
- Home Ownership: One Hot Encoding
- Verification Status: One Hot Encoding
- State: Label Encoding
- Type: One Hot Encoding
- Purpose: Label Encoding
- For the grade, only discretize it to be a letter grade; no need to label encode it further

**DO NOT** encode the employment title or description or any other column that is not mentioned above

First, define the functions:

In [181]:
def convert_emp_length_to_int(df : DataFrame) -> DataFrame:
    df = df.withColumn("emp_length", fn.regexp_replace("emp_length", "[^0-9]", ""))
    df = df.withColumn("emp_length", fn.col("emp_length").cast(IntegerType()))
    return df
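
To make the effect of the regular expression concrete, here is a plain-Python sketch of the same idea (strip every non-digit, then cast to an integer). The helper `emp_length_to_int` and the sample strings are hypothetical; the actual wording of the raw `emp_length` values is not shown in this notebook and may differ.

```python
import re

def emp_length_to_int(value: str) -> int:
    # Same idea as regexp_replace(..., "[^0-9]", "") followed by a cast to integer
    return int(re.sub(r"[^0-9]", "", value))

# Hypothetical raw values; the real column may use different wording
samples = ["10+ years", "3 years", "< 1 year", "1 year"]
converted = {raw: emp_length_to_int(raw) for raw in samples}
for raw, years in converted.items():
    print(raw, "->", years)
```

Under these assumptions, "< 1 year" and "1 year" both map to 1, so the two categories can no longer be told apart after the conversion.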

In [182]:
def get_string_indexers(df : DataFrame, cols : list) -> list:
    from pyspark.ml.feature import StringIndexer
    return [StringIndexer(inputCol=col, outputCol=col+"_encoded").fit(df) for col in cols]


In [183]:
def label_encode(df : DataFrame, cols : list) -> tuple:
    indexers = get_string_indexers(df, cols)
    for indexer in indexers:
        df = indexer.transform(df)
    return df, indexers

In [184]:
def one_hot_encode(df : DataFrame, cols : list) -> DataFrame:
    from pyspark.ml.feature import OneHotEncoder, StringIndexerModel
    from pyspark.ml.functions import vector_to_array

    # first, clean the column values: lowercase them and replace spaces and dashes with underscores
    for col in cols:
        df = df.withColumn(col, fn.lower(fn.regexp_replace(col, "[ -]", "_")))

    # index and label encode the columns (prerequisite for one-hot encoding) (https://www.skytowner.com/explore/one_hot_encoding_in_pyspark)
    df, index_fitters = label_encode(df, cols)
    index_fitters : list[StringIndexerModel] = index_fitters

    # one-hot encode the columns to vector
    encoder = OneHotEncoder(dropLast=False, inputCols=[col + "_encoded" for col in cols], outputCols=[col + "_hencoded" for col in cols])
    df_encoded = encoder.fit(df).transform(df)
    df_encoded = df_encoded.drop(*[col + "_encoded" for col in cols])
    
    # convert the vector to array
    df_encoded = df_encoded.select("*", *[vector_to_array(col).alias(col+"_array") for col in [col + "_hencoded" for col in cols]])
    df_encoded = df_encoded.drop(*[col + "_hencoded" for col in cols])

    # expand the array to columns and rename the columns
    for col, indexer in zip(cols, index_fitters): 
        num_categories = len(df_encoded.first()[col + "_hencoded_array"]) 
        labels = indexer.labels 
        cols_expanded = [fn.col(col + "_hencoded_array")[i].alias(f'{col}_{str(labels[i]).lower().replace(" ", "_").replace("-", "_")}') for i in range(num_categories)] 
        df_encoded = df_encoded.select("*", *cols_expanded)

    df_encoded = df_encoded.drop(*[col + "_hencoded_array" for col in cols])

    # Convert the columns to binary (0 and 1) 
    for col, indexer in zip(cols, index_fitters):
        labels = indexer.labels 
        for i in range(len(labels)):
            df_encoded = df_encoded.withColumn(f'{col}_{str(labels[i]).lower().replace(" ", "_").replace("-", "_")}', fn.col(f'{col}_{str(labels[i]).lower().replace(" ", "_").replace("-", "_")}').cast('boolean'))

    return df_encoded   

In [185]:
def discretize_column(df : DataFrame, col : str, bins_limits :list = None, labels:list = None) -> DataFrame:
    if bins_limits is None:
        if col != "grade":
            raise ValueError("bins_limits is required for columns other than 'grade'")
        # default bins for the grade column: <= 5 -> A, <= 10 -> B, ..., above 30 -> G
        bins_limits = [5, 10, 15, 20, 25, 30]
        labels = ["A", "B", "C", "D", "E", "F", "G"]
    elif labels is None:
        # n limits produce n + 1 bins (the last bin holds values above the last limit)
        labels = [f"{col}_{i}" for i in range(len(bins_limits) + 1)]

    # build a single chained CASE WHEN; the first matching upper limit wins
    bin_expr = fn.when(fn.col(col) <= bins_limits[0], labels[0])
    for limit, label in zip(bins_limits[1:], labels[1:]):
        bin_expr = bin_expr.when(fn.col(col) <= limit, label)
    # values above the last limit get the last label; nulls stay null
    bin_expr = bin_expr.when(fn.col(col) > bins_limits[-1], labels[-1])
    df = df.withColumn(col+"_discretized", bin_expr)

    return df
discretize_column(fintech_df, "grade").select("grade", "grade_discretized").distinct().show(8)

+-----+-----------------+
|grade|grade_discretized|
+-----+-----------------+
|   21|                E|
|   29|                F|
|    9|                B|
|   35|                G|
|    8|                B|
|   33|                G|
|   17|                D|
|   10|                B|
+-----+-----------------+
only showing top 8 rows
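
The bin boundaries can be checked independently of Spark. The sketch below is plain Python and uses the same limits and letters as the default `grade` bins; the names `GRADE_LIMITS`, `GRADE_LETTERS` and `grade_to_letter` are hypothetical helpers introduced only for this illustration.

```python
from bisect import bisect_left

GRADE_LIMITS = [5, 10, 15, 20, 25, 30]                 # inclusive upper bounds
GRADE_LETTERS = ["A", "B", "C", "D", "E", "F", "G"]    # one more label than limits

def grade_to_letter(grade: int) -> str:
    # bisect_left returns the index of the first limit that is >= grade,
    # or len(GRADE_LIMITS) when the grade is above every limit
    return GRADE_LETTERS[bisect_left(GRADE_LIMITS, grade)]

for grade in [21, 29, 9, 35, 8, 33, 17, 10]:
    print(grade, grade_to_letter(grade))
```

The grades in the loop are the ones shown in the table above, so the printed letters can be compared against the `grade_discretized` column.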



Second, apply the functions:

In [186]:
# - Emp Length: Change to numerical
# - Home Ownership: One Hot Encoding
# - Verification Status: One Hot Encoding
# - State: Label Encoding
# - Type: One Hot Encoding
# - Purpose: Label Encoding
# - For the grade, only discretize it to be a letter grade; no need to label encode it further

fintech_df = convert_emp_length_to_int(fintech_df)
fintech_df = one_hot_encode(fintech_df, ["home_ownership", "verification_status", "type"])
fintech_df = label_encode(fintech_df, ["state", "purpose"])[0]
fintech_df = discretize_column(fintech_df, "grade")

In [187]:
fintech_df.show(3)

+--------------------+---------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+-----------------------+-------------------+------------------+------------------+--------------------+-----------------------------------+--------------------------------+----------------------------+---------------+--------------+----------+---------------+-------------+---------------+-----------------+
|         customer_id|      emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|      issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_mortgage|home_ownership_rent|home_ownership_own|hom

### Part 4: Feature Engineering

Write a function that adds the following features. Try as much as you can to use built-in functions in PySpark (from the functions library); check lab 8.
<br> Avoid writing UDFs from scratch.
- Previous loan issue date from the same grade
- Previous loan amount from the same grade
- Previous loan date from the same state and grade combined
- Previous loan amount from the same state and grade combined

First we need to convert the issue date to a date format:

In [188]:
def convert_issueD_to_date(df : DataFrame) -> DataFrame:
    df = df.withColumn("issue_date", fn.to_date(fn.to_timestamp(fn.col("issue_date"), "d MMMM yyyy")))
    return df

In [189]:

def add_new_features(df : DataFrame) -> DataFrame:
    #first convert the issue_date to date for proper date sorting
    df = convert_issueD_to_date(df)

    # Add prv_grade_loan_issue_date
    window = Window.partitionBy("grade").orderBy("issue_date")
    df = df.withColumn("prv_grade_loan_issue_date", fn.lag("issue_date").over(window))

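    # Add prv_loan_amount (previous loan amount from the same grade)
    # note: rows that share the same issue_date have no guaranteed relative order within the window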
    df = df.withColumn("prv_loan_amount", fn.lag("loan_amount").over(window))

    # Add prv_state_grade_issue_date
    window = Window.partitionBy("state", "grade").orderBy("issue_date")
    df = df.withColumn("prv_state_grade_issue_date", fn.lag("issue_date").over(window))

    # Add prv_state_grade_loan_amount
    df = df.withColumn("prv_state_grade_loan_amount", fn.lag("loan_amount").over(window))

    return df
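
What `lag` over a partitioned, ordered window computes can be sketched in plain Python. This is an illustration of the semantics only, not the Spark implementation. The helper `add_previous_in_group` is hypothetical; the three grade-30 rows reuse values from this notebook's output, while the grade-8 row is made up.

```python
from collections import defaultdict

def add_previous_in_group(rows, group_key, order_key, value_key, out_key):
    """Mimic lag(value) over (partition by group, order by order_key) on a list of dicts."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[group_key]].append(row)
    result = []
    for members in groups.values():
        previous = None  # the first row of every group has no predecessor
        # sorted() is stable, so rows with an equal order_key keep their input order
        for row in sorted(members, key=lambda r: r[order_key]):
            result.append({**row, out_key: previous})
            previous = row[value_key]
    return result

loans = [
    {"grade": 30, "issue_date": "2013-02-13", "loan_amount": 5000.0},
    {"grade": 30, "issue_date": "2013-01-13", "loan_amount": 20000.0},
    {"grade": 8, "issue_date": "2013-03-13", "loan_amount": 12000.0},  # made-up row
    {"grade": 30, "issue_date": "2013-04-13", "loan_amount": 35000.0},
]
with_previous = add_previous_in_group(loans, "grade", "issue_date", "loan_amount", "prv_loan_amount")
for row in with_previous:
    print(row)
```

ISO-formatted date strings sort chronologically, which is why the sketch can order by the string directly. In Spark, rows with the same ordering value have no guaranteed relative order, unlike the stable sort used here.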

In [None]:
fintech_df = add_new_features(fintech_df)
fintech_df.filter(fn.col("grade") == 30)\
    .select("issue_date", "grade", "grade_discretized", "loan_amount", "prv_loan_amount", "prv_grade_loan_issue_date")\
        .orderBy("issue_date").show(10)
fintech_df.show(3)

+----------+-----+-----------------+-----------+---------------+-------------------------+
|issue_date|grade|grade_discretized|loan_amount|prv_loan_amount|prv_grade_loan_issue_date|
+----------+-----+-----------------+-----------+---------------+-------------------------+
|2013-01-13|   30|                F|    20000.0|           null|                     null|
|2013-02-13|   30|                F|     5000.0|        20000.0|               2013-01-13|
|2013-04-13|   30|                F|    35000.0|         5000.0|               2013-02-13|
|2013-05-13|   30|                F|    21600.0|        35000.0|               2013-04-13|
|2013-05-13|   30|                F|    28000.0|        21600.0|               2013-05-13|
|2013-06-13|   30|                F|    35000.0|        28000.0|               2013-05-13|
|2013-06-13|   30|                F|    22000.0|        35000.0|               2013-06-13|
|2013-08-13|   30|                F|    20000.0|        22000.0|               2013-06-13|

### Part 5: Analysis SQL VS Spark

Answer each of the following questions using both SQL and Spark:
1. Identify the average loan amount and interest rate for loans marked as "Default" in the Loan Status, grouped by Emp Length and annual income ranges.<br>
Hint: Use SQL Cases to bin Annual Income into Income Ranges
2. Calculate the average difference between Loan Amount and Funded Amount for each
loan Grade and sort by the grades with the largest differences.
3. Compare the total Loan Amount for loans with "Verified" and "Not Verified"
Verification Status across each state (Addr State).
4. Calculate the average time gap (in days) between consecutive loans for each
grade using the new features you added in the feature engineering phase.
5. Identify the average difference in loan amounts between consecutive loans
within the same state and grade combination.
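
As a hedged illustration of the hint in question 1, the sketch below bins the annual income with a SQL `CASE` expression and then aggregates per employment length and income range. It runs against an in-memory SQLite database so it can be tried without Spark. The table name `fintech`, the income boundaries, the range names and the sample rows are all assumptions, since the assignment does not fix them.

```python
import sqlite3

# Hypothetical income ranges; the assignment does not prescribe the boundaries
QUERY = """
WITH binned AS (
    SELECT emp_length, loan_amount, int_rate,
           CASE
               WHEN annual_inc < 50000 THEN 'low'
               WHEN annual_inc < 100000 THEN 'medium'
               ELSE 'high'
           END AS income_range
    FROM fintech
    WHERE loan_status = 'Default'
)
SELECT emp_length, income_range,
       AVG(loan_amount) AS avg_loan_amount,
       AVG(int_rate) AS avg_int_rate
FROM binned
GROUP BY emp_length, income_range
ORDER BY emp_length, income_range
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fintech (emp_length INTEGER, annual_inc REAL, "
    "loan_status TEXT, loan_amount REAL, int_rate REAL)"
)
# Made-up sample rows, only to exercise the query
conn.executemany(
    "INSERT INTO fintech VALUES (?, ?, ?, ?, ?)",
    [
        (1, 40000.0, "Default", 10000.0, 0.125),
        (1, 45000.0, "Default", 20000.0, 0.25),
        (1, 120000.0, "Default", 30000.0, 0.5),
        (5, 60000.0, "Current", 5000.0, 0.0625),
        (5, 70000.0, "Default", 8000.0, 0.125),
    ],
)
rows = conn.execute(QUERY).fetchall()
conn.close()
for row in rows:
    print(row)
```

In the notebook itself, the same query text should be usable after registering the DataFrame with `fintech_df.createOrReplaceTempView("fintech")` and calling `spark.sql(QUERY)`, assuming the column names match the cleaned schema.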

### Part 6: Lookup Table & Saving the Dataset

#### Part 6.1: Lookup Table

- Create a lookup table for the encodings only
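
One possible shape for such a lookup table is one row per (column, original value, encoded value). The plain-Python sketch below assumes label encoding that assigns index 0 to the most frequent value, with ties broken alphabetically. The helper `build_lookup` and the sample state codes are hypothetical.

```python
from collections import Counter

def build_lookup(column: str, values: list) -> list:
    """Return (column, original, encoded) rows, most frequent value first."""
    counts = Counter(values)
    # Assumption: ties in frequency are broken alphabetically
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return [(column, original, index) for index, original in enumerate(ordered)]

# Made-up state codes, only to show the resulting rows
lookup_rows = build_lookup("state", ["CA", "NY", "CA", "TX", "NY", "CA"])
for row in lookup_rows:
    print(row)
```

In the notebook, the label order would not need to be recomputed: each fitted `StringIndexerModel` returned by `label_encode` exposes `labels`, and the position of a label in that list is its encoded value.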

#### Part 6.2: Saving the Dataset

- Finally load (save) the cleaned PySpark df and the lookup table to parquet
files

### Part 7: Bonus - Loading to Postgres

- Load the cleaned parquet file and lookup table into a Postgres database.
- Take Screenshots showing the newly added features in the feature engineering section
- Take a screenshot from the lookup table

## Deliverables
1. Python notebook named m3_spark_<id>.ipynb, e.g.
m3_spark_52_XXXX.ipynb
2. Cleaned Parquet file named: fintech_spark_52_XXXX_clean.parquet
3. Lookup table named: lookup_spark_52_XXXX.parquet
4. In case of doing the bonus: screenshots from PGAdmin showing the cleaned table
(some of the rows) and another one showing the lookup table.
Note: All these files should reside in a folder for milestone 3, inside the root drive
folder created previously in milestone 1.

### Submission guidelines
Upload all the deliverables in your google drive milestone folder.
Best of luck.

In [191]:
# Closing Spark Session Context
# sc.stop()