# Milestone 3 - Spark

### Importing libraries and creation of SparkSession

In [1]:
from pyspark.sql import SparkSession, Row,Window,functions as fn
import os
import psutil
import shutil
from pyspark.sql.functions import to_date
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.ml.functions import vector_to_array



In [2]:
spark = SparkSession.builder.appName("MS3_Spark_52_24625").config("spark.jars", "/opt/spark/jars/postgresql-42.7.4.jar") \
    .getOrCreate()
sc = spark.sparkContext

### Part 1: Loading the dataset

### 1. Load the dataset 

In [3]:
fintech_df = spark.read.parquet('/data/fintech_data_2_52_24625.parquet')

### 2. Preview first 20 rows

In [4]:
fintech_df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|      Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+
|YidceGQxXHg4ZlwnX...|    Registered Nurse|

In [5]:
fintech_df.printSchema()

root
 |-- Customer Id: string (nullable = true)
 |-- Emp Title: string (nullable = true)
 |-- Emp Length: string (nullable = true)
 |-- Home Ownership: string (nullable = true)
 |-- Annual Inc: double (nullable = true)
 |-- Annual Inc Joint: double (nullable = true)
 |-- Verification Status: string (nullable = true)
 |-- Zip Code: string (nullable = true)
 |-- Addr State: string (nullable = true)
 |-- Avg Cur Bal: double (nullable = true)
 |-- Tot Cur Bal: double (nullable = true)
 |-- Loan Id: long (nullable = true)
 |-- Loan Status: string (nullable = true)
 |-- Loan Amount: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Funded Amount: double (nullable = true)
 |-- Term: string (nullable = true)
 |-- Int Rate: double (nullable = true)
 |-- Grade: long (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Pymnt Plan: boolean (nullable = true)
 |-- Type: string (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- Description: string (nullable = t

In [6]:
fintech_df.show(vertical=True, n=5)

-RECORD 0-----------------------------------
 Customer Id         | YidceGQxXHg4ZlwnX... 
 Emp Title           | Registered Nurse     
 Emp Length          | 8 years              
 Home Ownership      | RENT                 
 Annual Inc          | 85000.0              
 Annual Inc Joint    | NULL                 
 Verification Status | Not Verified         
 Zip Code            | 891xx                
 Addr State          | NV                   
 Avg Cur Bal         | 4336.0               
 Tot Cur Bal         | 60710.0              
 Loan Id             | 193327               
 Loan Status         | Fully Paid           
 Loan Amount         | 20000.0              
 State               | NV                   
 Funded Amount       | 20000.0              
 Term                |  36 months           
 Int Rate            | 0.1249               
 Grade               | 7                    
 Issue Date          | 14 March 2014        
 Pymnt Plan          | false                
 Type     

3. How many partitions is this dataframe split into?

In [7]:
print(f"Number of partitions: {fintech_df.rdd.getNumPartitions()}")


Number of partitions: 1


4. Change partitions to be equal to the number of your logical cores

In [8]:
num_cores = os.cpu_count()
print(f"Number of CPU cores: {num_cores}")

Number of CPU cores: 12


In [9]:
logical_cores = psutil.cpu_count(logical=True) 
physical_cores = psutil.cpu_count(logical=False)

print(f"Logical cores: {logical_cores}")
print(f"Physical cores: {physical_cores}")

Logical cores: 12
Physical cores: 6


In [10]:
fintech_df = fintech_df.repartition(12)

In [11]:
print(f"New number of partitions: {fintech_df.rdd.getNumPartitions()}")


New number of partitions: 12


### Part 2: Cleaning

1. Rename all columns (replacing a space with an underscore, and making it
lowercase)

In [12]:
renamed_columns = [column.replace(" ","_").lower() for column in fintech_df.columns]
fintech_df = fintech_df.toDF(*renamed_columns)

In [13]:
def clean_and_replace_inconsistent_values_type_column(df):

    df = df.withColumn('type',fn.when(fn.col('type').isNotNull(), fn.trim(fn.lower(fn.regexp_replace(fn.col('type'),' ', '_')))).otherwise(fn.col('type')))
    replacements = {'joint_app': 'joint'}
    for original, new_value in replacements.items():
        df = df.withColumn( 'type', fn.when(fn.col('type') == original, new_value).otherwise(fn.col('type'))  )
    
    return df

In [14]:
fintech_df = clean_and_replace_inconsistent_values_type_column(fintech_df)

In [15]:
fintech_df.show(5)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+---------------+----------+----------+------------------+------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|     issue_date|pymnt_plan|      type|           purpose|       description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+---------------+----------+----------+------------------+------------------+
|YidceGMwXHhlOFx4Y...|                NULL|      NUL

In [16]:
print(fintech_df.columns)

['customer_id', 'emp_title', 'emp_length', 'home_ownership', 'annual_inc', 'annual_inc_joint', 'verification_status', 'zip_code', 'addr_state', 'avg_cur_bal', 'tot_cur_bal', 'loan_id', 'loan_status', 'loan_amount', 'state', 'funded_amount', 'term', 'int_rate', 'grade', 'issue_date', 'pymnt_plan', 'type', 'purpose', 'description']


 2. Detect missing

 Displaying missing percentages using DF :

In [17]:
def calculate_missing_percentage_df(df):
    total_rows = df.count()
    missing_info = [Row(column_name=column,missing_percentage=(df.filter(df[column].isNull()).count()/total_rows)*100)for column in df.columns]
    missing_info_df = spark.createDataFrame(missing_info)
    missing_info_df = missing_info_df.withColumn("missing_percentage",fn.concat(fn.format_number("missing_percentage",6),fn.lit(" %")))
    return missing_info_df

In [18]:
missing_info_df = calculate_missing_percentage_df(fintech_df)


In [19]:
missing_info_df.show(24)

+-------------------+------------------+
|        column_name|missing_percentage|
+-------------------+------------------+
|        customer_id|        0.000000 %|
|          emp_title|        8.590455 %|
|         emp_length|        6.799852 %|
|     home_ownership|        0.000000 %|
|         annual_inc|        0.000000 %|
|   annual_inc_joint|       92.944876 %|
|verification_status|        0.000000 %|
|           zip_code|        0.000000 %|
|         addr_state|        0.000000 %|
|        avg_cur_bal|        0.000000 %|
|        tot_cur_bal|        0.000000 %|
|            loan_id|        0.000000 %|
|        loan_status|        0.000000 %|
|        loan_amount|        0.000000 %|
|              state|        0.000000 %|
|      funded_amount|        0.000000 %|
|               term|        0.000000 %|
|           int_rate|        4.402516 %|
|              grade|        0.000000 %|
|         issue_date|        0.000000 %|
|         pymnt_plan|        0.000000 %|
|               

Displaying missing percentage using dict :

In [20]:
def calculate_missing_percentage_dict (df):
    total_rows = df.count()
    missing_info = {}
    for column in df.columns :
        missing_count = df.filter(df[column].isNull()).count()
        missing_percentage = (missing_count/total_rows)*100
        missing_info[column] = missing_percentage
    return missing_info


In [21]:
missing_info_dict = calculate_missing_percentage_dict(fintech_df)

In [22]:
print("Missing info Dictionary :")
for column,percentage in missing_info_dict.items():
        print(f"Column: {column},Missing Percentage:{percentage:.6f}%")

Missing info Dictionary :
Column: customer_id,Missing Percentage:0.000000%
Column: emp_title,Missing Percentage:8.590455%
Column: emp_length,Missing Percentage:6.799852%
Column: home_ownership,Missing Percentage:0.000000%
Column: annual_inc,Missing Percentage:0.000000%
Column: annual_inc_joint,Missing Percentage:92.944876%
Column: verification_status,Missing Percentage:0.000000%
Column: zip_code,Missing Percentage:0.000000%
Column: addr_state,Missing Percentage:0.000000%
Column: avg_cur_bal,Missing Percentage:0.000000%
Column: tot_cur_bal,Missing Percentage:0.000000%
Column: loan_id,Missing Percentage:0.000000%
Column: loan_status,Missing Percentage:0.000000%
Column: loan_amount,Missing Percentage:0.000000%
Column: state,Missing Percentage:0.000000%
Column: funded_amount,Missing Percentage:0.000000%
Column: term,Missing Percentage:0.000000%
Column: int_rate,Missing Percentage:4.402516%
Column: grade,Missing Percentage:0.000000%
Column: issue_date,Missing Percentage:0.000000%
Column: py

 3. Handle missing values

3.1 For numerical features `annual_inc_joint ` & `int_rate`replace with 0

In [23]:
def impute_numerical_with_zero(df, columns):
    fill_values = {column: 0 for column in columns}
    df=df.fillna(fill_values)
    return df

In [24]:
impute_numerical_features_cols = ["annual_inc_joint", "int_rate"]
fintech_df = impute_numerical_with_zero(fintech_df,impute_numerical_features_cols)

In [25]:
fintech_df.filter(fintech_df.int_rate.isNull()).count()

0

In [26]:
fintech_df.filter(fintech_df.annual_inc_joint.isNull()).count()

0

In [27]:
fintech_df.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|      issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+
|YidceGMwXHhlOFx4Y...|                NULL|

 Checking on imputation for `annual_inc_joint` & `int_rate`

In [28]:
missing_info_df = calculate_missing_percentage_df(fintech_df)
missing_info_df.show()

+-------------------+------------------+
|        column_name|missing_percentage|
+-------------------+------------------+
|        customer_id|        0.000000 %|
|          emp_title|        8.590455 %|
|         emp_length|        6.799852 %|
|     home_ownership|        0.000000 %|
|         annual_inc|        0.000000 %|
|   annual_inc_joint|        0.000000 %|
|verification_status|        0.000000 %|
|           zip_code|        0.000000 %|
|         addr_state|        0.000000 %|
|        avg_cur_bal|        0.000000 %|
|        tot_cur_bal|        0.000000 %|
|            loan_id|        0.000000 %|
|        loan_status|        0.000000 %|
|        loan_amount|        0.000000 %|
|              state|        0.000000 %|
|      funded_amount|        0.000000 %|
|               term|        0.000000 %|
|           int_rate|        0.000000 %|
|              grade|        0.000000 %|
|         issue_date|        0.000000 %|
+-------------------+------------------+
only showing top

3.2 For categorical/strings `emp_title`, `emp_length` & `description `replace with mode

In [29]:
def impute_categorical_with_mode(df, column):
    mode_df = df.filter(fn.col(column).isNotNull())\
                    .groupBy(fn.col(column)).count()\
                    .orderBy(fn.col('count').desc())\
                    .limit(1)
    mode_value =mode_df.select(column).collect()[0][0]
    df = df.fillna(value= mode_value,subset=[column])
    return df

In [30]:
impute_categorical_columns = ["emp_title","emp_length","description"]


In [31]:
for col in impute_categorical_columns :
    fintech_df = impute_categorical_with_mode(fintech_df,col)

 Checking on  missing after imputing with mode 

In [32]:
fintech_df.filter(fintech_df.emp_length.isNull()).count()

0

In [33]:
fintech_df.filter(fintech_df.emp_title.isNull()).count()

0

In [34]:
fintech_df.filter(fintech_df.description.isNull()).count()

0

In [35]:
missing_info_df = calculate_missing_percentage_df(fintech_df)

 4. Check on missing values afterwards in the whole Dataframe

In [36]:
missing_info_df.show()

+-------------------+------------------+
|        column_name|missing_percentage|
+-------------------+------------------+
|        customer_id|        0.000000 %|
|          emp_title|        0.000000 %|
|         emp_length|        0.000000 %|
|     home_ownership|        0.000000 %|
|         annual_inc|        0.000000 %|
|   annual_inc_joint|        0.000000 %|
|verification_status|        0.000000 %|
|           zip_code|        0.000000 %|
|         addr_state|        0.000000 %|
|        avg_cur_bal|        0.000000 %|
|        tot_cur_bal|        0.000000 %|
|            loan_id|        0.000000 %|
|        loan_status|        0.000000 %|
|        loan_amount|        0.000000 %|
|              state|        0.000000 %|
|      funded_amount|        0.000000 %|
|               term|        0.000000 %|
|           int_rate|        0.000000 %|
|              grade|        0.000000 %|
|         issue_date|        0.000000 %|
+-------------------+------------------+
only showing top

### Part 3: Encoding

In [37]:
lookup_table_schema = StructType([StructField("feature_name", StringType(), True),
                                  StructField("original_value", StringType(), True),
                                    StructField("encoded_value", StringType(), True)])

lookup_table_df = spark.createDataFrame([], schema=lookup_table_schema)

In [38]:
def update_lookup_table(lookup_df, feature_name, mapping):
    new_mappings = spark.createDataFrame(
        [(feature_name, orig_val, enc_val) for orig_val, enc_val in mapping.items()],
        schema=["feature_name", "original_value", "encoded_value"]
    )
    return lookup_df.union(new_mappings)

1. Emp Length: Change to numerical

In [39]:
def convert_emp_length_numeric(df,column_name,lookup_table_df):
    emp_length_mapping = {
        "< 1 year": 0.5,
        "1 year": 1.0,
        "2 years": 2.0,
        "3 years": 3.0,
        "4 years": 4.0,
        "5 years": 5.0,
        "6 years": 6.0,
        "7 years": 7.0,
        "8 years": 8.0,
        "9 years": 9.0,
        "10+ years": 11.0,
    }
    lookup_table_df = update_lookup_table( lookup_table_df,column_name,emp_length_mapping)
    df = df.withColumn("emp_length_numeric",fn.when(fn.col(column_name)== "< 1 year", 0.5).when(fn.col(column_name)=="1 year", 1.0)
        .when(fn.col(column_name)== "2 years",2.0).when(fn.col(column_name)=="3 years", 3.0)
        .when(fn.col(column_name)== "4 years",4.0).when(fn.col(column_name)== "5 years", 5.0)
        .when(fn.col(column_name)== "6 years",6.0).when(fn.col(column_name)=="7 years", 7.0)
        .when(fn.col(column_name)== "8 years",8.0).when(fn.col(column_name)== "9 years", 9.0)
        .when(fn.col(column_name)== "10+ years",11.0)
    )

    return df,lookup_table_df

In [40]:
fintech_df,lookup_table_df= convert_emp_length_numeric(fintech_df,"emp_length",lookup_table_df)

2. One-Hot Encoding for `home_ownership`, `verification_status`, `type`

In [41]:
one_hot_encoding_cols = ["home_ownership", "verification_status", "type"]

In [42]:
fintech_df.select("type").distinct().show()

+----------+
|      type|
+----------+
|     joint|
|direct_pay|
|individual|
+----------+



In [43]:
fintech_df.select("verification_status").distinct().show()

+-------------------+
|verification_status|
+-------------------+
|           Verified|
|    Source Verified|
|       Not Verified|
+-------------------+



In [44]:
fintech_df.select("home_ownership").distinct().show()


+--------------+
|home_ownership|
+--------------+
|           OWN|
|          RENT|
|      MORTGAGE|
|           ANY|
|         OTHER|
+--------------+



In [45]:
def apply_one_hot_encoding(df,one_hot_encoding_cols,lookup_table):
    for column_name in one_hot_encoding_cols:
        indexer = StringIndexer(inputCol=column_name, outputCol=f"{column_name}_index")
        indexer_model = indexer.fit(df)
        df = indexer_model.transform(df)
        index_to_value_mapping = {int(i):value for i, value in enumerate(indexer_model.labels)}
        for value in index_to_value_mapping.values():
            lookup_table = lookup_table.union(
                spark.createDataFrame( [(column_name, value, f"{column_name}_{value}(1_hot_enc_col)")],schema=["feature_name", "original_value", "encoded_value"]))
        encoder = OneHotEncoder(inputCol=f"{column_name}_index", outputCol=f"{column_name}_onehot", dropLast=False)
        encoder_model = encoder.fit(df)
        df = encoder_model.transform(df)
        df = df.withColumn(f"{column_name}_onehot_array", vector_to_array(fn.col(f"{column_name}_onehot")))
        for idx, value in index_to_value_mapping.items():
            binary_col_name = f"{column_name}_{value}"
            df = df.withColumn(binary_col_name, fn.col(f"{column_name}_onehot_array")[fn.lit(idx)])
        df = df.drop(f"{column_name}_index", f"{column_name}_onehot", f"{column_name}_onehot_array")
    return df,lookup_table

In [46]:
fintech_df ,lookup_table_df= apply_one_hot_encoding(fintech_df,one_hot_encoding_cols,lookup_table_df)

In [47]:
fintech_df.show(1)

+--------------------+---------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+---------------+----------+-----+------------------+------------------+------------------+-----------------------+-------------------+------------------+------------------+--------------------+-----------------------------------+--------------------------------+----------------------------+---------------+----------+---------------+
|         customer_id|emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|     issue_date|pymnt_plan| type|           purpose|       description|emp_length_numeric|home_ownership_MORTGAGE|home_ownership_RENT|home_ownership_OWN|home_ownership_ANY|home_ownership_OTHER|verification_sta

3. Label Encoding for `state` and `purpose`

In [48]:
label_encoding_cols = ["state", "purpose"]

In [49]:

def apply_label_encoding(df, label_encoding_cols, lookup_table_df):
    for column_name in label_encoding_cols:
        indexer = StringIndexer(inputCol=column_name, outputCol=f"{column_name}_label_encoding", handleInvalid="skip")
        indexer_model = indexer.fit(df)
        df = indexer_model.transform(df)
        index_to_value_mapping = {label: index for index, label in enumerate(indexer_model.labels)}
        lookup_table_df = update_lookup_table(lookup_table_df, column_name, index_to_value_mapping)
    
    return df, lookup_table_df



In [50]:
fintech_df ,lookup_table_df = apply_label_encoding(fintech_df,label_encoding_cols,lookup_table_df)

4. Discretize `grade` to `letter_grade`

In [51]:
def apply_letter_grade_mapping(df, lookup_df):
    grade_mapping = {
         "1-5":"A",
        "6-10":"B",
        "11-15":"C",
       "16-20": "D",
       "21-25": "E",
        "26-30":"F",
        "31-35":"G"
    }
    df = df.withColumn( "letter_grade", fn.when((fn.col("grade") >= 1) &(fn.col("grade")<= 5),"A")
        .when((fn.col("grade") >= 6) &(fn.col("grade")<= 10), "B").when((fn.col("grade") >= 11) &(fn.col("grade")<= 15), "C")
        .when((fn.col("grade") >= 16) &(fn.col("grade")<= 20), "D").when((fn.col("grade") >= 21) &(fn.col("grade")<= 25), "E")
        .when((fn.col("grade") >= 26) &(fn.col("grade")<= 30), "F").when((fn.col("grade") >= 31) &(fn.col("grade")<= 35),"G")
    )
    lookup_df = update_lookup_table(lookup_df, "grade", grade_mapping)
    return df, lookup_df


In [52]:
fintech_df, lookup_table_df = apply_letter_grade_mapping(fintech_df, lookup_table_df)

In [53]:
lookup_table_df.filter(fn.col('feature_name') == 'purpose').show(truncate=False)


+------------+------------------+-------------+
|feature_name|original_value    |encoded_value|
+------------+------------------+-------------+
|purpose     |debt_consolidation|0            |
|purpose     |credit_card       |1            |
|purpose     |home_improvement  |2            |
|purpose     |other             |3            |
|purpose     |major_purchase    |4            |
|purpose     |medical           |5            |
|purpose     |car               |6            |
|purpose     |small_business    |7            |
|purpose     |vacation          |8            |
|purpose     |house             |9            |
|purpose     |moving            |10           |
|purpose     |renewable_energy  |11           |
|purpose     |wedding           |12           |
+------------+------------------+-------------+



In [54]:
fintech_df.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------------+----------+----------+------------------+--------------------+------------------+-----------------------+-------------------+------------------+------------------+--------------------+-----------------------------------+--------------------------------+----------------------------+---------------+----------+---------------+--------------------+----------------------+------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|      issue_date|pymnt_plan|      type|           purpose|         description|emp_length_numeric|home_ownership_MORTGAGE|

### Part 4: Feature Engineering

When i have discovered the dataset schema , I found that the `issue_date` feature is of type `string` not `date` , so in order to use `Window` prorperly , I need to convert it to `date` type using `to_date` from `pyspark`

In [55]:
fintech_df = fintech_df.withColumn("issue_date", to_date(fn.col("issue_date"), "d MMMM yyyy"))

In [56]:
fintech_df.select('issue_date').show()

+----------+
|issue_date|
+----------+
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2013-08-13|
|2016-12-16|
|2016-12-16|
|2016-12-16|
|2016-12-16|
|2016-12-16|
|2016-12-16|
|2016-12-16|
|2016-12-16|
+----------+
only showing top 20 rows



In [57]:
def add_previous_loan_features(df):
    grade_window = Window.partitionBy('grade').orderBy('issue_date')
    grade_state_combined_window = Window.partitionBy(['grade','state']).orderBy('issue_date')
#     df = df.withColumn("prev_issue_date_from_same_grade", fn.lag("issue_date",1).over(grade_window)).orderBy("issue_date")
#     df = df.withColumn( "prev_loan_amount_from_same_grade", fn.lag("loan_amount",1).over(grade_window)).orderBy("loan_amount")
#     df = df.withColumn("pre_loan_date_from_same_state_grade", fn.lag("issue_date",1).over(grade_state_combined_window)).orderBy("issue_date")
#     df = df.withColumn("prev_loan_amount_from_same_state_grade",fn.lag("loan_amount",1).over(grade_state_combined_window)).orderBy("loan_amount")
    df= df.withColumn("prev_issue_date_from_same_grade", fn.lag("issue_date",1).over(grade_window))\
           .withColumn( "prev_loan_amount_from_same_grade", fn.lag("loan_amount",1).over(grade_window))\
           .withColumn("prev_loan_date_from_same_state_grade", fn.lag("issue_date",1).over(grade_state_combined_window))\
           .withColumn("prev_loan_amount_from_same_state_grade",fn.lag("loan_amount",1).over(grade_state_combined_window))
    return df

In [58]:
fintech_df = add_previous_loan_features(fintech_df)


In [59]:
fintech_df.select('grade', 'issue_date','prev_issue_date_from_same_grade').orderBy('grade').show()

+-----+----------+-------------------------------+
|grade|issue_date|prev_issue_date_from_same_grade|
+-----+----------+-------------------------------+
|    1|2012-09-12|                           NULL|
|    1|2012-09-12|                     2012-09-12|
|    1|2012-10-12|                     2012-09-12|
|    1|2012-10-12|                     2012-10-12|
|    1|2012-10-12|                     2012-10-12|
|    1|2012-10-12|                     2012-10-12|
|    1|2012-11-12|                     2012-10-12|
|    1|2012-12-12|                     2012-11-12|
|    1|2012-12-12|                     2012-12-12|
|    1|2012-12-12|                     2012-12-12|
|    1|2013-01-13|                     2012-12-12|
|    1|2013-01-13|                     2013-01-13|
|    1|2013-02-13|                     2013-01-13|
|    1|2013-02-13|                     2013-02-13|
|    1|2013-03-13|                     2013-02-13|
|    1|2013-03-13|                     2013-03-13|
|    1|2013-04-13|             

In [60]:
fintech_df.select('grade', 'loan_amount','prev_loan_amount_from_same_grade').orderBy('grade').show()

+-----+-----------+--------------------------------+
|grade|loan_amount|prev_loan_amount_from_same_grade|
+-----+-----------+--------------------------------+
|    1|    12800.0|                            NULL|
|    1|     3000.0|                         12800.0|
|    1|    12000.0|                          3000.0|
|    1|     8000.0|                         12000.0|
|    1|     8000.0|                          8000.0|
|    1|    16550.0|                          8000.0|
|    1|     8900.0|                         16550.0|
|    1|     6500.0|                          8900.0|
|    1|     2600.0|                          6500.0|
|    1|    22000.0|                          2600.0|
|    1|    15000.0|                         22000.0|
|    1|    15000.0|                         15000.0|
|    1|    28000.0|                         15000.0|
|    1|    20000.0|                         28000.0|
|    1|    20050.0|                         20000.0|
|    1|     6500.0|                         20

In [61]:
fintech_df.select('state','grade', 'issue_date','prev_loan_date_from_same_state_grade').orderBy(['grade','state','issue_date']).show()

+-----+-----+----------+------------------------------------+
|state|grade|issue_date|prev_loan_date_from_same_state_grade|
+-----+-----+----------+------------------------------------+
|   AK|    1|2013-12-13|                                NULL|
|   AK|    1|2019-12-19|                          2013-12-13|
|   AL|    1|2015-05-15|                                NULL|
|   AL|    1|2015-07-15|                          2015-05-15|
|   AL|    1|2015-09-15|                          2015-07-15|
|   AL|    1|2015-10-15|                          2015-09-15|
|   AL|    1|2016-07-16|                          2015-10-15|
|   AL|    1|2018-03-18|                          2016-07-16|
|   AL|    1|2019-06-19|                          2018-03-18|
|   AL|    1|2019-10-19|                          2019-06-19|
|   AR|    1|2014-05-14|                                NULL|
|   AR|    1|2014-10-14|                          2014-05-14|
|   AR|    1|2014-12-14|                          2014-10-14|
|   AR| 

In [62]:
fintech_df.select('state','grade', 'loan_amount','prev_loan_amount_from_same_state_grade').orderBy(['grade','state','issue_date']).show()

+-----+-----+-----------+--------------------------------------+
|state|grade|loan_amount|prev_loan_amount_from_same_state_grade|
+-----+-----+-----------+--------------------------------------+
|   AK|    1|    20000.0|                                  NULL|
|   AK|    1|    24000.0|                               20000.0|
|   AL|    1|    15000.0|                                  NULL|
|   AL|    1|     8000.0|                               15000.0|
|   AL|    1|    18000.0|                                8000.0|
|   AL|    1|    25000.0|                               18000.0|
|   AL|    1|     4000.0|                               25000.0|
|   AL|    1|    10000.0|                                4000.0|
|   AL|    1|     7000.0|                               10000.0|
|   AL|    1|    10000.0|                                7000.0|
|   AR|    1|    20000.0|                                  NULL|
|   AR|    1|    12500.0|                               20000.0|
|   AR|    1|     9900.0|

In [63]:
fintech_df.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+------------------+-----------------------+-------------------+------------------+------------------+--------------------+-----------------------------------+--------------------------------+----------------------------+---------------+----------+---------------+--------------------+----------------------+------------+-------------------------------+--------------------------------+------------------------------------+--------------------------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|in

## Part 5: Analysis SQL vs Spark

1. 1 First Query : using SQL and Spark 


 I extracted relevant info in order to use it inside the first query 


In [64]:
q1,q2,q3, q4,q5 = fintech_df.approxQuantile("annual_inc", [0.10,0.25,0.50,0.75,0.90], 0.0)  

iqr = q3 - q1

print(f'First Quartile {q1}')
print(f'Second Quartile {q2}')
print(f'Third Quartile {q3}')
print(f'Fourth Quartile {q4}')
print(f'Fifth Quartile {q5}')

First Quartile 35000.0
Second Quartile 47000.0
Third Quartile 66000.0
Fourth Quartile 95000.0
Fifth Quartile 130000.0


In [65]:
fintech_df.createOrReplaceTempView("fintect_df_sql_table")

In the first query i ordered by `emp_length_numeric` and `emp_length` once  to have a better visualization of the query to differentiate between different income ranges for each grade 

In [66]:

first_query_Defualt = """SELECT emp_length, CASE 
            WHEN annual_inc < 47000 THEN 'Low_Annual_Inc' 
            WHEN annual_inc BETWEEN 47000 AND 95000 THEN 'Average_Annual_Inc' 
            ELSE 'High_Annual_Inc'END AS income_range,
             AVG(loan_amount) AS avg_loan_amount, AVG(int_rate) AS avg_int_rate
            FROM fintect_df_sql_table 
            WHERE loan_status = 'Default'
            GROUP BY CASE
                WHEN annual_inc < 47000 THEN 'Low_Annual_Inc'
                WHEN annual_inc BETWEEN 47000 AND 95000 THEN 'Average_Annual_Inc'
                ELSE 'High_Annual_Inc'
                END,emp_length
            Order BY emp_length
        """



In [67]:

first_query_sql_res = spark.sql(first_query_Defualt)
first_query_sql_res.show()

+----------+---------------+---------------+------------+
|emp_length|   income_range|avg_loan_amount|avg_int_rate|
+----------+---------------+---------------+------------+
|   3 years|High_Annual_Inc|        35000.0|      0.1344|
+----------+---------------+---------------+------------+



For the first query , i wanted to test on another value for `loan_status` becuase the dataset already has one record that contain value `Default` in it 

In [68]:

first_query_Current_emp_length_numeric = """SELECT emp_length_numeric, CASE 
            WHEN annual_inc < 47000 THEN 'Low_Annual_Inc' 
            WHEN annual_inc BETWEEN 47000 AND 95000 THEN 'Average_Annual_Inc' 
            ELSE 'High_Annual_Inc'END AS income_range,
             AVG(loan_amount) AS avg_loan_amount, AVG(int_rate) AS avg_int_rate
            FROM fintect_df_sql_table 
            WHERE loan_status = 'Current'
            GROUP BY CASE
                WHEN annual_inc < 47000 THEN 'Low_Annual_Inc'
                WHEN annual_inc BETWEEN 47000 AND 95000 THEN 'Average_Annual_Inc'
                ELSE 'High_Annual_Inc'
                END,emp_length_numeric
            Order BY emp_length_numeric
        """


In [69]:

first_query_Current_emp_length = """SELECT emp_length, 
            CASE 
            WHEN annual_inc < 47000 THEN 'Low_Annual_Inc' 
            WHEN annual_inc BETWEEN 47000 AND 95000 THEN 'Average_Annual_Inc' 
            ELSE 'High_Annual_Inc'END AS income_range,
             AVG(loan_amount) AS avg_loan_amount, AVG(int_rate) AS avg_int_rate
            FROM fintect_df_sql_table WHERE loan_status = 'Current'
            GROUP BY CASE
                WHEN annual_inc < 47000 THEN 'Low_Annual_Inc'
                WHEN annual_inc BETWEEN 47000 AND 95000 THEN 'Average_Annual_Inc'
                ELSE 'High_Annual_Inc'
                END,emp_length
            Order BY emp_length
        """

In [70]:

first_query_sql_res = spark.sql(first_query_Current_emp_length_numeric)
first_query_sql_res.show()

+------------------+------------------+------------------+-------------------+
|emp_length_numeric|      income_range|   avg_loan_amount|       avg_int_rate|
+------------------+------------------+------------------+-------------------+
|               0.5|    Low_Annual_Inc| 10944.29906542056| 0.1284026168224299|
|               0.5|Average_Annual_Inc|15517.972027972028|0.11866503496503494|
|               0.5|   High_Annual_Inc|23262.123115577888|0.11073944723618089|
|               1.0|   High_Annual_Inc|      22917.578125|     0.115623046875|
|               1.0|Average_Annual_Inc|           14640.0|0.12170728971962619|
|               1.0|    Low_Annual_Inc| 9597.426470588236|0.12983764705882353|
|               2.0|   High_Annual_Inc|22366.641791044774|0.11751223880597013|
|               2.0|Average_Annual_Inc|14830.043859649122|0.11936077694235592|
|               2.0|    Low_Annual_Inc|  9821.22641509434| 0.1350551886792453|
|               3.0|   High_Annual_Inc|21755.4200542

In [71]:

first_query_sql_res = spark.sql(first_query_Current_emp_length)
first_query_sql_res.show()

+----------+------------------+------------------+-------------------+
|emp_length|      income_range|   avg_loan_amount|       avg_int_rate|
+----------+------------------+------------------+-------------------+
|    1 year|   High_Annual_Inc|      22917.578125|     0.115623046875|
|    1 year|    Low_Annual_Inc| 9597.426470588236|0.12983764705882353|
|    1 year|Average_Annual_Inc|           14640.0|0.12170728971962619|
| 10+ years|    Low_Annual_Inc| 10570.43663821405|0.13323624425476036|
| 10+ years|   High_Annual_Inc|21961.920714684533|0.11426616415410387|
| 10+ years|Average_Annual_Inc|15481.786117381489|0.12211326185101579|
|   2 years|   High_Annual_Inc|22366.641791044774|0.11751223880597013|
|   2 years|    Low_Annual_Inc|  9821.22641509434| 0.1350551886792453|
|   2 years|Average_Annual_Inc|14830.043859649122|0.11936077694235592|
|   3 years|    Low_Annual_Inc| 10104.77207977208| 0.1309230769230769|
|   3 years|Average_Annual_Inc|15249.461538461539| 0.1271246153846154|
|   3 

1. 1 First Query : using Spark Functions

For `loan_status` = `Default`

In [72]:
fintect_df_income_ranges = fintech_df.withColumn(
    "income_range", fn.when(fn.col("annual_inc")<47000 , "Low_Annual_Inc")\
    .when((fn.col("annual_inc")>= 47000)&(fn.col("annual_inc")<=95000),"Average_Annual_Inc").otherwise("High_Annual_Inc")
)

Default_filtered_df = fintect_df_income_ranges.filter(fn.col("loan_status")=="Default")
first_query_spark = (Default_filtered_df.groupBy("emp_length","income_range")\
                     .agg(fn.avg("loan_amount").alias("avg_loan_amount"),fn.avg("int_rate").alias("avg_int_rate"))\
                .orderBy("emp_length") )

first_query_spark.show()

+----------+---------------+---------------+------------+
|emp_length|   income_range|avg_loan_amount|avg_int_rate|
+----------+---------------+---------------+------------+
|   3 years|High_Annual_Inc|        35000.0|      0.1344|
+----------+---------------+---------------+------------+



For `loan_status` = `Current`

In [73]:
fintect_df_income_ranges = fintech_df.withColumn(
    "income_range", fn.when(fn.col("annual_inc")<47000 , "Low_Annual_Inc")\
    .when((fn.col("annual_inc")>= 47000)&(fn.col("annual_inc")<=95000),"Average_Annual_Inc").otherwise("High_Annual_Inc")
)

Default_filtered_df = fintect_df_income_ranges.filter(fn.col("loan_status")=="Current")
first_query_spark = (Default_filtered_df.groupBy("emp_length","income_range")\
                     .agg(fn.avg("loan_amount").alias("avg_loan_amount"),fn.avg("int_rate").alias("avg_int_rate"))\
                .orderBy("emp_length") )

first_query_spark.show()

+----------+------------------+------------------+-------------------+
|emp_length|      income_range|   avg_loan_amount|       avg_int_rate|
+----------+------------------+------------------+-------------------+
|    1 year|   High_Annual_Inc|      22917.578125|     0.115623046875|
|    1 year|    Low_Annual_Inc| 9597.426470588236|0.12983764705882353|
|    1 year|Average_Annual_Inc|           14640.0|0.12170728971962619|
| 10+ years|   High_Annual_Inc|21961.920714684533|0.11426616415410387|
| 10+ years|Average_Annual_Inc|15481.786117381489|0.12211326185101579|
| 10+ years|    Low_Annual_Inc| 10570.43663821405|0.13323624425476036|
|   2 years|Average_Annual_Inc|14830.043859649122|0.11936077694235592|
|   2 years|   High_Annual_Inc|22366.641791044774|0.11751223880597013|
|   2 years|    Low_Annual_Inc|  9821.22641509434| 0.1350551886792453|
|   3 years|Average_Annual_Inc|15249.461538461539| 0.1271246153846154|
|   3 years|    Low_Annual_Inc| 10104.77207977208| 0.1309230769230769|
|   3 

In [74]:
fintech_df_first_query = fintech_df.withColumn(
    "income_range", fn.when(fn.col("annual_inc")<47000 , "Low_Annual_Inc")\
    .when((fn.col("annual_inc")>= 47000)&(fn.col("annual_inc")<=95000),"Average_Annual_Inc").otherwise("High_Annual_Inc")
)

Default_filtered_df = fintech_df_first_query.filter(fn.col("loan_status")=="Current")
first_query_spark_df = (Default_filtered_df.groupBy("emp_length_numeric","income_range")\
                     .agg(fn.avg("loan_amount").alias("avg_loan_amount"),fn.avg("int_rate").alias("avg_int_rate"))\
                .orderBy("emp_length_numeric") )

first_query_spark_df.show()

+------------------+------------------+------------------+-------------------+
|emp_length_numeric|      income_range|   avg_loan_amount|       avg_int_rate|
+------------------+------------------+------------------+-------------------+
|               0.5|    Low_Annual_Inc| 10944.29906542056| 0.1284026168224299|
|               0.5|Average_Annual_Inc|15517.972027972028|0.11866503496503494|
|               0.5|   High_Annual_Inc|23262.123115577888|0.11073944723618089|
|               1.0|   High_Annual_Inc|      22917.578125|     0.115623046875|
|               1.0|Average_Annual_Inc|           14640.0|0.12170728971962619|
|               1.0|    Low_Annual_Inc| 9597.426470588236|0.12983764705882353|
|               2.0|Average_Annual_Inc|14830.043859649122|0.11936077694235592|
|               2.0|   High_Annual_Inc|22366.641791044774|0.11751223880597013|
|               2.0|    Low_Annual_Inc|  9821.22641509434| 0.1350551886792453|
|               3.0|   High_Annual_Inc|21755.4200542

2. 1 Second Query : using SQL and Spark 

In [75]:
second_query = """ SELECT grade AS loan_grade, AVG(loan_amount- funded_amount ) AS average_difference
                   FROM fintect_df_sql_table 
                   GROUP BY grade 
                   ORDER BY average_difference DESC """

In [76]:
second_query_sql_res = spark.sql(second_query)
second_query_sql_res.show(35)

+----------+------------------+
|loan_grade|average_difference|
+----------+------------------+
|        29|               0.0|
|        26|               0.0|
|        19|               0.0|
|        22|               0.0|
|         7|               0.0|
|        34|               0.0|
|        32|               0.0|
|        31|               0.0|
|        25|               0.0|
|         6|               0.0|
|         9|               0.0|
|        27|               0.0|
|        17|               0.0|
|        28|               0.0|
|        33|               0.0|
|         5|               0.0|
|         1|               0.0|
|        10|               0.0|
|         3|               0.0|
|        12|               0.0|
|         8|               0.0|
|        11|               0.0|
|        35|               0.0|
|         2|               0.0|
|         4|               0.0|
|        13|               0.0|
|        18|               0.0|
|        14|               0.0|
|       

2. 2 Second Query : using Spark Functions

In [77]:
second_query_spark_df = (fintech_df.groupBy("grade")\
                      .agg(fn.avg(fn.col("loan_amount")-fn.col("funded_amount")).alias("average_difference"))\
                        .orderBy(fn.col("average_difference").desc()))

second_query_spark_df.show()

+-----+------------------+
|grade|average_difference|
+-----+------------------+
|   29|               0.0|
|   26|               0.0|
|   19|               0.0|
|   22|               0.0|
|    7|               0.0|
|   34|               0.0|
|   32|               0.0|
|   31|               0.0|
|   25|               0.0|
|    6|               0.0|
|    9|               0.0|
|   27|               0.0|
|   17|               0.0|
|   28|               0.0|
|   33|               0.0|
|    5|               0.0|
|    1|               0.0|
|   10|               0.0|
|    3|               0.0|
|   12|               0.0|
+-----+------------------+
only showing top 20 rows



3. 1 Third Query : using SQL and Spark

Here , I have ordered the query by `addr_state` in order to give better visualization for total loan amount for loans with `Verified` and `Not Verified` `verification_status` accros each `addr_state`

In [78]:
third_query = """ SELECT  addr_state,verification_status,SUM(loan_amount) AS total_loan_amount
                  FROM fintect_df_sql_table 
                  WHERE verification_status = 'Verified' OR verification_status = 'Not Verified'
                  GROUP BY addr_state,verification_status
                  ORDER BY  addr_state    """  
 

In [79]:
third_query_sql_res = spark.sql(third_query)
third_query_sql_res.show(50)

+----------+-------------------+-----------------+
|addr_state|verification_status|total_loan_amount|
+----------+-------------------+-----------------+
|        AK|       Not Verified|         269450.0|
|        AK|           Verified|         333925.0|
|        AL|       Not Verified|        1241900.0|
|        AL|           Verified|        1340700.0|
|        AR|       Not Verified|         717025.0|
|        AR|           Verified|         747625.0|
|        AZ|       Not Verified|        2890450.0|
|        AZ|           Verified|        2731625.0|
|        CA|       Not Verified|      1.8032175E7|
|        CA|           Verified|      1.5609625E7|
|        CO|       Not Verified|        2740950.0|
|        CO|           Verified|        2434350.0|
|        CT|       Not Verified|        1915300.0|
|        CT|           Verified|        1704100.0|
|        DC|           Verified|         452000.0|
|        DC|       Not Verified|         228000.0|
|        DE|       Not Verified

3. 2 Third Query : using Spark Functions

In [80]:
third_query_filtered_df = fintech_df.filter((fn.col("verification_status") =="Verified")|(fn.col("verification_status")== "Not Verified"))
third_query_spark_df= (third_query_filtered_df.groupBy("addr_state","verification_status")\
                       .agg(fn.sum("loan_amount").alias("total_loan_amount"))\
                        .orderBy("addr_state"))
third_query_spark_df.show()

+----------+-------------------+-----------------+
|addr_state|verification_status|total_loan_amount|
+----------+-------------------+-----------------+
|        AK|       Not Verified|         269450.0|
|        AK|           Verified|         333925.0|
|        AL|       Not Verified|        1241900.0|
|        AL|           Verified|        1340700.0|
|        AR|       Not Verified|         717025.0|
|        AR|           Verified|         747625.0|
|        AZ|       Not Verified|        2890450.0|
|        AZ|           Verified|        2731625.0|
|        CA|       Not Verified|      1.8032175E7|
|        CA|           Verified|      1.5609625E7|
|        CO|       Not Verified|        2740950.0|
|        CO|           Verified|        2434350.0|
|        CT|       Not Verified|        1915300.0|
|        CT|           Verified|        1704100.0|
|        DC|           Verified|         452000.0|
|        DC|       Not Verified|         228000.0|
|        DE|       Not Verified

4. 1 Fourth Query : using SQL and Spark

Here also i ordered by `avg_time_gap_days` in order to give better visualization for the query 

The Spark SQL datediff() function is used to get the date difference between two dates in terms of DAYS

In [81]:
fourth_query = """SELECT grade,AVG(DATEDIFF(issue_date, prev_issue_date_from_same_grade)) AS avg_time_gap_days
                  FROM fintect_df_sql_table
                  WHERE prev_issue_date_from_same_grade IS NOT NULL
                  GROUP BY grade
                  ORDER BY  avg_time_gap_days  DESC
                """

In [82]:
fourth_query_sql_res = spark.sql(fourth_query)
fourth_query_sql_res.show(50)

+-----+------------------+
|grade| avg_time_gap_days|
+-----+------------------+
|   32| 83.22727272727273|
|   33| 80.64285714285714|
|   34|              76.3|
|   31|             75.68|
|   35| 62.03333333333333|
|   30|30.095890410958905|
|   27|28.785714285714285|
|   28| 26.46987951807229|
|   26| 25.72289156626506|
|   29|25.363636363636363|
|   25| 9.488188976377952|
|   22| 9.366533864541832|
|   23| 8.838345864661655|
|   24| 7.877483443708609|
|   21|              7.75|
|   17|3.6932599724896837|
|   20|3.5515873015873014|
|   16|3.4921052631578946|
|   19|3.4512353706111836|
|   18|3.3467843631778056|
|    2|2.3824057450628366|
|    4| 2.363556338028169|
|    3|2.3118466898954706|
|    1|2.2645051194539247|
|    5|2.2548853016142734|
|   14|1.8290833907649897|
|   11|1.7560497056899935|
|    7|1.7333763718528084|
|   15| 1.731245923026745|
|   13|1.7311411992263057|
|    6| 1.724470134874759|
|    9|1.6961465571699306|
|   12|1.6950904392764858|
|    8|1.6462293071735132|
|

4. 2 Fourth Query : using Spark Functions

In [83]:
fourth_query_filtered_df = fintech_df.filter(fn.col("prev_issue_date_from_same_grade").isNotNull())
fourth_query_spark_df =(fourth_query_filtered_df.groupBy("grade").agg(fn.avg(fn.datediff("issue_date","prev_issue_date_from_same_grade")).alias("avg_time_gap_days"))\
                        .orderBy(fn.col("avg_time_gap_days").desc()))

fourth_query_spark_df.show()

+-----+------------------+
|grade| avg_time_gap_days|
+-----+------------------+
|   32| 83.22727272727273|
|   33| 80.64285714285714|
|   34|              76.3|
|   31|             75.68|
|   35| 62.03333333333333|
|   30|30.095890410958905|
|   27|28.785714285714285|
|   28| 26.46987951807229|
|   26| 25.72289156626506|
|   29|25.363636363636363|
|   25| 9.488188976377952|
|   22| 9.366533864541832|
|   23| 8.838345864661655|
|   24| 7.877483443708609|
|   21|              7.75|
|   17|3.6932599724896837|
|   20|3.5515873015873014|
|   16|3.4921052631578946|
|   19|3.4512353706111836|
|   18|3.3467843631778056|
+-----+------------------+
only showing top 20 rows



5. 1 Fifth Query :using SQL and Spark

Here also i ordered by `avg_diff_loan_amount` in order to give better visualization for the query 

In [84]:
fifth_query = """SELECT grade,state,AVG(ABS(loan_amount- prev_loan_amount_from_same_state_grade)) AS avg_diff_loan_amount
                FROM fintect_df_sql_table
                WHERE prev_loan_amount_from_same_state_grade IS NOT NULL
                GROUP BY grade ,state
                ORDER BY avg_diff_loan_amount DESC  
           """

In [85]:
fifth_query_sql_res = spark.sql(fifth_query)
fifth_query_sql_res.show()

+-----+-----+--------------------+
|grade|state|avg_diff_loan_amount|
+-----+-----+--------------------+
|    8|   ID|             34800.0|
|   29|   AL|             31800.0|
|    4|   WY|             30050.0|
|    5|   SD|             29000.0|
|   34|   FL|             28400.0|
|   17|   VT|             27375.0|
|   24|   RI|             27100.0|
|   29|   ND|             26225.0|
|   21|   MA|             25925.0|
|   22|   CO|             25800.0|
|   19|   WY|             25400.0|
|   21|   AR|             25325.0|
|   27|   MN|             24175.0|
|   33|   MD|             24075.0|
|    9|   AK|             24062.5|
|   30|   NM|             23800.0|
|    1|   WY|             23600.0|
|   16|   VT|             23500.0|
|    1|   ID|             23000.0|
|   23|   NV|             23000.0|
+-----+-----+--------------------+
only showing top 20 rows



5. 2 Fifth Query :using Spark Functions

In [86]:
fifth_query_filtered_df = fintech_df.filter(fn.col("prev_loan_amount_from_same_state_grade").isNotNull())
fifth_query_spark_df =(fifth_query_filtered_df.groupBy("grade","state").agg(fn.avg(fn.abs(fn.col("loan_amount")-fn.col("prev_loan_amount_from_same_state_grade"))).alias("avg_diff_loan_amount"))\
                        .orderBy(fn.col("avg_diff_loan_amount").desc()))
fifth_query_spark_df.show()

+-----+-----+--------------------+
|grade|state|avg_diff_loan_amount|
+-----+-----+--------------------+
|    8|   ID|             34800.0|
|   29|   AL|             31800.0|
|    4|   WY|             30050.0|
|    5|   SD|             29000.0|
|   34|   FL|             28400.0|
|   17|   VT|             27375.0|
|   24|   RI|             27100.0|
|   29|   ND|             26225.0|
|   21|   MA|             25925.0|
|   22|   CO|             25800.0|
|   19|   WY|             25400.0|
|   21|   AR|             25325.0|
|   27|   MN|             24175.0|
|   33|   MD|             24075.0|
|    9|   AK|             24062.5|
|   30|   NM|             23800.0|
|    1|   WY|             23600.0|
|   16|   VT|             23500.0|
|    1|   ID|             23000.0|
|   23|   NV|             23000.0|
+-----+-----+--------------------+
only showing top 20 rows



### Dropping encoded columns before saving Dataframes into Paqruets

In [87]:
fintech_df = fintech_df.drop(fn.col("grade"),fn.col("emp_length"),fn.col("home_ownership"),fn.col("type"),fn.col("verification_status"),fn.col("purpose"),fn.col("state"))

In [88]:
fintech_df.columns

['customer_id',
 'emp_title',
 'annual_inc',
 'annual_inc_joint',
 'zip_code',
 'addr_state',
 'avg_cur_bal',
 'tot_cur_bal',
 'loan_id',
 'loan_status',
 'loan_amount',
 'funded_amount',
 'term',
 'int_rate',
 'issue_date',
 'pymnt_plan',
 'description',
 'emp_length_numeric',
 'home_ownership_MORTGAGE',
 'home_ownership_RENT',
 'home_ownership_OWN',
 'home_ownership_ANY',
 'home_ownership_OTHER',
 'verification_status_Source Verified',
 'verification_status_Not Verified',
 'verification_status_Verified',
 'type_individual',
 'type_joint',
 'type_direct_pay',
 'state_label_encoding',
 'purpose_label_encoding',
 'letter_grade',
 'prev_issue_date_from_same_grade',
 'prev_loan_amount_from_same_grade',
 'prev_loan_date_from_same_state_grade',
 'prev_loan_amount_from_same_state_grade']

In [89]:
fintech_df.show()

+--------------------+--------------------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-------------+----------+--------+----------+----------+--------------------+------------------+-----------------------+-------------------+------------------+------------------+--------------------+-----------------------------------+--------------------------------+----------------------------+---------------+----------+---------------+--------------------+----------------------+------------+-------------------------------+--------------------------------+------------------------------------+--------------------------------------+
|         customer_id|           emp_title|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|funded_amount|      term|int_rate|issue_date|pymnt_plan|         description|emp_length_numeric|home_ownership_MORTGAGE|home_ownership_RENT|home_ownership_OWN|home_own

## Part 6: Lookup Table & Saving the dataset

In [90]:
fintech_df.coalesce(1).write.parquet("/data/temp_fintech_spark_52_24625_clean", mode="overwrite")
parquet_file = [f for f in os.listdir("/data/temp_fintech_spark_52_24625_clean") if f.endswith(".parquet")][0]
shutil.move(f"/data/temp_fintech_spark_52_24625_clean/{parquet_file}", "/data/fintech_spark_52_24625_clean.parquet")
shutil.rmtree("/data/temp_fintech_spark_52_24625_clean")


lookup_table_df.coalesce(1).write.parquet("/data/temp_lookup_spark", mode="overwrite")
parquet_file = [f for f in os.listdir("/data/temp_lookup_spark") if f.endswith(".parquet")][0]
shutil.move(f"/data/temp_lookup_spark/{parquet_file}", "/data/lookup_spark_52_24625.parquet")
shutil.rmtree("/data/temp_lookup_spark") 

In [91]:
fintech_df_cleaned = spark.read.parquet('/data/fintech_spark_52_24625_clean.parquet')

In [92]:
fintech_df_cleaned.show()

+--------------------+--------------------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-------------+----------+--------+----------+----------+--------------------+------------------+-----------------------+-------------------+------------------+------------------+--------------------+-----------------------------------+--------------------------------+----------------------------+---------------+----------+---------------+--------------------+----------------------+------------+-------------------------------+--------------------------------+------------------------------------+--------------------------------------+
|         customer_id|           emp_title|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|funded_amount|      term|int_rate|issue_date|pymnt_plan|         description|emp_length_numeric|home_ownership_MORTGAGE|home_ownership_RENT|home_ownership_OWN|home_own

## BONUS: Loading to Postgres

In [93]:
jdbc_url = "jdbc:postgresql://pgdatabase:5432/MS3_MET_P1_52_24625"
db_properties = {
    "user": "root",
    "password": "root",
    "driver": "org.postgresql.Driver"
}



In [94]:
def save_parquet_to_db(df,df_name):
    df.write.format("jdbc").option("url", jdbc_url).option("dbtable", df_name) \
        .option("user", db_properties["user"]) \
        .option("password", db_properties["password"]) \
        .option("driver", db_properties["driver"]) \
        .mode("overwrite") \
        .save()
    print(f'{df_name} successfully saved to postgresDB !')

In [95]:
fintech_df_cleaned = spark.read.parquet('/data/fintech_spark_52_24625_clean.parquet')
lookup_table_saved = spark.read.parquet('/data/lookup_spark_52_24625.parquet')

In [96]:
save_parquet_to_db(fintech_df_cleaned,"fintech_spark_52_24625_clean")
save_parquet_to_db(lookup_table_saved,"lookup_spark_52_24625")

fintech_spark_52_24625_clean successfully saved to postgresDB !
lookup_spark_52_24625 successfully saved to postgresDB !
