In [6]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DecimalType,IntegerType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Initialize Spark with Snowflake packages
spark = SparkSession.builder \
    .appName("SnowflakeLoad_LoanProduct") \
    .config("spark.jars.packages", "net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3") \
    .getOrCreate()



# Load CSV file into DataFrame (Replace 'your_file.csv' with actual filename)
csv_path = "Transformed_2014_18_v2.csv"
df = spark.read.option("header", True).csv(csv_path)  #.sample(withReplacement=False, fraction=0.0001, seed=42)


#### customize the csv file for Dim_loan_product

column_mapping = {
    "hardship_flag": "HARDSHIP_FLAG",
    "hardship_type": "HARDSHIP_TYPE",
    "hardship_reason": "HARDSHIP_REASON",
    "hardship_status": "HARDSHIP_STATUS",
    "hardship_amount": "HARDSHIP_AMOUNT",
    "hardship_start_date": "HARDSHIP_START_DATE",
    "hardship_end_date": "HARDSHIP_END_DATE",
    "hardship_length": "HARDSHIP_LENGTH",
    "hardship_dpd":"HARDSHIP_DPD",
    "hardship_loan_status":"HARDSHIP_LOAN_STATUS",
    "hardship_payoff_balance_amount":"HARDSHIP_PAYOFF_BALANCE_AMOUNT",
    "id":"LOANPRODUCT_BK"
}

columns_of_hardship = [
    "hardship_flag", "hardship_type", "hardship_reason", "hardship_status","hardship_amount", "hardship_start_date", "hardship_end_date", "hardship_length","hardship_dpd",
    "hardship_loan_status","hardship_payoff_balance_amount","id"
]
 
dim_hardship = df.filter(df.hardship_flag == 'y').select(columns_of_hardship)
dim_hardship.show(5)

# Snowflake connection options
sf_options = {
    "sfURL": "https://WOA97553.east-us-2.azure.snowflakecomputing.com",
    "sfUser": 'hussien1',
    "sfPassword":'@Hussien123456',
    "sfDatabase": 'Loan_DB',
    "sfSchema": 'Loan_Schema',
    "sfWarehouse": 'loan_Warehouse',
    "autopushdown": "on",
    "usestagingtable": "on"
}

from pyspark.sql import functions as F
from pyspark.sql.window import Window



# Rename columns in the DataFrame according to the column mapping
for source_col, dest_col in column_mapping.items():
    if source_col in dim_hardship.columns:
        dim_hardship = dim_hardship.withColumnRenamed(source_col, dest_col)

# Define the window spec for row numbering
windowSpec = Window.orderBy(F.lit(1))  # Constant value to ensure numbering starts from 1

# Add the BORROWER_KEY_PK_SK column with row numbers starting from 1
dim_hardship = dim_hardship.withColumn("HARDSHIP_ID_PK", F.row_number().over(windowSpec))

# List of columns to select (ensure that the column names match after renaming)
columns_of_hardship = [
    "HARDSHIP_ID_PK","HARDSHIP_FLAG", "HARDSHIP_TYPE", "HARDSHIP_REASON", "HARDSHIP_STATUS","HARDSHIP_AMOUNT", "HARDSHIP_START_DATE", "HARDSHIP_END_DATE", "HARDSHIP_LENGTH"
    ,"HARDSHIP_DPD",
    "HARDSHIP_LOAN_STATUS","HARDSHIP_PAYOFF_BALANCE_AMOUNT","LOANPRODUCT_BK"
]

# Select the required columns
dim_hardship = dim_hardship.select(columns_of_hardship)

# Show DataFrame to verify changes
print("Transformed Data for dim_hardship:")

dim_hardship.printSchema()
dim_hardship.show(5)

+-------------+--------------------+--------------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------+
|hardship_flag|       hardship_type|     hardship_reason|hardship_status|hardship_amount|hardship_start_date|hardship_end_date|hardship_length|hardship_dpd|hardship_loan_status|hardship_payoff_balance_amount|      id|
+-------------+--------------------+--------------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------+
|            y|INTEREST ONLY-3 M...|EXCESSIVE_OBLIGAT...|      COMPLETED|           13.4|           Jun-2017|         Sep-2017|            3.0|        10.0|     In Grace Period|                       1602.87|  636909|
|            y|INTEREST ONLY-3 M...|             MEDICAL|         BROKEN|         216.36|           Sep-2017|         Oct-2017| 

# Load Dim_loanproduct from snowflake into a spark data frame 

In [7]:
from pyspark.sql import SparkSession


# Step 3: Load data from Snowflake table into Spark DataFrame
df_dim_Loanproduct = spark.read \
    .format("snowflake") \
    .options(**sf_options) \
    .option("dbtable", "DIM_LOANPRODUCT") \
    .load()

# Step 4: Show the data
df_dim_Loanproduct.show(4)

# Optional: Save the DataFrame as a CSV or another format if needed
# df_snowflake.write.option("header", "true").csv("output_path")


+-------------------+----------+-------------+---------+------------------+-------------------+-------------+----------------+--------------------+--------------+
|LOAN_PRODUCT_KEY_PK|LOAN_GRADE|LOAN_SUBGRADE|LOAN_TERM|      LOAN_PURPOSE|INITIAL_LIST_STATUS|OUT_PRINCIPAL|APPLICATION_TYPE|DEBT_SETTLEMENT_FLAG|LOANPRODUCT_BK|
+-------------------+----------+-------------+---------+------------------+-------------------+-------------+----------------+--------------------+--------------+
|                  1|         A|           A2|       36|  home_improvement|                  w|          0.0|      Individual|                   N|         56121|
|                  2|         D|           D4|       36|debt_consolidation|                  w|          0.0|      Individual|                   N|         65104|
|                  3|         A|           A3|       36|  home_improvement|                  w|          0.0|      Individual|                   N|         65419|
|                  4| 

# join dim_hardship with dim_loanproduct to get df_dim_hardship_enrich

In [8]:
# Join based on matching loan product fields (example: loan_grade, loan_term, etc.)
df_dim_hardship_enrich = dim_hardship.join(
    df_dim_Loanproduct,
    on=[
        dim_hardship["LOANPRODUCT_BK"] == df_dim_Loanproduct["LOANPRODUCT_BK"]
          # add more if needed
    ],
    how="left"
)

df_dim_hardship_enrich.show(2)


+--------------+-------------+--------------------+---------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------------+-------------------+----------+-------------+---------+------------------+-------------------+-------------+----------------+--------------------+--------------+
|HARDSHIP_ID_PK|HARDSHIP_FLAG|       HARDSHIP_TYPE|HARDSHIP_REASON|HARDSHIP_STATUS|HARDSHIP_AMOUNT|HARDSHIP_START_DATE|HARDSHIP_END_DATE|HARDSHIP_LENGTH|HARDSHIP_DPD|HARDSHIP_LOAN_STATUS|HARDSHIP_PAYOFF_BALANCE_AMOUNT|LOANPRODUCT_BK|LOAN_PRODUCT_KEY_PK|LOAN_GRADE|LOAN_SUBGRADE|LOAN_TERM|      LOAN_PURPOSE|INITIAL_LIST_STATUS|OUT_PRINCIPAL|APPLICATION_TYPE|DEBT_SETTLEMENT_FLAG|LOANPRODUCT_BK|
+--------------+-------------+--------------------+---------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+-----------------------

# # Select final columns

In [12]:
new_columns_of_hardship = [
    "HARDSHIP_ID_PK","HARDSHIP_FLAG", "HARDSHIP_TYPE", "HARDSHIP_REASON", "HARDSHIP_STATUS",
    "HARDSHIP_AMOUNT","HARDSHIP_START_DATE", "HARDSHIP_END_DATE", "HARDSHIP_LENGTH","HARDSHIP_DPD",
    "HARDSHIP_LOAN_STATUS","HARDSHIP_PAYOFF_BALANCE_AMOUNT",dim_hardship["LOANPRODUCT_BK"],"LOAN_PRODUCT_KEY_PK"
]

final_dim_hardship_df = df_dim_hardship_enrich.select(new_columns_of_hardship)

final_dim_hardship_df.show(2)
final_dim_hardship_df.printSchema()

+--------------+-------------+--------------------+---------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------------+-------------------+
|HARDSHIP_ID_PK|HARDSHIP_FLAG|       HARDSHIP_TYPE|HARDSHIP_REASON|HARDSHIP_STATUS|HARDSHIP_AMOUNT|HARDSHIP_START_DATE|HARDSHIP_END_DATE|HARDSHIP_LENGTH|HARDSHIP_DPD|HARDSHIP_LOAN_STATUS|HARDSHIP_PAYOFF_BALANCE_AMOUNT|LOANPRODUCT_BK|LOAN_PRODUCT_KEY_PK|
+--------------+-------------+--------------------+---------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------------+-------------------+
|             3|            y|INTEREST ONLY-3 M...|     DISABILITY|      COMPLETED|          33.99|           Jun-2018|         Sep-2018|            3.0|         9.0|     In Grace Period|                       2556.48|      11365925|     

In [14]:
final_dim_hardship_df.show()

+--------------+-------------+--------------------+--------------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------------+-------------------+
|HARDSHIP_ID_PK|HARDSHIP_FLAG|       HARDSHIP_TYPE|     HARDSHIP_REASON|HARDSHIP_STATUS|HARDSHIP_AMOUNT|HARDSHIP_START_DATE|HARDSHIP_END_DATE|HARDSHIP_LENGTH|HARDSHIP_DPD|HARDSHIP_LOAN_STATUS|HARDSHIP_PAYOFF_BALANCE_AMOUNT|LOANPRODUCT_BK|LOAN_PRODUCT_KEY_PK|
+--------------+-------------+--------------------+--------------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------------+-------------------+
|             3|            y|INTEREST ONLY-3 M...|          DISABILITY|      COMPLETED|          33.99|           Jun-2018|         Sep-2018|            3.0|         9.0|     In Grace Period|                       2556.48|

In [19]:
final_dim_hardship_df.printSchema()

root
 |-- HARDSHIP_ID_PK: integer (nullable = false)
 |-- HARDSHIP_FLAG: string (nullable = true)
 |-- HARDSHIP_TYPE: string (nullable = true)
 |-- HARDSHIP_REASON: string (nullable = true)
 |-- HARDSHIP_STATUS: string (nullable = true)
 |-- HARDSHIP_AMOUNT: string (nullable = true)
 |-- HARDSHIP_START_DATE: string (nullable = true)
 |-- HARDSHIP_END_DATE: string (nullable = true)
 |-- HARDSHIP_LENGTH: string (nullable = true)
 |-- HARDSHIP_DPD: string (nullable = true)
 |-- HARDSHIP_LOAN_STATUS: string (nullable = true)
 |-- HARDSHIP_PAYOFF_BALANCE_AMOUNT: string (nullable = true)
 |-- LOANPRODUCT_BK: string (nullable = true)
 |-- LOAN_PRODUCT_KEY_PK: decimal(38,0) (nullable = true)



In [18]:
# Define the window spec for row numbering
windowSpec = Window.orderBy(F.lit(1))  # Constant value to ensure numbering starts from 1

# Add the BORROWER_KEY_PK_SK column with row numbers starting from 1
final_dim_hardship_df = final_dim_hardship_df.withColumn("HARDSHIP_ID_PK", F.row_number().over(windowSpec))

# List of columns to select (ensure that the column names match after renaming)
final_columns_of_hardship = [
    "HARDSHIP_ID_PK","HARDSHIP_FLAG", "HARDSHIP_TYPE", "HARDSHIP_REASON", "HARDSHIP_STATUS","HARDSHIP_AMOUNT", 
    "HARDSHIP_START_DATE", "HARDSHIP_END_DATE", "HARDSHIP_LENGTH"
    ,"HARDSHIP_DPD",
    "HARDSHIP_LOAN_STATUS","HARDSHIP_PAYOFF_BALANCE_AMOUNT","LOANPRODUCT_BK","LOAN_PRODUCT_KEY_PK"
]

# Select the required columns
final_dim_hardship_df = final_dim_hardship_df.select(final_columns_of_hardship)

In [20]:
final_dim_hardship_df.show(2)

+--------------+-------------+--------------------+------------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------------+-------------------+
|HARDSHIP_ID_PK|HARDSHIP_FLAG|       HARDSHIP_TYPE|   HARDSHIP_REASON|HARDSHIP_STATUS|HARDSHIP_AMOUNT|HARDSHIP_START_DATE|HARDSHIP_END_DATE|HARDSHIP_LENGTH|HARDSHIP_DPD|HARDSHIP_LOAN_STATUS|HARDSHIP_PAYOFF_BALANCE_AMOUNT|LOANPRODUCT_BK|LOAN_PRODUCT_KEY_PK|
+--------------+-------------+--------------------+------------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------------+-------------------+
|             1|            y|INTEREST ONLY-3 M...|        DISABILITY|      COMPLETED|          33.99|           Jun-2018|         Sep-2018|            3.0|         9.0|     In Grace Period|                       2556.48|      11

In [1]:
!pwd

/home/jovyan


In [21]:

try:
    print("Testing Snowflake connection...")
    test_df = spark.read \
        .format("net.snowflake.spark.snowflake") \
        .options(**sf_options) \
        .option("query", "SELECT 1 AS test_value") \
        .load()
    test_df.show()
    print("Connection test successful!")
 
    print("Writing data to Snowflake...")
    final_dim_hardship_df.write \
        .format("snowflake") \
        .options(**sf_options) \
        .option("dbtable", "dim_hardship") \
        .mode("append") \
        .save()
 
    print("Data load complete!")
 #######################

#############################################
    print("Reading data back from Snowflake to verify...")
    snowflake_df = spark.read \
        .format("net.snowflake.spark.snowflake") \
        .options(**sf_options) \
        .option("query", "SELECT * FROM Loan_DB.Loan_Schema.dim_hardship") \
        .load()
 
    snowflake_df.show()
 
except Exception as e:
    print("Error occurred:", str(e))
 
#finally:
    #spark.stop()

Testing Snowflake connection...
+----------+
|TEST_VALUE|
+----------+
|         1|
+----------+

Connection test successful!
Writing data to Snowflake...
Data load complete!
Reading data back from Snowflake to verify...
+--------------+-------------+--------------------+--------------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------------+-------------------+
|HARDSHIP_ID_PK|HARDSHIP_FLAG|       HARDSHIP_TYPE|     HARDSHIP_REASON|HARDSHIP_STATUS|HARDSHIP_AMOUNT|HARDSHIP_START_DATE|HARDSHIP_END_DATE|HARDSHIP_LENGTH|HARDSHIP_DPD|HARDSHIP_LOAN_STATUS|HARDSHIP_PAYOFF_BALANCE_AMOUNT|LOANPRODUCT_BK|LOAN_PRODUCT_KEY_PK|
+--------------+-------------+--------------------+--------------------+---------------+---------------+-------------------+-----------------+---------------+------------+--------------------+------------------------------+--------------+-------------------+
| 