### Create streaming bronze table

In [None]:
-- Bronze ingestion: stream the raw loan-application CSV files into a Delta table.
-- The source is read through Auto Loader (cloud_files), so every pipeline update
-- loads only the applications that arrived since the previous run.

CREATE STREAMING LIVE TABLE LoanApplications_BronzeLiveIncremental

-- optionally define the schema

USING DELTA

COMMENT "Live Bronze table for LoanApplications"

AS

SELECT
        -- Explicit casts pin the column types of the bronze table.
        user_id::Int,
        loan_status::String,
        principal::Double,
        balance::Double,
        defaulted::Boolean,
        repaid::Double,
        loan_reason::String,
        application_date::Date,
        approval_date::Date,
        repayment_date::Date,
        last_payment_date::Date,
        rejection_date::Date,
        is_fradulent::Boolean,
        next_payment_date::Date,
        interest::Double,
        late_fee::Double,
        product::String,

        -- Lineage / audit columns: the file each row came from and its load time.
        INPUT_FILE_NAME() AS FileName,
        CURRENT_TIMESTAMP() AS CreatedOn

-- AutoLoader: incrementally picks up new data
-- NOTE(review): the path starts with "/mtn/"; Databricks mount points normally
-- live under "/mnt/" -- confirm this is not a typo before deploying.
FROM cloud_files(
        "/mtn/datalake/Raw/LoanApplications/",
        "csv",
        map("inferSchema","true")
     )

### Create streaming bronze view

In [None]:
-- Bronze view: reads the bronze table as a stream, applies the data-quality
-- expectations below, and derives the approval date parts and loan duration.
CREATE STREAMING LIVE VIEW LoanApplications_BronzeLiveIncrementalView
(
    -- Expectations: rows that violate a DROP ROW constraint are discarded;
    -- a violation of Valid_user_id fails the whole update.
    CONSTRAINT Valid_principal EXPECT (principal IS NOT NULL AND principal > 0) ON VIOLATION DROP ROW,
    CONSTRAINT Valid_user_id   EXPECT (user_id IS NOT NULL AND user_id > 0) ON VIOLATION FAIL UPDATE,
    CONSTRAINT Accepted_applications EXPECT (loan_status != 'REJECTED') ON VIOLATION DROP ROW,
    CONSTRAINT Valid_product EXPECT (product IS NOT NULL) ON VIOLATION DROP ROW
)

AS

SELECT  user_id,
        loan_status,
        principal,
        balance,
        defaulted,
        repaid,
        loan_reason,
        approval_date,
        last_payment_date,
        interest,
        late_fee,
        product,
        CreatedOn,
        YEAR(approval_date) AS approval_year,
        MONTH(approval_date) AS approval_month,
        DAYOFMONTH(approval_date) AS approval_day,
        -- Days between approval and the most recent payment. The bronze table
        -- exposes last_payment_date; there is no late_payment_date column.
        DATEDIFF(last_payment_date, approval_date) AS loan_duration_in_days

FROM STREAM(live.LoanApplications_BronzeLiveIncremental)

### Create streaming silver table

In [None]:
-- Silver table: empty streaming target with an explicit schema. No query is
-- attached here; rows are written by a separate APPLY CHANGES INTO statement.
CREATE STREAMING LIVE TABLE LoanApplications_SilverLiveIncremental
(
        user_id                INT         COMMENT 'This is the primary key column',
        loan_status            STRING,
        principal              DOUBLE,
        balance                DOUBLE,
        -- NOTE(review): the bronze table casts defaulted to Boolean, but it is
        -- declared STRING here -- confirm which type downstream queries expect.
        defaulted              STRING,
        repaid                 DOUBLE,
        loan_reason            STRING,
        interest               DOUBLE,
        late_fee               DOUBLE,
        product                STRING,
        CreatedOn              TIMESTAMP,
        approval_year          INT,
        approval_month         INT,
        approval_day           INT,
        loan_duration_in_days  INT
)

USING DELTA

### Merge changes from Bronze to Silver tables incrementally

In [None]:
-- Incrementally merge the validated bronze view into the silver table.
-- Rows are matched on user_id; when several rows share a key, CreatedOn decides
-- their order so the most recently loaded row wins.
APPLY CHANGES INTO live.LoanApplications_SilverLiveIncremental

-- The view is a pipeline dataset, so it needs the live. qualifier like every
-- other dataset reference in this pipeline.
FROM STREAM(live.LoanApplications_BronzeLiveIncrementalView)

KEYS (user_id)

-- Delete propagation is disabled; the original clause is kept for reference:
-- APPLY AS DELETE WHEN OPERATION = 'DELETE'

SEQUENCE BY CreatedOn

-- The view also emits approval_date and last_payment_date, which the silver
-- table's declared schema does not contain; exclude them so the written
-- columns match that schema.
COLUMNS * EXCEPT (approval_date, last_payment_date)

### Create complete gold table - User-wise summary

In [None]:
-- Gold table: one summary row per user. Joins silver loan applications to the
-- existing-users table and aggregates loan counts, amounts and history per user.
-- The table is fully recomputed on each update (LIVE TABLE, not STREAMING).
CREATE LIVE TABLE LoanApplications_SummaryByUser_GoldLive2

AS

SELECT  u.user_id,
        u.first_name,
        u.last_name,
        u.age,
        u.state,
        u.education_status,
        u.employment_status,
        u.bank,
        u.number_of_children,
        u.owns_car,
        u.state_of_origin,
        u.professional_category,
        u.monthly_income,
        CONCAT_WS(', ', COLLECT_LIST(la.product)) AS loan_products,
        COUNT(la.user_id) AS total_applications,
        SUM(la.principal) AS total_loan_amount,
        SUM(la.balance) AS outstanding_balance,
        -- NOTE(review): defaulted is declared STRING in the silver table (a
        -- Boolean flag in bronze), so summing it does not yield an amount --
        -- confirm the intended definition of total_amount_defaulted.
        SUM(la.defaulted) AS total_amount_defaulted,
        SUM(la.repaid) AS total_repaid,
        CONCAT_WS(', ', COLLECT_LIST(la.loan_status)) AS loan_history,
        AVG(la.loan_duration_in_days) AS average_loan_duration

FROM live.LoanApplications_SilverLiveIncremental AS la

INNER JOIN live.ExistingUsers_BronzeLive AS u
    ON la.user_id = u.user_id

GROUP BY    u.user_id,
            u.first_name,
            u.last_name,
            u.age,
            u.state,
            u.education_status,
            u.employment_status,
            u.bank,
            u.number_of_children,
            u.owns_car,
            u.state_of_origin,
            u.professional_category,
            u.monthly_income

-- Users with the largest outstanding balance come first.
ORDER BY outstanding_balance DESC