In [None]:
# Processing date of the bronze file to ingest, expected as an ISO date (YYYY-MM-DD).
# The value is interpolated both into the source CSV path and into the Delta
# replaceWhere predicate used when writing to silver, so it is validated up front:
# a bad value fails here with a clear message instead of surfacing later as a
# missing-file error or a malformed overwrite predicate.
from datetime import date

dbutils.widgets.text('p_file_date', '2022-09-10')
# strip() tolerates accidental surrounding whitespace typed into the widget.
v_file_date = dbutils.widgets.get('p_file_date').strip()

try:
    # Round-tripping through isoformat() rejects non-canonical spellings
    # (e.g. compact '20220910' accepted by newer Pythons), keeping the file
    # name and the partition predicate in the exact 'YYYY-MM-DD' form.
    _is_iso_file_date = date.fromisoformat(v_file_date).isoformat() == v_file_date
except ValueError:
    _is_iso_file_date = False

if not _is_iso_file_date:
    raise ValueError(f"p_file_date must be a date formatted as YYYY-MM-DD, got {v_file_date!r}")

In [None]:
%run "../includes/configurations"

In [None]:
%run "../includes/common_functions"

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, BooleanType, DateType
from pyspark.sql.functions import col

In [None]:
# Explicit schema for the bronze customerDrivers CSV, listed in file column order.
# It is passed to the CSV reader so column types are pinned rather than inferred.
# NOTE(review): Spark file sources typically relax nullability on read, so the
# nullable=False flags on `date` and `customerId` are likely not enforced — confirm.
schema = (
    StructType()
    .add("date", DateType(), False)
    .add("customerId", StringType(), False)
    .add("monthly_salary", DoubleType(), True)
    .add("health_score", IntegerType(), True)
    .add("current_debt", DoubleType(), True)
    .add("category", StringType(), True)
)

In [None]:
# Load the bronze CSV for the requested processing date with the explicit schema;
# the first line of the file is treated as the header row.
customerDrivers_df = spark.read.csv(
    f"{bronze_folder_path}/customerDriver/customerDrivers_{v_file_date}.csv",
    schema=schema,
    header=True,
)

In [None]:
# Impute missing values column by column:
# - monthly_salary: customers with no salary get the minimum vital salary (1500).
# - health_score: by bank rule, customers who do not have a score yet get 100.
# - current_debt: a missing debt is treated as 0.
# - category: a missing category is grouped under 'OTHERS'.
tmp_customerDrivers_df = customerDrivers_df.fillna(
    value={
        "monthly_salary": 1500,
        "health_score": 100,
        "current_debt": 0,
        "category": "OTHERS",
    }
)

In [None]:
# Flag customers as risky when their health score is below 100, and rename
# customerId to the snake_case customer_id used downstream.
# health_score was imputed in the previous cell (nulls -> 100), so the flag is
# never null and customers without a score are not flagged as risky.
final_df = (
    tmp_customerDrivers_df
    .withColumn("is_risk_customer", col("health_score") < 100)
    .withColumnRenamed("customerId", "customer_id")
)

In [None]:
# Preview the transformed rows (renamed customer_id plus the derived is_risk_customer flag).
display(final_df)

date,customer_id,monthly_salary,health_score,current_debt,category,is_risk_customer
2022-09-10,CUS50595231748,3860.0,3,593.7,BRONZE,True
2022-09-10,CUS41095949824,7127.0,155,10404.96,SILVER,False
2022-09-10,CUS77289220724,13587.0,208,0.0,BRONZE,False
2022-09-10,CUS55697703960,1322.0,211,741.05,SILVER,False
2022-09-10,CUS91382780948,5275.0,223,84958.35,SILVER,False
2022-09-10,CUS36947218124,5292.0,226,16892.22,BRONZE,False
2022-09-10,CUS15964882412,1667.0,230,0.0,BRONZE,False
2022-09-10,CUS26768799060,6992.0,234,102834.24,SILVER,False
2022-09-10,CUS41482828460,4903.0,236,65009.84,SILVER,False
2022-09-10,CUS86644713624,15050.0,236,8557.43,BRONZE,False


In [None]:
# Persist the result as a Delta table in the silver container, partitioned by date.
# mode("overwrite") combined with replaceWhere restricts the overwrite to rows
# matching date == v_file_date, so re-processing one file date replaces only that
# day's partition.
# NOTE(review): Delta checks by default that every written row satisfies the
# replaceWhere predicate; this assumes the source CSV only holds rows for v_file_date.
(
    final_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("date")
    .option("replaceWhere", f"date == '{v_file_date}'")
    .save(f"{silver_folder_path}/customerDrivers")
)

In [None]:
# Finish the notebook and hand "Success" back as its exit value to whatever
# launched it (e.g. a parent notebook or a job).
dbutils.notebook.exit("Success")