## 1  Loading the dataset

In [38]:
# Initialize the SparkSession and load the raw dataset.
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row

# The PostgreSQL JDBC jar path can be overridden through the environment instead of
# editing the notebook.
POSTGRES_JDBC_JAR = os.environ.get("POSTGRES_JDBC_JAR", "/root/.docker/run/postgresql-42.7.4.jar")

# "local[*]" uses one worker thread per logical core; plain "local" runs a single
# thread, which would make repartitioning by core count below pointless.
spark = (
    SparkSession.builder.config("spark.jars", POSTGRES_JDBC_JAR)
    .master("local[*]")
    .appName("MS3")
    .getOrCreate()
)

# Load the dataset
dataset_path = "fintech_data_30_52_21362.parquet"  # Replace with the actual path
df = spark.read.parquet(dataset_path)

# Preview first 20 rows
print("Preview of first 20 rows:")
df.show(20)

# Check the number of partitions
initial_partitions = df.rdd.getNumPartitions()
print(f"Initial number of partitions: {initial_partitions}")

# Get the number of logical cores; os.cpu_count() may return None, so fall back to 1.
logical_cores = os.cpu_count() or 1
print(f"Number of logical cores: {logical_cores}")

# Repartition the DataFrame and continue with the repartitioned frame, so the
# repartitioning actually applies to every later step.
df_repartitioned = df.repartition(logical_cores)
df = df_repartitioned

# Verify the new number of partitions
new_partitions = df_repartitioned.rdd.getNumPartitions()
print(f"New number of partitions: {new_partitions}")


Preview of first 20 rows:
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+---------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|    Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+---------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|Y

## 2  Cleaning

### a. Rename columns

In [39]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType, NumericType

def rename_columns(df: DataFrame) -> DataFrame:
    """Normalize every column name: spaces become underscores and letters are lower-cased.

    For example, "Customer Id" becomes "customer_id".

    Args:
        df: The DataFrame whose columns should be renamed.

    Returns:
        A DataFrame with the same data and normalized column names.
    """
    renamed = df
    original_names = list(df.columns)
    for old_name in original_names:
        normalized = old_name.replace(" ", "_").lower()
        renamed = renamed.withColumnRenamed(old_name, normalized)
    return renamed

df = rename_columns(df)



### b. Calculate missing values and handle them

In [40]:

# Function to calculate percentage of missing values
def calculate_missing_percentage(df: DataFrame) -> dict:
    """Return the percentage of null values in every column of ``df``.

    The row count and all per-column null counts are computed in a single aggregation.
    The data is therefore scanned once, instead of once per column plus once for the count.

    Args:
        df: The DataFrame to inspect.

    Returns:
        A dict mapping each column name (in ``df.columns`` order) to the percentage
        (0-100, float) of rows where that column is null. For an empty DataFrame every
        percentage is 0.0 rather than raising ZeroDivisionError.
    """
    columns = df.columns
    if not columns:
        return {}
    # Position 0 holds the row count; position i + 1 holds the null count of columns[i].
    aggregations = [F.count(F.lit(1))] + [
        F.sum(F.col(c).isNull().cast("int")) for c in columns
    ]
    counts = df.agg(*aggregations).collect()[0]
    total_rows = counts[0]
    if total_rows == 0:
        return {c: 0.0 for c in columns}
    return {c: (counts[i + 1] / total_rows) * 100 for i, c in enumerate(columns)}

# Get missing info and print it
missing_info = calculate_missing_percentage(df)
print("Missing values info:")
# The loop variable is `col_name`, not `col`: a module-level `col` string would shadow
# `pyspark.sql.functions.col`, which later cells call as `col(...)`.
for col_name, perc in missing_info.items():
    print(f"{col_name}: {perc:.2f}%")



Missing values info:
customer_id: 0.00%
emp_title: 9.10%
emp_length: 7.14%
home_ownership: 0.00%
annual_inc: 0.00%
annual_inc_joint: 93.05%
verification_status: 0.00%
zip_code: 0.00%
addr_state: 0.00%
avg_cur_bal: 0.00%
tot_cur_bal: 0.00%
loan_id: 0.00%
loan_status: 0.00%
loan_amount: 0.00%
state: 0.00%
funded_amount: 0.00%
term: 0.00%
int_rate: 4.62%
grade: 0.00%
issue_date: 0.00%
pymnt_plan: 0.00%
type: 0.00%
purpose: 0.00%
description: 0.88%


In [41]:
# Handle missing values
def handle_missing_values(df):
    """Fill nulls: string columns get their most frequent non-null value, numeric columns get 0.

    Column kinds come from the schema (``StringType`` / any ``NumericType``). This means
    bigint, smallint, decimal, etc. columns are filled too, not only int/double/float.
    Columns of other types are left unchanged. A string column with no non-null value is
    also left unchanged, because it has no mode to fill with.

    Args:
        df: DataFrame to clean.

    Returns:
        A new DataFrame with the nulls filled.
    """
    fill_values = {}
    for field in df.schema.fields:
        name = field.name
        if isinstance(field.dataType, StringType):
            # Nulls are excluded before counting; otherwise null itself can be the most
            # frequent group. Ties are broken by the value so the chosen mode is deterministic.
            mode_rows = (
                df.filter(F.col(name).isNotNull())
                .groupBy(name)
                .count()
                .orderBy(F.col("count").desc(), F.col(name).asc())
                .limit(1)
                .collect()
            )
            if mode_rows:
                fill_values[name] = mode_rows[0][name]
        elif isinstance(field.dataType, NumericType):
            # Replace missing numerical values with 0
            fill_values[name] = 0
    # A single fillna call covers every column.
    return df.fillna(fill_values) if fill_values else df

# Apply the function to handle missing values
df = handle_missing_values(df)

# Verify no missing values remain
missing_info_after = calculate_missing_percentage(df)
print("Missing values after handling:")
# `col_name` (not `col`) avoids shadowing `pyspark.sql.functions.col` at module level.
for col_name, perc in missing_info_after.items():
    print(f"{col_name}: {perc:.2f}%")

assert all(perc == 0 for perc in missing_info_after.values()), "Some columns still have missing values!"
print("No missing values remain in the dataset.")


Missing values after handling:
customer_id: 0.00%
emp_title: 0.00%
emp_length: 0.00%
home_ownership: 0.00%
annual_inc: 0.00%
annual_inc_joint: 0.00%
verification_status: 0.00%
zip_code: 0.00%
addr_state: 0.00%
avg_cur_bal: 0.00%
tot_cur_bal: 0.00%
loan_id: 0.00%
loan_status: 0.00%
loan_amount: 0.00%
state: 0.00%
funded_amount: 0.00%
term: 0.00%
int_rate: 0.00%
grade: 0.00%
issue_date: 0.00%
pymnt_plan: 0.00%
type: 0.00%
purpose: 0.00%
description: 0.00%
No missing values remain in the dataset.


## 3 Encoding

In [42]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# 1. Convert Emp Length to numerical
def convert_emp_length_to_numeric(df):
    """Replace the textual ``emp_length`` column with a number of years.

    "< 1 year" maps to 0, "N year"/"N years" maps to N, and "10+ years" maps to 10.
    Any value not in the mapping (including null) becomes null.

    Args:
        df: DataFrame with an ``emp_length`` column (presumably string-typed; the
            values are compared against string labels).

    Returns:
        ``df`` with ``emp_length`` replaced by the mapped integer literal values.
    """
    emp_length_map = {
        "< 1 year": 0,
        "1 year": 1,
        "2 years": 2,
        "3 years": 3,
        "4 years": 4,
        "5 years": 5,
        "6 years": 6,
        "7 years": 7,
        "8 years": 8,
        "9 years": 9,
        "10+ years": 10
    }

    # Build one CASE WHEN chain: the first label starts it, each later label extends it.
    years_expr = None
    for label, years in emp_length_map.items():
        matches = F.col("emp_length") == label
        if years_expr is None:
            years_expr = F.when(matches, years)
        else:
            years_expr = years_expr.when(matches, years)

    return df.withColumn("emp_length", years_expr.otherwise(None))


In [43]:
# 2. One-Hot Encoding for Home Ownership, Verification Status, Type
from pyspark.sql.functions import when, col, lower

def one_hot_encode_columns(df, cols):
    """Add a 0/1 indicator column for every distinct value of each column in ``cols``.

    The indicator for value ``v`` of column ``c`` is named ``f"{c}_{v}"`` with spaces
    replaced by underscores and lower-cased (e.g. ``home_ownership_rent``). Matching is
    case-insensitive. The original columns are kept.

    Args:
        df: Input DataFrame; the columns in ``cols`` are expected to hold strings.
        cols: Names of the columns to encode.

    Returns:
        ``df`` with the indicator columns appended.
    """
    for col_name in cols:
        # Sorting gives a stable column order, because the row order of
        # distinct().collect() is not guaranteed. Nulls get no indicator column.
        unique_values = sorted(
            row[col_name]
            for row in df.select(col_name).distinct().collect()
            if row[col_name] is not None
        )
        for value in unique_values:
            new_col_name = f"{col_name}_{value}".replace(" ", "_").lower()  # e.g., home_ownership_rent
            # F.* is used rather than the bare `when`/`col`/`lower` globals, which a
            # module-level loop variable named `col` can shadow when cells are re-run.
            df = df.withColumn(
                new_col_name,
                F.when(F.lower(F.col(col_name)) == value.lower(), 1).otherwise(0),
            )
    return df

In [44]:
# 3. Label Encoding for State, Purpose
def label_encode_columns(df, cols):
    """Label-encode each column in ``cols`` with a StringIndexer fitted on ``df``.

    For every input column ``c``, a new numeric column ``c_label`` is added and the
    original column is kept. Under StringIndexer's default ordering, index 0 goes to
    the most frequent value.

    ``handleInvalid="keep"`` puts null/unseen values into an extra index (the number
    of distinct labels). With ``"skip"``, those rows would be silently removed from
    the whole DataFrame.

    Args:
        df: Input DataFrame.
        cols: Names of the string columns to encode.

    Returns:
        ``df`` with one ``*_label`` column per entry in ``cols``.
    """
    for col_name in cols:
        indexer = StringIndexer(
            inputCol=col_name, outputCol=f"{col_name}_label", handleInvalid="keep"
        )
        df = indexer.fit(df).transform(df)
    return df

In [45]:
def discretize_grade(df):
    """Add ``grade_discretized``: a letter A-G derived from the numeric ``grade``.

    Each letter covers an inclusive band of five grades: A = 1-5, B = 6-10, ...,
    G = 31-35. Any value not inside one of these bands (or null) becomes "Unknown".

    Args:
        df: DataFrame with a numeric ``grade`` column.

    Returns:
        ``df`` with the extra ``grade_discretized`` string column.
    """
    grade = F.col("grade")
    bands = [
        ("A", 1, 5),
        ("B", 6, 10),
        ("C", 11, 15),
        ("D", 16, 20),
        ("E", 21, 25),
        ("F", 26, 30),
        ("G", 31, 35),
    ]

    letter_expr = None
    for letter, low, high in bands:
        in_band = (grade >= low) & (grade <= high)
        letter_expr = F.when(in_band, letter) if letter_expr is None else letter_expr.when(in_band, letter)

    # For any values outside the expected range
    return df.withColumn("grade_discretized", letter_expr.otherwise("Unknown"))

In [46]:
# Apply the encodings in order: emp_length -> years, one-hot, label encoding, grade letters.
df = discretize_grade(
    label_encode_columns(
        one_hot_encode_columns(
            convert_emp_length_to_numeric(df),
            ["home_ownership", "verification_status", "type"],
        ),
        ["state", "purpose"],
    )
)
df.show()


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+---------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-----------+-------------+-----------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|    loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_own|home_ownership_rent|home_ownership_mortgage|home_own

## 4 Feature Engineering

In [47]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def add_previous_features(df, tiebreak_col="loan_id"):
    """Add "previous loan" lag features within grade and within state+grade groups.

    Adds four columns holding the ``issue_date`` and ``loan_amount`` of the preceding
    loan. The first pair uses loans in the same ``grade_discretized`` group; the second
    pair uses loans in the same (``state``, ``grade_discretized``) group. Loans are
    ordered by ``issue_date``. The first loan of each group gets null.

    Args:
        df: DataFrame with ``issue_date``, ``loan_amount``, ``state`` and
            ``grade_discretized`` columns.
        tiebreak_col: Secondary sort column for loans that share an ``issue_date``.
            Without it, Spark picks the "previous" loan among ties arbitrarily, and the
            choice can change between runs. Ignored if falsy or not a column of ``df``.

    Returns:
        ``df`` with ``prev_loan_issue_date_grade``, ``prev_loan_amount_grade``,
        ``prev_loan_issue_date_state_grade`` and ``prev_loan_amount_state_grade``.
    """
    order_cols = ["issue_date"]
    if tiebreak_col and tiebreak_col in df.columns:
        order_cols.append(tiebreak_col)

    # Previous loan within the same grade
    window_grade = Window.partitionBy("grade_discretized").orderBy(*order_cols)
    # Previous loan within the same state and grade combination
    window_state_grade = Window.partitionBy("state", "grade_discretized").orderBy(*order_cols)

    return (
        df.withColumn("prev_loan_issue_date_grade", F.lag("issue_date").over(window_grade))
        .withColumn("prev_loan_amount_grade", F.lag("loan_amount").over(window_grade))
        .withColumn("prev_loan_issue_date_state_grade", F.lag("issue_date").over(window_state_grade))
        .withColumn("prev_loan_amount_state_grade", F.lag("loan_amount").over(window_state_grade))
    )
# Parse issue_date strings in the "dd MMMM yyyy" format into dates, so the windows order
# chronologically and DATEDIFF works. Values that fail to parse may silently become null,
# which would corrupt the lag features, so check explicitly.
df = df.withColumn("issue_date", F.to_date("issue_date", "dd MMMM yyyy"))
unparsed_issue_dates = df.filter(F.col("issue_date").isNull()).count()
assert unparsed_issue_dates == 0, f"{unparsed_issue_dates} issue_date values could not be parsed"
df = add_previous_features(df)
df.show()


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-----------+-------------+-----------------+--------------------------+----------------------+--------------------------------+----------------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|      type|           purpose|  

## 5 Analysis: SQL vs. Spark DataFrame API

### a. Identify the average loan amount and interest rate for loans marked as "Default" in Loan Status, grouped by Emp Length and annual income ranges (quartiles)

In [48]:
# Register the current `df` as the temp view `loans`, which the SQL queries in this section read.
# NOTE(review): the view captures `df` as of this cell; later reassignments of `df` are not reflected in it.
df.createOrReplaceTempView("loans")
# Average loan amount and interest rate of "Default" loans per (emp_length, income_range).
# income_range is the quartile (NTILE(4)) of annual_inc among the Default loans only, not
# the whole dataset. The window has no PARTITION BY, so Spark moves all rows to a single
# partition (see the WindowExec warnings in the output).
query="WITH DefaultLoans AS (\
    SELECT * FROM loans WHERE loan_status = 'Default'),\
IncomeBins AS ( SELECT *, NTILE(4) OVER (ORDER BY annual_inc) AS income_range FROM DefaultLoans ) SELECT emp_length,  income_range,\
    AVG(loan_amount) AS avg_loan_amount,\
    AVG(int_rate) AS avg_interest_rate \
FROM IncomeBins \
GROUP BY emp_length, income_range \
ORDER BY emp_length, income_range";
query1_df = spark.sql(query)
query1_df.show()

24/11/28 15:40:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:40:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:40:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:40:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----------+------------+---------------+-----------------+
|emp_length|income_range|avg_loan_amount|avg_interest_rate|
+----------+------------+---------------+-----------------+
|         2|           2|         4000.0|           0.1899|
|         3|           3|        35000.0|           0.1344|
|        10|           1|         1000.0|           0.1614|
+----------+------------+---------------+-----------------+



In [50]:

# DataFrame-API version of query a: Default loans only, bucketed into annual_inc quartiles.
default_loans = df.where(F.col("loan_status") == "Default")

# No partitionBy: NTILE runs over all default loans in a single partition.
windowSpec = Window.orderBy("annual_inc")

income_binned = default_loans.withColumn("income_range", F.ntile(4).over(windowSpec))

result = (
    income_binned.groupBy("emp_length", "income_range")
    .agg(
        F.avg("loan_amount").alias("avg_loan_amount"),
        F.avg("int_rate").alias("avg_interest_rate"),
    )
    .orderBy("emp_length", "income_range")
)

result.show()


+----------+------------+---------------+-----------------+
|emp_length|income_range|avg_loan_amount|avg_interest_rate|
+----------+------------+---------------+-----------------+
|         2|           2|         4000.0|           0.1899|
|         3|           3|        35000.0|           0.1344|
|        10|           1|         1000.0|           0.1614|
+----------+------------+---------------+-----------------+



24/11/28 15:40:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:40:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:40:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/11/28 15:40:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


### b. Calculate the average difference between Loan Amount and Funded Amount for each loan Grade and sort by the grades with the largest differences.

In [51]:
# Average (loan_amount - funded_amount) per grade letter, largest average first.
# NOTE(review): when all averages tie (every value is 0.0 in the output below), the row order is arbitrary.
query="WITH LoanDifferences AS ( SELECT grade_discretized, loan_amount - funded_amount AS diff FROM loans )\
SELECT grade_discretized, AVG(diff) AS avg_difference FROM LoanDifferences GROUP BY grade_discretized ORDER BY avg_difference DESC";
query2_df = spark.sql(query)
query2_df.show()

+-----------------+--------------+
|grade_discretized|avg_difference|
+-----------------+--------------+
|                F|           0.0|
|                E|           0.0|
|                B|           0.0|
|                D|           0.0|
|                C|           0.0|
|                A|           0.0|
|                G|           0.0|
+-----------------+--------------+



In [52]:

# DataFrame-API version of query b: mean (loan_amount - funded_amount) per grade letter, largest first.
result2_df = (
    df.withColumn("diff", F.col("loan_amount") - F.col("funded_amount"))
    .groupBy("grade_discretized")
    .agg(F.avg("diff").alias("avg_difference"))
    .orderBy(F.col("avg_difference").desc())
)

# Show the result
result2_df.show()


+-----------------+--------------+
|grade_discretized|avg_difference|
+-----------------+--------------+
|                F|           0.0|
|                E|           0.0|
|                B|           0.0|
|                D|           0.0|
|                C|           0.0|
|                A|           0.0|
|                G|           0.0|
+-----------------+--------------+



### c. Compare the total Loan Amount for loans with "Verified" and "Not Verified" Verification Status across each state (Addr State).

In [53]:
# Total loan_amount per (addr_state, verification_status).
# NOTE(review): no status filter is applied, so "Source Verified" rows appear alongside
# "Verified" and "Not Verified" (visible in the output below).
query = "SELECT addr_state, verification_status, SUM(loan_amount) AS total_loan_amount FROM loans GROUP BY addr_state, verification_status \
ORDER BY addr_state, verification_status"

# Execute the query
query3_df = spark.sql(query)

# Show the results
query3_df.show()

+----------+-------------------+-----------------+
|addr_state|verification_status|total_loan_amount|
+----------+-------------------+-----------------+
|        AK|       Not Verified|         363625.0|
|        AK|    Source Verified|         582150.0|
|        AK|           Verified|         366475.0|
|        AL|       Not Verified|        1737775.0|
|        AL|    Source Verified|        1655725.0|
|        AL|           Verified|        1202350.0|
|        AR|       Not Verified|         909575.0|
|        AR|    Source Verified|        1411425.0|
|        AR|           Verified|         774250.0|
|        AZ|       Not Verified|        3151075.0|
|        AZ|    Source Verified|        3748200.0|
|        AZ|           Verified|        2667200.0|
|        CA|       Not Verified|        1.78475E7|
|        CA|    Source Verified|       2.352335E7|
|        CA|           Verified|        1.71016E7|
|        CO|       Not Verified|        3022825.0|
|        CO|    Source Verified

In [54]:
# DataFrame-API version of query c: total loan amount per state and verification status.
result3_df = (
    df.groupBy("addr_state", "verification_status")
    .agg(F.sum("loan_amount").alias("total_loan_amount"))
    .orderBy("addr_state", "verification_status")
)

# Show the results
result3_df.show()

+----------+-------------------+-----------------+
|addr_state|verification_status|total_loan_amount|
+----------+-------------------+-----------------+
|        AK|       Not Verified|         363625.0|
|        AK|    Source Verified|         582150.0|
|        AK|           Verified|         366475.0|
|        AL|       Not Verified|        1737775.0|
|        AL|    Source Verified|        1655725.0|
|        AL|           Verified|        1202350.0|
|        AR|       Not Verified|         909575.0|
|        AR|    Source Verified|        1411425.0|
|        AR|           Verified|         774250.0|
|        AZ|       Not Verified|        3151075.0|
|        AZ|    Source Verified|        3748200.0|
|        AZ|           Verified|        2667200.0|
|        CA|       Not Verified|        1.78475E7|
|        CA|    Source Verified|       2.352335E7|
|        CA|           Verified|        1.71016E7|
|        CO|       Not Verified|        3022825.0|
|        CO|    Source Verified

### d. Calculate the average time gap (in days) between consecutive loans for each grade using the new features you added in the feature engineering phase


In [55]:
# Mean gap in days between each loan and the previous loan of the same grade letter, using the
# prev_loan_issue_date_grade feature. Loans without a previous loan in their grade (null) are excluded.
query = "SELECT grade_discretized, AVG(DATEDIFF(issue_date, prev_loan_issue_date_grade)) AS avg_time_gap FROM loans WHERE prev_loan_issue_date_grade IS NOT NULL \
GROUP BY grade_discretized \
ORDER BY grade_discretized"
query4_df = spark.sql(query)

# Show the results
query4_df.show()

+-----------------+-------------------+
|grade_discretized|       avg_time_gap|
+-----------------+-------------------+
|                A|0.47154899894625923|
|                B| 0.3402179422199696|
|                C|0.35024784763892514|
|                D|  0.685823754789272|
|                E| 1.8169117647058823|
|                F|   5.78974358974359|
|                G| 20.057142857142857|
+-----------------+-------------------+



In [56]:
# Days between each loan and the previous loan of the same grade letter.
df_with_gap = df.withColumn(
    "time_gap", F.datediff(F.col("issue_date"), F.col("prev_loan_issue_date_grade"))
)

# Keep only loans that have a previous loan in their grade.
df_with_gap_filtered = df_with_gap.where(
    F.col("time_gap").isNotNull() & F.col("prev_loan_issue_date_grade").isNotNull()
)

result4_df = (
    df_with_gap_filtered.groupBy("grade_discretized")
    .agg(F.avg("time_gap").alias("avg_time_gap"))
    .orderBy("grade_discretized")
)

result4_df.show()

+-----------------+-------------------+
|grade_discretized|       avg_time_gap|
+-----------------+-------------------+
|                A|0.47154899894625923|
|                B| 0.3402179422199696|
|                C|0.35024784763892514|
|                D|  0.685823754789272|
|                E| 1.8169117647058823|
|                F|   5.78974358974359|
|                G| 20.057142857142857|
+-----------------+-------------------+



### e. Identify the average difference in loan amounts between consecutive loans within the same state and grade combination.

In [57]:
# Mean difference between each loan's amount and the previous loan's amount within the same
# (state, grade_discretized) group, largest first. The first loan of each group (null previous
# amount) is excluded.
query="WITH loan_diffs AS (SELECT state,grade_discretized,loan_amount,prev_loan_amount_state_grade,\
loan_amount - prev_loan_amount_state_grade AS loan_amount_diff_state_grade \
FROM loans \
WHERE prev_loan_amount_state_grade IS NOT NULL) \
SELECT state,grade_discretized,AVG(loan_amount_diff_state_grade) AS avg_loan_amount_diff \
FROM loan_diffs \
GROUP BY state, grade_discretized \
ORDER BY avg_loan_amount_diff DESC"

query5_df = spark.sql(query)


query5_df.show()

+-----+-----------------+--------------------+
|state|grade_discretized|avg_loan_amount_diff|
+-----+-----------------+--------------------+
|   WY|                E|             21525.0|
|   WI|                G|             16950.0|
|   HI|                F|             15625.0|
|   ME|                F|              7662.5|
|   DE|                E|              6625.0|
|   KY|                F|   6383.333333333333|
|   OK|                F|              5875.0|
|   WV|                E|  3392.8571428571427|
|   AK|                D|              3370.0|
|   VT|                D|              3000.0|
|   HI|                E|  2832.1428571428573|
|   IN|                F|  2738.8888888888887|
|   UT|                E|  2444.4444444444443|
|   MA|                F|              2375.0|
|   SD|                D|   2285.714285714286|
|   SD|                A|  2261.5384615384614|
|   NM|                F|              2000.0|
|   DE|                A|   1857.142857142857|
|   TX|      

In [58]:
# The per-loan difference is computed on a separate frame, so this analysis-only column
# does not leak into `df`, which is later saved as the cleaned dataset.
df_with_diff = df.withColumn(
    "loan_amount_diff_state_grade",
    F.col("loan_amount") - F.col("prev_loan_amount_state_grade"),
)

# Filter out rows where the previous loan amount is null (to exclude the first loan in each state/grade combination)
result = (
    df_with_diff.filter(F.col("prev_loan_amount_state_grade").isNotNull())
    .groupBy("state", "grade_discretized")
    .agg(F.avg("loan_amount_diff_state_grade").alias("avg_loan_amount_diff"))
    .orderBy(F.desc("avg_loan_amount_diff"))
)

# Show the result
result.show()

+-----+-----------------+--------------------+
|state|grade_discretized|avg_loan_amount_diff|
+-----+-----------------+--------------------+
|   WY|                E|             21525.0|
|   WI|                G|             16950.0|
|   HI|                F|             15625.0|
|   ME|                F|              7662.5|
|   DE|                E|              6625.0|
|   KY|                F|   6383.333333333333|
|   OK|                F|              5875.0|
|   WV|                E|  3392.8571428571427|
|   AK|                D|              3370.0|
|   VT|                D|              3000.0|
|   HI|                E|  2832.1428571428573|
|   IN|                F|  2738.8888888888887|
|   UT|                E|  2444.4444444444443|
|   MA|                F|              2375.0|
|   SD|                D|   2285.714285714286|
|   SD|                A|  2261.5384615384614|
|   NM|                F|              2000.0|
|   DE|                A|   1857.142857142857|
|   TX|      

## 6 Lookup table and saving the dataset

In [59]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

def create_lookup_table(df):
    """Build a table mapping original categorical values to the encodings used in ``df``.

    Covers emp_length (years), the one-hot encoded columns (indicator column names),
    the label-encoded columns (indices), and grade_discretized (grade bands).

    For ``state`` and ``purpose``, the indices are read from the ``state_label`` /
    ``purpose_label`` columns produced by StringIndexer, so the lookup matches the data.
    If a label column is missing, the indices are recomputed with StringIndexer's default
    ordering: most frequent first, ties alphabetical.

    Args:
        df: The encoded DataFrame containing the original categorical columns.

    Returns:
        A Spark DataFrame with string columns ``column_name``, ``original_value`` and
        ``encoded_value``.
    """
    lookup_entries = []

    # Lookup for emp_length
    emp_length_map = {
        "< 1 year": 0, "1 year": 1, "2 years": 2, "3 years": 3, 
        "4 years": 4, "5 years": 5, "6 years": 6, "7 years": 7, 
        "8 years": 8, "9 years": 9, "10+ years": 10
    }
    for k, v in emp_length_map.items():
        lookup_entries.append(("emp_length", k, str(v)))

    # Lookup for home_ownership, verification_status, and type (One-Hot Encoded)
    for col_name in ["home_ownership", "verification_status", "type"]:
        unique_values = df.select(col_name).distinct().rdd.flatMap(lambda x: x).collect()
        for value in unique_values:
            one_hot_value = f"{col_name}_{value}".replace(" ", "_").lower()
            lookup_entries.append((col_name, value, one_hot_value))

    # Lookup for state and purpose (Label Encoded)
    for col_name in ["state", "purpose"]:
        label_col = f"{col_name}_label"
        if label_col in df.columns:
            # Read back the exact value -> index pairs the StringIndexer assigned.
            pairs = df.select(col_name, label_col).distinct().orderBy(label_col).collect()
            for row in pairs:
                lookup_entries.append((col_name, row[col_name], str(int(row[label_col]))))
        else:
            # Mirror StringIndexer's default "frequencyDesc" ordering.
            ordered = (
                df.filter(F.col(col_name).isNotNull())
                .groupBy(col_name)
                .count()
                .orderBy(F.col("count").desc(), F.col(col_name).asc())
                .collect()
            )
            for idx, row in enumerate(ordered):
                lookup_entries.append((col_name, row[col_name], str(idx)))

    # Lookup for grade_discretized
    grade_discretized_map = {
        "A": "1-5", "B": "6-10", "C": "11-15", 
        "D": "16-20", "E": "21-25", "F": "26-30", "G": "31-35"
    }
    for k, v in grade_discretized_map.items():
        lookup_entries.append(("grade_discretized", k, v))

    # Convert to a DataFrame
    lookup_schema = T.StructType([
        T.StructField("column_name", T.StringType(), True),
        T.StructField("original_value", T.StringType(), True),
        T.StructField("encoded_value", T.StringType(), True),
    ])
    lookup_df = spark.createDataFrame(lookup_entries, schema=lookup_schema)

    return lookup_df


# Build the lookup table from the encoded frame and preview it.
lookup_table = create_lookup_table(df)
lookup_table.show(100)

# Save to Parquet, replacing any previous output.
lookup_table.write.parquet("lookup_spark_52_21362.parquet", mode="overwrite")


                                                                                

+-------------------+------------------+--------------------+
|        column_name|    original_value|       encoded_value|
+-------------------+------------------+--------------------+
|         emp_length|          < 1 year|                   0|
|         emp_length|            1 year|                   1|
|         emp_length|           2 years|                   2|
|         emp_length|           3 years|                   3|
|         emp_length|           4 years|                   4|
|         emp_length|           5 years|                   5|
|         emp_length|           6 years|                   6|
|         emp_length|           7 years|                   7|
|         emp_length|           8 years|                   8|
|         emp_length|           9 years|                   9|
|         emp_length|         10+ years|                  10|
|     home_ownership|               OWN|  home_ownership_own|
|     home_ownership|              RENT| home_ownership_rent|
|     ho

In [60]:
df.write.mode("overwrite").parquet("fintech_spark_52_21362_clean.parquet")

                                                                                

## 7 Bonus

In [61]:
from pyspark.sql import SparkSession
import pandas as pd
from sqlalchemy import create_engine


import getpass
import os
from urllib.parse import quote_plus

df_pandas = df.toPandas()
lookup_table_pandas = lookup_table.toPandas()

# Database connection parameters come from the environment, not hardcoded in the notebook.
# The password falls back to an interactive prompt if DB_PASSWORD is not set.
db_user = os.environ.get("DB_USER", "dareen")
db_password = os.environ.get("DB_PASSWORD") or getpass.getpass("PostgreSQL password: ")
db_host = os.environ.get("DB_HOST", "localhost")
db_port = os.environ.get("DB_PORT", "5432")
db_name = os.environ.get("DB_NAME", "testdb")

# Create SQLAlchemy engine. User and password are URL-quoted so characters such as
# '@', ':' or '/' cannot corrupt the connection URL.
engine = create_engine(
    f"postgresql+psycopg2://{quote_plus(db_user)}:{quote_plus(db_password)}"
    f"@{db_host}:{db_port}/{db_name}"
)
# Open and close one connection up front, to fail fast on bad settings without leaking it.
with engine.connect():
    print('Connected to Database')

# Save DataFrames to PostgreSQL
df_pandas.to_sql("cleaned_db", engine, if_exists="replace", index=False)
lookup_table_pandas.to_sql("lookup_table", engine, if_exists="replace", index=False)

print("Both DataFrames saved to PostgreSQL!")


Connected to Database
Both DataFrames saved to PostgreSQL!


In [62]:
# Stop the SparkSession created at the top of the notebook; all outputs have been written above.
spark.stop()