### Import Libraries

In [1]:
import pandas as pd

In [None]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Ingest raw data

In [None]:
# Locations of the raw Parquet inputs under the mounted data lake path.
# These module-level paths are read by the raw-ingestion table definitions.
users_parquet_path = "/mnt/datalake/raw/users.parquet"
application_parquet_path = "/mnt/datalake/raw/applications.parquet"

@dlt.table(
    comment="The raw users datasets ingested from a mounted location on Azure datalake"
)
def users_raw():
    """Return the raw users dataset read from ``users_parquet_path``.

    The table is loaded from the users Parquet file (not the applications
    file) so that its content matches its name and its table comment.
    """
    return spark.read.parquet(users_parquet_path)

@dlt.table(
    comment="The raw applications datasets ingested from a mounted location on Azure datalake"
)
def applications_raw():
    """Return the raw applications dataset read from ``application_parquet_path``.

    The table is loaded from the applications Parquet file (not the users
    file) so that its content matches its name and its table comment.
    """
    return spark.read.parquet(application_parquet_path)

### Clean and prepare data

In [None]:
@dlt.table(
    comment="The raw users data cleaned and prepared for analysis"
)
@dlt.expect_or_fail('valid_user', 'user_id IS NOT NULL')
@dlt.expect_or_fail('valid_avg_airtime', 'avg_airtime > 0')
@dlt.expect_or_fail('valid_monthly_income', 'monthly_income > 0')
def users_prepared():
    """Return the cleaned users dataset built from ``users_raw``.

    Steps applied:
      * cast the categorical attributes to STRING, each from its own column;
      * parse ``date_of_birth`` (``dd/MM/yyyy``) and derive ``age`` from it;
      * rename ``number_children`` -> ``children`` and
        ``state`` -> ``state_of_residence``;
      * keep only the columns listed in the final ``select``.

    The update fails if any row has a NULL ``user_id``, or a non-positive
    ``avg_airtime`` or ``monthly_income`` (``expect_or_fail`` expectations).
    """
    # Each attribute is cast from itself; casting every one of them from a
    # single unrelated column would overwrite them all with the same value.
    string_columns = (
        "education_status",
        "employment_status",
        "number_children",
        "professional_category",
    )

    users = dlt.read("users_raw")
    for column_name in string_columns:
        users = users.withColumn(
            column_name, expr(f"CAST({column_name} AS STRING)")
        )

    return (
        users
        .withColumn("date_of_birth", expr("TO_DATE(date_of_birth, 'dd/MM/yyyy')"))
        # NOTE(review): datediff returns a number of DAYS, so "age" is the
        # age in days, not years -- confirm this is what consumers expect.
        .withColumn("age", datediff(current_date(), "date_of_birth"))
        .withColumnRenamed("number_children", "children")
        .withColumnRenamed("state", "state_of_residence")
        .select(
            "user_id", "gender", "avg_airtime", "age", "education_status",
            "employment_status", "bank", "monthly_income", "children", "owns_car",
            "state_of_residence", "payment_system", "professional_category",
            "email_is_validated",
        )
    )


@dlt.table(
    comment="The raw applications data cleaned and prepared for analysis"
)
@dlt.expect_or_fail('valid_user', 'user_id IS NOT NULL')
@dlt.expect_or_fail('principal', 'principal > 0')
@dlt.expect_or_fail('product_id', 'product_id IS NOT NULL')
@dlt.expect_or_fail('application_datetime', 'application_datetime IS NOT NULL')
def applications_prepared():
    """Return the cleaned applications dataset built from ``applications_raw``.

    Steps applied:
      * drop rows whose ``loan_status`` is ``'REJECTED'``;
      * parse ``approval_datetime`` and ``last_payment_date`` (``yyyy-MM-dd``);
      * derive ``loan_duration`` as the number of days between them;
      * keep only the columns listed in the final ``select``.

    The update fails if any row has a NULL ``user_id``, ``product_id`` or
    ``application_datetime``, or a non-positive ``principal``
    (``expect_or_fail`` expectations).
    """
    return (
        dlt.read("applications_raw")
        .filter(expr("loan_status != 'REJECTED'"))
        .withColumn("approval_datetime", expr("TO_DATE(approval_datetime, 'yyyy-MM-dd')"))
        .withColumn("last_payment_date", expr("TO_DATE(last_payment_date, 'yyyy-MM-dd')"))
        .withColumn("loan_duration", datediff("last_payment_date", "approval_datetime"))
        .select(
            "user_id",
            # Kept in the output because the 'application_datetime'
            # expectation is evaluated against the columns returned here.
            "application_datetime",
            "loan_status", "principal", "balance",
            # Spelled "defaulted" to match the column the downstream join
            # selects.
            "defaulted",
            "repaid", "loan_reason", "is_fraudulent", "interest", "late_fee",
            "product_id", "last_payment_date",
            # Kept so the derived column above is not computed and discarded.
            "loan_duration",
        )
    )

### User applications

In [None]:
@dlt.table(
    comment = "join both tables; users_prepared and applications_prepared"
)
def user_loan_applications():
    """Return one row per prepared application, enriched with user attributes.

    ``applications_prepared`` is left-joined to ``users_prepared`` on
    ``user_id``, so every application is kept even when no matching user
    exists; in that case the user attributes are NULL.
    """
    applications = dlt.read("applications_prepared").alias("a")
    users = dlt.read("users_prepared").alias("u")

    return (
        applications
        .join(users, "user_id", "left")
        .select(
            # Use the join key itself rather than u.user_id: with a left join
            # the users-side key is NULL for applications without a matching
            # user, whereas the join key keeps the application's user_id.
            "user_id",
            "u.gender", "u.avg_airtime", "u.age", "u.education_status",
            "u.employment_status", "u.bank", "u.monthly_income", "u.children",
            "u.owns_car", "u.state_of_residence", "u.payment_system",
            "u.professional_category", "u.email_is_validated",
            "a.loan_status", "a.principal", "a.balance", "a.defaulted",
            "a.repaid", "a.loan_reason", "a.is_fraudulent", "a.interest",
            "a.late_fee", "a.product_id", "a.last_payment_date",
        )
    )