In [2]:
!pip install pyspark
!pip install psutil



# Load Dataset
## Simply load the dataset from the parquet format given in the google drive above
- Load the dataset.
- Preview first 20 rows.
- How many partitions is this dataframe split into?
- Change partitions to be equal to the number of your logical cores

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Zeyad_Milestone3").getOrCreate()
sc = spark.sparkContext

In [4]:
file_path = "/content/fintech_data_49_52_16824.parquet" # Replace with your file path
df = spark.read.parquet(file_path)


In [5]:
df.show(20)

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGM2XHhlOFx4M...|Director of Opera.

In [6]:
num_partitions = df.rdd.getNumPartitions()
print(f"Number of partitions: {num_partitions}")

Number of partitions: 1


In [7]:
import psutil
logical_cores = psutil.cpu_count(logical=True)
physical_cores = psutil.cpu_count(logical=False)

print(f"Number of logical cores: {logical_cores}")
print(f"Number of physical cores: {physical_cores}")

Number of logical cores: 2
Number of physical cores: 1


In [8]:
df = df.repartition(logical_cores)

num_partitions_new = df.rdd.getNumPartitions()
print(f"New number of partitions: {num_partitions_new}")

New number of partitions: 2


# Cleaning
- Rename all columns (replacing a space with an underscore, and making it
lowercase)
- Detect missing
  - Create a function that takes in the df and returns any data structrue of your choice(df/dict,list,tuple,etc) which has the name of the column and percentage of missing entries from the whole dataset.
  - Tip : storing the missing info as dict where the key is the column name and value is the percentage would be the easiest.
  - Prinout the missing info
- Handle missing
  - For numerical features replace with 0.
  - For categorical/strings replace with mode
- Check missing
  - Afterwards, check that there are no missing values

### Rename Columns

In [9]:
from pyspark.sql.functions import col

# Rename columns
new_columns = [col(c).alias(c.replace(" ", "_").lower()) for c in df.columns]
df = df.select(*new_columns)

In [10]:
df.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGJlT1x4YTJce...|Interpreter Coord.

### Detect Missing

In [11]:
def detect_missing(df):
  """
  Detects missing values in a PySpark DataFrame and returns a dictionary
  containing column names and their corresponding missing value percentages.

  Args:
    df: The input PySpark DataFrame.

  Returns:
    A dictionary where keys are column names and values are the percentage of
    missing values in each column.
  """
  missing_info = {}
  total_rows = df.count()
  for col_name in df.columns:
    missing_count = df.filter(col(col_name).isNull()).count()
    missing_percentage = (missing_count / total_rows) * 100 if total_rows > 0 else 0
    missing_info[col_name] = missing_percentage
  return missing_info

In [12]:
missing_info = detect_missing(df)

In [13]:
missing_info

{'customer_id': 0.0,
 'emp_title': 8.749537550869405,
 'emp_length': 6.933037365889752,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 92.90418054014057,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 4.594894561598224,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.8620051794302628}

### Handle Missing

In [14]:
from pyspark.sql.functions import col, when, count, lit

for col_name, missing_percentage in missing_info.items():
    if missing_percentage > 0:
      if df.schema[col_name].dataType.typeName() in ['int', 'double', 'float', 'long']:
          df = df.fillna(0, subset=[col_name])
      else: # Assuming other data types are categorical/strings
          mode_df = df.groupBy(col_name).count().orderBy(col("count").desc()).limit(2)
          mode_value = mode_df.collect()[0][col_name]
          mode_df.show()
          if mode_value is None: # if the first element in mode df is null check next element
            mode_value = mode_df.collect()[1][col_name]
          print(f"Mode value for column '{col_name}': {mode_value}\n")
          df = df.fillna(mode_value, subset=[col_name])

+---------+-----+
|emp_title|count|
+---------+-----+
|     NULL| 2365|
|  Teacher|  471|
+---------+-----+

Mode value for column 'emp_title': Teacher

+----------+-----+
|emp_length|count|
+----------+-----+
| 10+ years| 8837|
|  < 1 year| 2514|
+----------+-----+

Mode value for column 'emp_length': 10+ years

+--------------------+-----+
|         description|count|
+--------------------+-----+
|  Debt consolidation|14268|
|Credit card refin...| 6172|
+--------------------+-----+

Mode value for column 'description': Debt consolidation



### Check Missing

In [15]:
detect_missing(df)

{'customer_id': 0.0,
 'emp_title': 0.0,
 'emp_length': 0.0,
 'home_ownership': 0.0,
 'annual_inc': 0.0,
 'annual_inc_joint': 0.0,
 'verification_status': 0.0,
 'zip_code': 0.0,
 'addr_state': 0.0,
 'avg_cur_bal': 0.0,
 'tot_cur_bal': 0.0,
 'loan_id': 0.0,
 'loan_status': 0.0,
 'loan_amount': 0.0,
 'state': 0.0,
 'funded_amount': 0.0,
 'term': 0.0,
 'int_rate': 0.0,
 'grade': 0.0,
 'issue_date': 0.0,
 'pymnt_plan': 0.0,
 'type': 0.0,
 'purpose': 0.0,
 'description': 0.0}

In [16]:
df.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGJlT1x4YTJce...|Interpreter Coord.

> no missing values left

# Encoding
## Encode only the following categorical values
- Emp Length: Change to numerical
- Home Ownership: One Hot Encoding
- Verification Status: One Hot Encoding
- State: Label Encoding
- Type: One Hot Encoding
- Purpose: Label Encoding
- For the **grade**, only descretize it to be letter grade, not need to label encode it further

### Creating lookup table

In [17]:
from pyspark.sql.types import StructType, StructField, StringType

columns = ["Column", "Original", "Encoded"]

schema = StructType([StructField(col_name, StringType(), True) for col_name in columns])

empty_rdd = spark.sparkContext.emptyRDD()
lookup_table = spark.createDataFrame(empty_rdd, schema = schema)
lookup_table.show()

+------+--------+-------+
|Column|Original|Encoded|
+------+--------+-------+
+------+--------+-------+



### Emp Length Encoding

In [18]:
from pyspark.sql.functions import split, regexp_extract, col, when
from pyspark.sql import Row

# Get distinct emp lengths for lookup table
distinct_emp_lengths = df.select("emp_length").distinct().rdd.flatMap(lambda x: x).collect()

# Replace values starting with '<' with '0'
encoded_df = df.withColumn("emp_length", when(col("emp_length").startswith("<"), "0").otherwise(col("emp_length")))

# Extract the first numerical part using regex and split
encoded_df = encoded_df.withColumn("emp_length", regexp_extract(col("emp_length"), "(\d+)", 1))
encoded_df = encoded_df.withColumn("emp_length", when(col("emp_length") != "", col("emp_length").cast("int")).otherwise(0))  # Replace empty strings with 0

encoded_df.show()

# Get encoded emp lengths for lookup table
encoded_emp_lengths = encoded_df.select("emp_length").distinct().rdd.flatMap(lambda x: x).collect()
for original in distinct_emp_lengths:
    new_row = spark.createDataFrame([Row(Column="emp_length", Original=str(original), Encoded= str(original))]).\
    withColumn("Encoded", when(col("Original").startswith("<"), "0").otherwise(col("Original"))).\
    withColumn("Encoded", regexp_extract(col("Encoded"), "(\d+)", 1)).\
    withColumn("Encoded", when(col("Encoded") != "", col("Encoded").cast("int")).otherwise(0))
    lookup_table = lookup_table.union(new_row)

lookup_table.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceGJlT1x4YTJce...|Interpreter Coord.

### Dealing with type column inconsistenceis

> Before one hot encoding convert all "joint app" to "JOINT" and "Individual" to "INDIVIDUAL" in type column

In [19]:
distinct_types = encoded_df.select("type").distinct().rdd.flatMap(lambda x: x).collect()
distinct_types

['Joint App', 'Individual', 'DIRECT_PAY', 'JOINT', 'INDIVIDUAL']

In [20]:
encoded_df = encoded_df.withColumn("type", when(df["type"] == "Individual", "INDIVIDUAL").when(df["type"] == "Joint App", "JOINT").otherwise(df["type"]))

In [21]:
distinct_types = encoded_df.select("type").distinct().rdd.flatMap(lambda x: x).collect()
distinct_types

['DIRECT_PAY', 'JOINT', 'INDIVIDUAL']

### One Hot Encoder

In [22]:
def one_hot_encode(df, column_name):
    """
    One-hot encodes a categorical column in a PySpark DataFrame.

    Args:
      df: The input PySpark DataFrame.
      column_name: The name of the categorical column to be one-hot encoded.

    Returns:
      A new PySpark DataFrame with the one-hot encoded column.
    """

    # 1. Get unique categories (casting to string to avoid nulls)
    categories = df.select(col(column_name).cast("string")).distinct().rdd.flatMap(lambda x: x).collect()
    categories = [x for x in categories if x is not None] # Remove any potential nulls

    # 2. Create an expression to generate one-hot encoded columns
    exprs = [when(col(column_name) == c, lit(1)).otherwise(lit(0)).alias(f"{column_name}_{str(c)}") for c in categories]

    # 3. Apply the expression and select the new columns
    encoded_df = df.select(df.columns + exprs)

    # 4. Drop the original column
    encoded_df = encoded_df.drop(column_name)

    return encoded_df


### Home Ownership

In [23]:
from pyspark.sql.functions import lit, explode, array, create_map, col


# Select the categorical column to one-hot encode
encoded_df = one_hot_encode(encoded_df, "home_ownership")

encoded_df.show()

+--------------------+--------------------+----------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|home_ownership_OTHER|
+--------------------+--------------------+----------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+

### Verification Status

In [24]:
encoded_df = one_hot_encode(encoded_df, "verification_status")

encoded_df.show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|home_ownership_OTHER|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|
+--------------------+------

### Label encoder

In [25]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql import Row

def label_encode(df, column_name, lookup_table):
    """
    Label encodes a categorical column in a PySpark DataFrame.

    Args:
      df: The input PySpark DataFrame.
      column_name: The name of the categorical column to be label encoded.

    Returns:
      A new PySpark DataFrame with the label encoded column.
    """
    # Create a StringIndexer object
    indexer = StringIndexer(inputCol=column_name, outputCol=f"{column_name}_encoded") # Temporary output column

    # Fit the indexer to the DataFrame
    indexer_model = indexer.fit(df)

    # Transform the DataFrame
    encoded_df = indexer_model.transform(df)

    # Get the mapping between original and encoded values
    mapping = indexer_model.labels

    # Add the mapping to the lookup table
    for encoded, original in enumerate(mapping):
        lookup_table = lookup_table.union(spark.createDataFrame([Row(Column=column_name, Original=str(original), Encoded=encoded)]))


    # Overwrite the original 'state' column with the encoded values
    encoded_df = encoded_df.drop(column_name).withColumnRenamed(f"{column_name}_encoded", column_name)
    return encoded_df, lookup_table


### State

In [26]:
encoded_df, lookup_table = label_encode(encoded_df, "state", lookup_table)

encoded_df.show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+-----+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|home_ownership_OTHER|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|state|
+--------------------+------

### Type

In [27]:
encoded_df = one_hot_encode(encoded_df, "type")

encoded_df.show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-------------+----------+--------+-----+-----------------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+-----+---------------+----------+---------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|           purpose|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|home_ownership_OTHER|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|state|type_DIR

### Purpose

In [28]:
encoded_df, lookup_table = label_encode(encoded_df, "purpose", lookup_table)

encoded_df.show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-------------+----------+--------+-----+-----------------+----------+--------------------+------------------+-------------------+-----------------------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+-----+---------------+----------+---------------+-------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|home_ownership_OTHER|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|state|type_DIRECT_PAY|type_JOINT|type_INDIVI

### Grade

In [29]:
from pyspark.sql.functions import when

def assign_grade(grade_value):
    return when((grade_value >= 1) & (grade_value <= 5), lit('A')) \
           .when(grade_value <= 10, lit('B')) \
           .when(grade_value <= 15, lit('C')) \
           .when(grade_value <= 20, lit('D')) \
           .when(grade_value <= 25, lit('E')) \
           .when(grade_value <= 30, lit('F')) \
           .when(grade_value <= 35, lit('G')) \

encoded_df = encoded_df.withColumn("grade_letter", when(col("grade").isNotNull(), assign_grade(col("grade"))).otherwise("Unknown"))
encoded_df = encoded_df.drop("grade").withColumnRenamed("grade_letter", "grade")

# Add to lookup table
for i in range(1, 36):
    lookup_table = lookup_table.union(spark.createDataFrame([Row(Column="grade", Original=str(i))]).withColumn("Encoded", assign_grade(lit(i))))

encoded_df.show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-------------+----------+--------+-----------------+----------+--------------------+------------------+-------------------+-----------------------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+-----+---------------+----------+---------------+-------+-----+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|funded_amount|      term|int_rate|       issue_date|pymnt_plan|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|home_ownership_OTHER|verification_status_Verified|verification_status_Source Verified|verification_status_Not Verified|state|type_DIRECT_PAY|type_JOINT|type_INDIVIDUAL|p

# Feature Engineering
## Write a function that adds the 3 following features. Try as much as you can to use built in fucntions in PySpark (from the functions library) check lab 8, Avoid writing UDFs from scratch.
- Previous loan issue date from the same grade
- Previoius Loan amount from the same grade
- Previous loan date from the same state and grade combined
- Previous loan amount from the same state and grade combined

In [30]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, last, to_date, col

def add_features(df):
    df = df.withColumn("issue_date_formatted", to_date(col("issue_date"), "d MMMM yyyy")) # format date so that it compares correctly
    window_grade = Window.partitionBy("grade").orderBy("issue_date_formatted")
    window_state_grade = Window.partitionBy("state", "grade").orderBy("issue_date_formatted")

    df = df.withColumn("previous_loan_issue_date_same_grade", lag("issue_date_formatted", 1, None).over(window_grade))
    df = df.withColumn("previous_loan_amount_same_grade", lag("loan_amount", 1, None).over(window_grade))
    df = df.withColumn("previous_loan_issue_date_same_state_grade", lag("issue_date_formatted", 1, None).over(window_state_grade))
    df = df.withColumn("previous_loan_amount_same_state_grade", lag("loan_amount", 1, None).over(window_state_grade))
    return df

In [31]:
encoded_df = add_features(encoded_df)
encoded_df.show()

+--------------------+--------------------+----------+----------+----------------+--------+----------+-----------+-----------+-------+-----------+-----------+-------------+----------+--------+-----------------+----------+--------------------+------------------+-------------------+-----------------------+------------------+--------------------+----------------------------+-----------------------------------+--------------------------------+-----+---------------+----------+---------------+-------+-----+--------------------+-----------------------------------+-------------------------------+-----------------------------------------+-------------------------------------+
|         customer_id|           emp_title|emp_length|annual_inc|annual_inc_joint|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|funded_amount|      term|int_rate|       issue_date|pymnt_plan|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|

In [32]:
encoded_df.select(col("customer_id"),col("issue_date"),"grade", col("previous_loan_issue_date_same_grade")).orderBy(col("previous_loan_issue_date_same_grade").asc()).show()

+--------------------+-----------------+-----+-----------------------------------+
|         customer_id|       issue_date|grade|previous_loan_issue_date_same_grade|
+--------------------+-----------------+-----+-----------------------------------+
|YidceGMwTFx4MDBce...|   12 August 2012|    A|                               NULL|
|YiJceGE3QFx4ZGFWL...|12 September 2012|    B|                               NULL|
|YicrXHg4MVx4YjRce...|   12 August 2012|    C|                               NULL|
|YidaXHhhZlx4ZjRce...|   12 August 2012|    D|                               NULL|
|YidceGFlKlx4YmNcb...|12 September 2012|    G|                               NULL|
|YidceDFiXVx4YzJlI...|   12 August 2012|    E|                               NULL|
|YicrXHhiNCkvXHhhZ...|  12 October 2012|    F|                               NULL|
|YidtXHgxOEZceGQ0X...|12 September 2012|    A|                         2012-08-12|
|YidceGE2Jlx4ZGJ8X...|   12 August 2012|    D|                         2012-08-12|
|Yid

# Analysis
## Answer each of the following questions using both SQL and Spark:
1. Identify the average loan amount and interest rate for loans marked as
"Default" in the Loan Status, grouped by Emp Length and annual income ranges.
Hint: Use SQL Cases to bin Annual Income into Income Ranges
2. Calculate the average difference between Loan Amount and Funded Amount for each
loan Grade and sort by the grades with the largest differences.
3. Compare the total Loan Amount for loans with "Verified" and "Not Verified"
Verification Status across each state (Addr State).
4. Calculate the average time gap (in days) between consecutive loans for each
grade using the new features you added in the feature engineering phase
5. Identify the average difference in loan amounts between consecutive loans
within the same state and grade combination.

### 1. Identify the average loan amount and interest rate for loans marked as "Default" in the Loan Status, grouped by Emp Length and annual income ranges.

#### Spark

In [33]:
# Get quantiles to be used to split up annual income into ranges
# Ranges are very low, low, medium, high, very high
annual_income_quantiles = encoded_df.approxQuantile("annual_inc", [0.2, 0.4, 0.6, 0.8], 0.01)
annual_income_quantiles

[42437.121, 60000.0, 75000.0, 100000.0]

In [34]:
df_with_income_ranges = encoded_df.withColumn("annual_income_range",\
                                              when(col("annual_inc") <= annual_income_quantiles[0], "very low")\
                                              .when((col("annual_inc") > annual_income_quantiles[0]) & (col("annual_inc") <= annual_income_quantiles[1]), "low")\
                                              .when((col("annual_inc") > annual_income_quantiles[1]) & (col("annual_inc") <= annual_income_quantiles[2]), "medium")\
                                              .when((col("annual_inc") > annual_income_quantiles[2]) & (col("annual_inc") <= annual_income_quantiles[3]), "high")\
                                              .otherwise("very high"))
df_with_income_ranges.select(col("customer_id"),col("annual_inc"),col("annual_income_range")).show()

+--------------------+----------+-------------------+
|         customer_id|annual_inc|annual_income_range|
+--------------------+----------+-------------------+
|YidceGYwXHhmMVpce...|   83000.0|               high|
|YiduXHhlNXRceDFhX...|   46000.0|                low|
|YidceGRlRFx4ZjZce...|   54000.0|                low|
|Yic1XHg5NVx4ZjJce...|   98000.0|               high|
|YidbWl86XHhiMlx4O...|   37000.0|           very low|
|YidOXHgxM1x4ZjMxN...|   26000.0|           very low|
|Yid7XHhjNFx4OGZce...|   27500.0|           very low|
|YidceDllXHgwMlx4Z...|   84000.0|               high|
|YidceDhmXHhmM1x4Z...|  120000.0|          very high|
|YidAXHhiNlx4YjNce...|   50000.0|                low|
|YidceGQ3XHJpdFx4M...|   59885.0|                low|
|YidceDAxS1x4OWFpL...|  130000.0|          very high|
|YidvUVtceDdmJFx4O...|   66000.0|             medium|
|YidhQVx4YjFTXHgwZ...|   41600.0|           very low|
|YidceDg4XHhhOVtce...|   30000.0|           very low|
|YidceGRkXHhjMFx4Z...|   500

In [35]:
from pyspark.sql.functions import avg
grouped_df = df_with_income_ranges.groupBy("emp_length", "annual_income_range", "loan_status").agg(avg("loan_amount").alias("avg_loan_amount"), avg("int_rate").alias("avg_int_rate"))

grouped_df.sort(col("loan_status").asc()).show()

+----------+-------------------+-----------+------------------+-------------------+
|emp_length|annual_income_range|loan_status|   avg_loan_amount|       avg_int_rate|
+----------+-------------------+-----------+------------------+-------------------+
|         3|           very low|Charged Off|10660.714285714286|0.14632571428571428|
|         7|                low|Charged Off|           13833.0|0.15851200000000001|
|         5|               high|Charged Off|19465.277777777777|0.13915555555555556|
|         0|             medium|Charged Off|           17669.0|0.15620799999999999|
|         8|               high|Charged Off| 18956.81818181818|0.17394545454545454|
|         6|           very low|Charged Off|           9268.75|0.15018749999999997|
|        10|             medium|Charged Off|16142.540322580646|0.14382096774193548|
|         2|          very high|Charged Off|21372.916666666668|0.11386666666666667|
|         3|                low|Charged Off| 14166.02564102564|0.15634102564

In [36]:
# get only the default loan status rows
default_df = grouped_df.filter(col("loan_status") == "Default")
default_df.show()

+----------+-------------------+-----------+---------------+------------+
|emp_length|annual_income_range|loan_status|avg_loan_amount|avg_int_rate|
+----------+-------------------+-----------+---------------+------------+
|         1|          very high|    Default|        35000.0|      0.2589|
|        10|           very low|    Default|        13925.0|      0.2215|
+----------+-------------------+-----------+---------------+------------+



#### SQL

In [37]:
encoded_df.createOrReplaceTempView("default_loans")
# Will use ranges previously obtained by quantiles from spark [42437.121, 60000.0, 75000.0, 100000.0]
query = """
SELECT
    emp_length,
    CASE
        WHEN annual_inc <= 42437.121 THEN 'very low'
        WHEN annual_inc > 42437.121 AND annual_inc <= 60000.0 THEN 'low'
        WHEN annual_inc > 60000.0 AND annual_inc <= 75000.0 THEN 'medium'
        WHEN annual_inc > 75000.0 AND annual_inc <= 100000.0 THEN 'high'
        ELSE 'very high'
    END AS annual_income_range,
    loan_status,
    AVG(loan_amount) AS avg_loan_amount,
    AVG(int_rate) AS avg_int_rate
FROM default_loans
GROUP BY emp_length, annual_income_range, loan_status
HAVING loan_status = "Default"
"""
result = spark.sql(query)
result.show()


+----------+-------------------+-----------+---------------+------------+
|emp_length|annual_income_range|loan_status|avg_loan_amount|avg_int_rate|
+----------+-------------------+-----------+---------------+------------+
|         1|          very high|    Default|        35000.0|      0.2589|
|        10|           very low|    Default|        13925.0|      0.2215|
+----------+-------------------+-----------+---------------+------------+



### 2. Calculate the average difference between Loan Amount and Funded Amount for each loan Grade and sort by the grades with the largest differences.

#### Spark

In [38]:
grade_grouped_df = encoded_df.groupBy("grade").agg(avg(col("loan_amount") - col("funded_amount")).alias("avg_diff_between_loan_and_funded_amounts"))
grade_grouped_df.sort(col("avg_diff_between_loan_and_funded_amounts").desc()).show()


+-----+----------------------------------------+
|grade|avg_diff_between_loan_and_funded_amounts|
+-----+----------------------------------------+
|    F|                                     0.0|
|    E|                                     0.0|
|    B|                                     0.0|
|    D|                                     0.0|
|    C|                                     0.0|
|    A|                                     0.0|
|    G|                                     0.0|
+-----+----------------------------------------+



>did this to double check that there is truly 0 rows with different loan and funded amounts

In [39]:
encoded_df.select(col("customer_id"),col("loan_amount"),col("funded_amount")).filter(col("loan_amount") != col("funded_amount")).show()

+-----------+-----------+-------------+
|customer_id|loan_amount|funded_amount|
+-----------+-----------+-------------+
+-----------+-----------+-------------+



#### SQL

In [40]:
encoded_df.createOrReplaceTempView("loan_funded_diff")
query = """
SELECT
    grade,
    AVG(loan_amount - funded_amount) AS avg_diff_between_loan_and_funded_amounts
FROM loan_funded_diff
GROUP BY grade
ORDER BY avg_diff_between_loan_and_funded_amounts DESC
"""
result = spark.sql(query)
result.show()

+-----+----------------------------------------+
|grade|avg_diff_between_loan_and_funded_amounts|
+-----+----------------------------------------+
|    F|                                     0.0|
|    E|                                     0.0|
|    B|                                     0.0|
|    D|                                     0.0|
|    C|                                     0.0|
|    A|                                     0.0|
|    G|                                     0.0|
+-----+----------------------------------------+



### 3. Compare the total Loan Amount for loans with "Verified" and "Not Verified" Verification Status across each state (Addr State).

#### Spark

In [41]:
from pyspark.sql.functions import sum, col, when

state_grouped_df = encoded_df.groupBy("addr_state", "verification_status_Verified", "verification_status_Not Verified").\
agg(sum("loan_amount").alias("total_loan_amount")).\
withColumn("verification_status", when(col("verification_status_Verified") == 1, "Verified").when(col("verification_status_Not Verified") == 1, "Not Verified").otherwise("other"))
state_grouped_df.filter(col("verification_status") != "other").orderBy(col("addr_state").asc(), col("verification_status").asc()).select(col("addr_state"), col("total_loan_amount"), "verification_status").show()

+----------+-----------------+-------------------+
|addr_state|total_loan_amount|verification_status|
+----------+-----------------+-------------------+
|        AK|         252200.0|       Not Verified|
|        AK|         243600.0|           Verified|
|        AL|        1420850.0|       Not Verified|
|        AL|        1350675.0|           Verified|
|        AR|         677275.0|       Not Verified|
|        AR|         656625.0|           Verified|
|        AZ|        3542075.0|       Not Verified|
|        AZ|        2911425.0|           Verified|
|        CA|       1.969535E7|       Not Verified|
|        CA|      1.4923975E7|           Verified|
|        CO|        2675500.0|       Not Verified|
|        CO|        2488950.0|           Verified|
|        CT|        2107950.0|       Not Verified|
|        CT|        1820925.0|           Verified|
|        DC|         164125.0|       Not Verified|
|        DC|         308575.0|           Verified|
|        DE|         474300.0| 

#### SQL

In [42]:
encoded_df.createOrReplaceTempView("total_loan_amounts")
query = """
SELECT
    addr_state,
    SUM(loan_amount) AS total_loan_amount,
    CASE
      WHEN verification_status_Verified = 1 THEN 'Verified'
      WHEN `verification_status_Not Verified` = 1 THEN 'Not Verified'
      ELSE 'other'
    END AS verification_status
FROM total_loan_amounts
GROUP BY addr_state, verification_status
HAVING verification_status != 'other'
ORDER BY addr_state ASC, verification_status ASC
"""
result = spark.sql(query)
result.show()

+----------+-----------------+-------------------+
|addr_state|total_loan_amount|verification_status|
+----------+-----------------+-------------------+
|        AK|         252200.0|       Not Verified|
|        AK|         243600.0|           Verified|
|        AL|        1420850.0|       Not Verified|
|        AL|        1350675.0|           Verified|
|        AR|         677275.0|       Not Verified|
|        AR|         656625.0|           Verified|
|        AZ|        3542075.0|       Not Verified|
|        AZ|        2911425.0|           Verified|
|        CA|       1.969535E7|       Not Verified|
|        CA|      1.4923975E7|           Verified|
|        CO|        2675500.0|       Not Verified|
|        CO|        2488950.0|           Verified|
|        CT|        2107950.0|       Not Verified|
|        CT|        1820925.0|           Verified|
|        DC|         164125.0|       Not Verified|
|        DC|         308575.0|           Verified|
|        DE|         474300.0| 

### 4. Calculate the average time gap (in days) between consecutive loans for each grade using the new features you added in the feature engineering phase


#### Spark

In [43]:
encoded_df.select(col("customer_id"),col("issue_date"),col("issue_date_formatted"), col("previous_loan_issue_date_same_grade"), col("loan_amount"), col("previous_loan_amount_same_grade"), col("grade")).orderBy(col("grade").desc()\
                                                                                                                                                                                                        ,col("issue_date_formatted").asc()\
                                                                                                                                                                                                        ).show()

+--------------------+-----------------+--------------------+-----------------------------------+-----------+-------------------------------+-----+
|         customer_id|       issue_date|issue_date_formatted|previous_loan_issue_date_same_grade|loan_amount|previous_loan_amount_same_grade|grade|
+--------------------+-----------------+--------------------+-----------------------------------+-----------+-------------------------------+-----+
|YidceGFlKlx4YmNcb...|12 September 2012|          2012-09-12|                               NULL|    24000.0|                           NULL|    G|
|YiJceGM5XHhhZlx4O...| 12 November 2012|          2012-11-12|                         2012-09-12|    26200.0|                        24000.0|    G|
|YidceDA1XHg4MFx4Z...|    13 April 2013|          2013-04-13|                         2012-11-12|    24000.0|                        26200.0|    G|
|YidceGQwTFx4Zjdce...|      13 May 2013|          2013-05-13|                         2013-04-13|    30000.0|   

In [44]:
from pyspark.sql.functions import datediff
avg_time_gap_df = encoded_df.groupBy("grade").agg(avg(datediff(col("issue_date_formatted"), col("previous_loan_issue_date_same_grade"))).alias("average_time_gap_in_days"))
avg_time_gap_df.show()

+-----+------------------------+
|grade|average_time_gap_in_days|
+-----+------------------------+
|    A|      0.4627714581178904|
|    B|      0.3388229286352611|
|    C|     0.35440865892291445|
|    D|      0.6888147768086198|
|    E|      1.6959887403237157|
|    F|      5.8172323759791125|
|    G|                    20.8|
+-----+------------------------+



#### SQL

In [45]:
encoded_df.createOrReplaceTempView("average_time_gap")
query = """
SELECT
    grade,
    AVG(DATEDIFF(issue_date_formatted, previous_loan_issue_date_same_grade)) AS average_time_gap_in_days
FROM average_time_gap
GROUP BY grade
"""
result = spark.sql(query)
result.show()

+-----+------------------------+
|grade|average_time_gap_in_days|
+-----+------------------------+
|    A|      0.4627714581178904|
|    B|      0.3388229286352611|
|    C|     0.35440865892291445|
|    D|      0.6888147768086198|
|    E|      1.6959887403237157|
|    F|      5.8172323759791125|
|    G|                    20.8|
+-----+------------------------+



### 5. Identify the average difference in loan amounts between consecutive loans within the same state and grade combination.

#### Spark

In [46]:
encoded_df.columns

['customer_id',
 'emp_title',
 'emp_length',
 'annual_inc',
 'annual_inc_joint',
 'zip_code',
 'addr_state',
 'avg_cur_bal',
 'tot_cur_bal',
 'loan_id',
 'loan_status',
 'loan_amount',
 'funded_amount',
 'term',
 'int_rate',
 'issue_date',
 'pymnt_plan',
 'description',
 'home_ownership_OWN',
 'home_ownership_RENT',
 'home_ownership_MORTGAGE',
 'home_ownership_ANY',
 'home_ownership_OTHER',
 'verification_status_Verified',
 'verification_status_Source Verified',
 'verification_status_Not Verified',
 'state',
 'type_DIRECT_PAY',
 'type_JOINT',
 'type_INDIVIDUAL',
 'purpose',
 'grade',
 'issue_date_formatted',
 'previous_loan_issue_date_same_grade',
 'previous_loan_amount_same_grade',
 'previous_loan_issue_date_same_state_grade',
 'previous_loan_amount_same_state_grade']

In [47]:
# adding 0 in place of nulls as null means there's <= 1 row of that combination
# which means theres 0 difference between consecutive loans
# as there are no consecutive loans
avg_loan_amount_diff_df = encoded_df.groupBy("state", "grade").agg(
      avg(
          col("loan_amount") - col("previous_loan_amount_same_state_grade")
      ).alias("avg_diff_between_consecutive_loans")
    ).fillna(0)
avg_loan_amount_diff_df.show()

+-----+-----+----------------------------------+
|state|grade|avg_diff_between_consecutive_loans|
+-----+-----+----------------------------------+
| 36.0|    D|                -333.3333333333333|
| 23.0|    A|                 25.31645569620253|
| 34.0|    C|                 56.86274509803921|
| 46.0|    C|                 858.9285714285714|
| 17.0|    C|                 42.58474576271186|
| 42.0|    D|                           565.625|
|  0.0|    F|               -431.37254901960785|
|  7.0|    E|                222.22222222222223|
| 13.0|    E|                -511.1111111111111|
| 15.0|    C|               -25.157232704402517|
| 26.0|    B|                207.41758241758242|
| 44.0|    B|                 109.0909090909091|
| 29.0|    G|                               0.0|
| 35.0|    E|                           -1400.0|
|  0.0|    C|                3.0706243602865917|
| 24.0|    C|                 2.247191011235955|
| 49.0|    D|                            -100.0|
|  5.0|    F|       

> attempting to show state code too

In [48]:
avg_loan_amount_diff_df.join(lookup_table, (avg_loan_amount_diff_df.state == lookup_table.Encoded) & (lookup_table.Column == 'state'), "left").select(col("state"), col("Original").alias("state_name"), col("grade"), col("avg_diff_between_consecutive_loans")).show()

+-----+----------+-----+----------------------------------+
|state|state_name|grade|avg_diff_between_consecutive_loans|
+-----+----------+-----+----------------------------------+
| 36.0|        NE|    D|                -333.3333333333333|
| 23.0|        OR|    A|                 25.31645569620253|
| 34.0|        MS|    C|                 56.86274509803921|
| 46.0|        WY|    C|                 858.9285714285714|
| 17.0|        MN|    C|                 42.58474576271186|
| 42.0|        ME|    D|                           565.625|
|  0.0|        CA|    F|               -431.37254901960785|
|  7.0|        OH|    E|                222.22222222222223|
| 13.0|        MD|    E|                -511.1111111111111|
| 15.0|        WA|    C|               -25.157232704402517|
| 26.0|        SC|    B|                207.41758241758242|
| 44.0|        DC|    B|                 109.0909090909091|
| 29.0|        KY|    G|                               0.0|
| 35.0|        NH|    E|                

#### SQL

In [49]:
encoded_df.createOrReplaceTempView("avg_loan_amount_diff")
query = """
SELECT
    state,
    grade,
    COALESCE(AVG(loan_amount - previous_loan_amount_same_state_grade),0) AS avg_diff_between_consecutive_loans
FROM avg_loan_amount_diff
GROUP BY state, grade
"""
result = spark.sql(query)
result.show()

+-----+-----+----------------------------------+
|state|grade|avg_diff_between_consecutive_loans|
+-----+-----+----------------------------------+
| 36.0|    D|                -333.3333333333333|
| 23.0|    A|                 25.31645569620253|
| 34.0|    C|                 56.86274509803921|
| 46.0|    C|                 858.9285714285714|
| 17.0|    C|                 42.58474576271186|
| 42.0|    D|                           565.625|
|  0.0|    F|               -431.37254901960785|
|  7.0|    E|                222.22222222222223|
| 13.0|    E|                -511.1111111111111|
| 15.0|    C|               -25.157232704402517|
| 26.0|    B|                207.41758241758242|
| 44.0|    B|                 109.0909090909091|
| 29.0|    G|                               0.0|
| 35.0|    E|                           -1400.0|
|  0.0|    C|                3.0706243602865917|
| 24.0|    C|                 2.247191011235955|
| 49.0|    D|                            -100.0|
|  5.0|    F|       

> again attempting to show state code

In [50]:
lookup_table.createOrReplaceTempView("lookup_table")
query = """
SELECT
    state,
    FIRST(lookup_table.Original) AS state_name,
    grade,
    COALESCE(AVG(loan_amount - previous_loan_amount_same_state_grade),0) AS avg
FROM avg_loan_amount_diff
LEFT JOIN lookup_table ON avg_loan_amount_diff.state = lookup_table.Encoded
WHERE lookup_table.Column = 'state'
GROUP BY state, grade
"""
result = spark.sql(query)
result.show()

+-----+----------+-----+-------------------+
|state|state_name|grade|                avg|
+-----+----------+-----+-------------------+
|  0.0|        CA|    A|0.23201856148491878|
|  0.0|        CA|    B|  9.945750452079565|
|  0.0|        CA|    C| 3.0706243602865917|
|  0.0|        CA|    D|  4.081632653061225|
|  0.0|        CA|    E| -3.609625668449198|
|  0.0|        CA|    F|-431.37254901960785|
|  0.0|        CA|    G|  846.1538461538462|
|  1.0|        NY|    A| -53.99568034557235|
|  1.0|        NY|    B|  29.12878787878788|
|  1.0|        NY|    C| 25.713213213213212|
|  1.0|        NY|    D|  74.02597402597402|
|  1.0|        NY|    E|-191.66666666666666|
|  1.0|        NY|    F|  689.6551724137931|
|  1.0|        NY|    G| -1111.111111111111|
|  2.0|        TX|    A|  75.81967213114754|
|  2.0|        TX|    B|  4.739336492890995|
|  2.0|        TX|    C|-3.4686971235194584|
|  2.0|        TX|    D|-14.802631578947368|
|  2.0|        TX|    E| -53.94736842105263|
|  2.0|   

# Lookup Table

In [51]:
lookup_table.show(lookup_table.count(), truncate=False)

+----------+------------------+-------+
|Column    |Original          |Encoded|
+----------+------------------+-------+
|emp_length|5 years           |5      |
|emp_length|9 years           |9      |
|emp_length|1 year            |1      |
|emp_length|2 years           |2      |
|emp_length|7 years           |7      |
|emp_length|8 years           |8      |
|emp_length|4 years           |4      |
|emp_length|6 years           |6      |
|emp_length|3 years           |3      |
|emp_length|10+ years         |10     |
|emp_length|< 1 year          |0      |
|state     |CA                |0      |
|state     |NY                |1      |
|state     |TX                |2      |
|state     |FL                |3      |
|state     |IL                |4      |
|state     |NJ                |5      |
|state     |PA                |6      |
|state     |OH                |7      |
|state     |GA                |8      |
|state     |NC                |9      |
|state     |AZ                |10     |


# Saving the dataset

In [52]:
encoded_df.columns

['customer_id',
 'emp_title',
 'emp_length',
 'annual_inc',
 'annual_inc_joint',
 'zip_code',
 'addr_state',
 'avg_cur_bal',
 'tot_cur_bal',
 'loan_id',
 'loan_status',
 'loan_amount',
 'funded_amount',
 'term',
 'int_rate',
 'issue_date',
 'pymnt_plan',
 'description',
 'home_ownership_OWN',
 'home_ownership_RENT',
 'home_ownership_MORTGAGE',
 'home_ownership_ANY',
 'home_ownership_OTHER',
 'verification_status_Verified',
 'verification_status_Source Verified',
 'verification_status_Not Verified',
 'state',
 'type_DIRECT_PAY',
 'type_JOINT',
 'type_INDIVIDUAL',
 'purpose',
 'grade',
 'issue_date_formatted',
 'previous_loan_issue_date_same_grade',
 'previous_loan_amount_same_grade',
 'previous_loan_issue_date_same_state_grade',
 'previous_loan_amount_same_state_grade']

#### checking num of partitions

In [53]:
print(f"Number of partitions in lookup table: {lookup_table.rdd.getNumPartitions()}")
print(f"Number of partitions in encoded DataFrame: {encoded_df.rdd.getNumPartitions()}")

Number of partitions in lookup table: 218
Number of partitions in encoded DataFrame: 3


#### coalesce into less partitions

In [54]:
 # Coalesce the DataFrames into a smaller number of partitions
num_partitions = 1  # Adjust as needed

lookup_table = lookup_table.coalesce(num_partitions)
encoded_df = encoded_df.coalesce(num_partitions)

# Verify the number of partitions
print(f"Number of partitions in lookup table after coalesce: {lookup_table.rdd.getNumPartitions()}")
print(f"Number of partitions in encoded DataFrame after coalesce: {encoded_df.rdd.getNumPartitions()}")

Number of partitions in lookup table after coalesce: 1
Number of partitions in encoded DataFrame after coalesce: 1


In [57]:
encoded_df.write.parquet("/content/fintech_spark_52_16824_clean.parquet", mode="overwrite")
lookup_table.write.parquet("/content/lookup_spark_52_16824.parquet", mode="overwrite")

# Save into postgres (Bonus)

In [58]:
import os
from sqlalchemy import create_engine

url = "postgresql://postgres.bscfluoimobhvwyqmwbi:UH8QrvWneP2dpHfI@aws-0-eu-central-1.pooler.supabase.com:6543/postgres"
engine = create_engine(url)

In [59]:
encoded_df.toPandas().to_sql('fintech_spark_52_16824_clean', engine, if_exists='replace', index=False)
lookup_table.toPandas().to_sql('lookup_spark_52_16824', engine, if_exists='replace', index=False)

109