In [0]:
%spark.pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, countDistinct, desc, count ,bround , from_unixtime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import regexp_replace, col

# Build (or reuse) the Spark session with Hive support enabled; the later
# saveAsTable() calls in this notebook depend on this session.
# NOTE(review): the app name says "ClickstreamETL" but this notebook processes
# loan data — confirm whether the name is intentional.
session_builder = SparkSession.builder.appName("ClickstreamETL").enableHiveSupport()
spark = session_builder.getOrCreate()

In [1]:
%spark.pyspark
# Load the staged loan dataset (Parquet); every later cell transforms this frame.
staging_loan_path = "/staging_zone/financial_loan"
financial_df = spark.read.format("parquet").load(staging_loan_path)


In [2]:
%spark.pyspark
# Preview the freshly loaded data before any transformation is applied.
financial_df.show()

In [3]:
%spark.pyspark
# Convert the numeric date columns to DATE. The raw values are divided by 1000
# before from_unixtime() (which expects seconds), i.e. they are presumably
# epoch milliseconds — TODO confirm against the staging schema.
epoch_date_columns = [
    "issue_date",
    "last_credit_pull_date",
    "last_payment_date",
    "next_payment_date",
]

# Snapshot of the current column types; used to keep this cell re-runnable.
current_types = dict(financial_df.dtypes)

for date_column in epoch_date_columns:
    if current_types.get(date_column) == "date":
        # Already converted by an earlier run of this cell; dividing a DATE
        # by 1000 again would fail or corrupt the column.
        continue
    financial_df = financial_df.withColumn(
        date_column,
        from_unixtime(col(date_column) / 1000).cast("date"),
    )

In [4]:
%spark.pyspark

from itertools import chain

from pyspark.sql.functions import coalesce, col, create_map, lit, udf, when
from pyspark.sql.types import StringType

# Complete mapping dictionary (all US states + DC)
state_dict = {
    "AL": "Alabama",
    "AK": "Alaska",
    "AZ": "Arizona",
    "AR": "Arkansas",
    "CA": "California",
    "CO": "Colorado",
    "CT": "Connecticut",
    "DE": "Delaware",
    "FL": "Florida",
    "GA": "Georgia",
    "HI": "Hawaii",
    "ID": "Idaho",
    "IL": "Illinois",
    "IN": "Indiana",
    "IA": "Iowa",
    "KS": "Kansas",
    "KY": "Kentucky",
    "LA": "Louisiana",
    "ME": "Maine",
    "MD": "Maryland",
    "MA": "Massachusetts",
    "MI": "Michigan",
    "MN": "Minnesota",
    "MS": "Mississippi",
    "MO": "Missouri",
    "MT": "Montana",
    "NE": "Nebraska",
    "NV": "Nevada",
    "NH": "New Hampshire",
    "NJ": "New Jersey",
    "NM": "New Mexico",
    "NY": "New York",
    "NC": "North Carolina",
    "ND": "North Dakota",
    "OH": "Ohio",
    "OK": "Oklahoma",
    "OR": "Oregon",
    "PA": "Pennsylvania",
    "RI": "Rhode Island",
    "SC": "South Carolina",
    "SD": "South Dakota",
    "TN": "Tennessee",
    "TX": "Texas",
    "UT": "Utah",
    "VT": "Vermont",
    "VA": "Virginia",
    "WA": "Washington",
    "WV": "West Virginia",
    "WI": "Wisconsin",
    "WY": "Wyoming",
    "DC": "District of Columbia"
}

# Kept only so that any ad-hoc cell still referencing the UDF keeps working;
# the transformation below no longer uses it.
code_to_name_udf = udf(lambda code: state_dict.get(code, "Unknown"), StringType())

# Native map literal (code -> full name). Unlike the Python UDF, the lookup is
# evaluated inside the JVM, so rows are not serialized to a Python worker.
state_name_by_code = create_map(
    *[lit(token) for token in chain.from_iterable(state_dict.items())]
)

state_value = col("address_state")

# Replace the state code with the full state name:
#   1. known code                      -> full name
#   2. value that already is a name    -> kept (makes re-running this cell safe;
#                                         a second run used to yield "Unknown")
#   3. anything else, including NULL   -> "Unknown"
financial_df = financial_df.withColumn(
    "address_state",
    coalesce(
        state_name_by_code[state_value],
        when(state_value.isin(*state_dict.values()), state_value),
        lit("Unknown"),
    ),
)

financial_df.show()

In [5]:
%spark.pyspark
from pyspark.sql.functions import col, regexp_replace, trim

# Remove the "year"/"years" unit text from emp_length, then trim the
# whitespace that is left around the remaining value.
year_unit_pattern = " years?|year"
emp_length_without_unit = regexp_replace(col("emp_length"), year_unit_pattern, "")
financial_df = financial_df.withColumn("emp_length", trim(emp_length_without_unit))

financial_df.show()



In [6]:
%spark.pyspark
# Remove the "month"/"months" unit text from term, then trim the whitespace
# that is left around the remaining value.
month_unit_pattern = " months?|months"
term_without_unit = regexp_replace(col("term"), month_unit_pattern, "")
financial_df = financial_df.withColumn("term", trim(term_without_unit))

financial_df.show()



In [7]:
%spark.pyspark
# Replace missing employment titles with an explicit placeholder value.
missing_value_defaults = {"emp_title": "Unknown"}
financial_df = financial_df.na.fill(missing_value_defaults)

In [8]:
%spark.pyspark
# Preview the data after the emp_title nulls were replaced with "Unknown".
financial_df.show()

In [9]:
%spark.pyspark
from pyspark.sql.functions import col, lit, regexp_replace, trim, when

# Normalise emp_length to a bare number string.
# Stripping "<" alone would turn "< 1" into "1", making "less than one year"
# indistinguishable from "1 year" once the column is cast to int. Map that
# bucket to "0" first, then strip the remaining comparison/plus symbols
# (e.g. "10+" -> "10").
# NOTE(review): assumes "< 1" should be encoded as 0 — confirm with the
# consumers of employment_length.
emp_length_text = trim(col("emp_length"))

financial_df = financial_df.withColumn(
    "emp_length",
    when(emp_length_text.rlike(r"^<\s*1$"), lit("0")).otherwise(
        trim(regexp_replace(col("emp_length"), "[<>\\+]", ""))
    ),
)

financial_df.show()

In [10]:
%spark.pyspark
from pyspark.sql.functions import col

# Turn the cleaned emp_length text into an integer column, then print the
# schema and a sample so the new type can be checked.
emp_length_as_int = financial_df["emp_length"].cast("int")
financial_df = financial_df.withColumn("emp_length", emp_length_as_int)

financial_df.printSchema()
financial_df.show()


In [11]:
%spark.pyspark
# Persist the cleaned loan data as a Parquet-backed table, replacing any
# previous version of the table.
financial_df.write.saveAsTable(
    "Financial_essential",
    format="parquet",
    mode="overwrite",
)

In [12]:
%spark.pyspark
from pyspark.sql import Window
from pyspark.sql.functions import col, monotonically_increasing_id, row_number

# One row per borrower, keyed by the business key (member_id).
# NOTE(review): address_state was overwritten with full state names in an
# earlier cell, so despite its name "state_code" holds names — confirm.
borrowers_distinct = financial_df.select(
    col("member_id").alias("borrowers_id_bk"),   # Business Key
    col("emp_title").alias("employment_title"),
    col("emp_length").alias("employment_length"),
    col("annual_income"),
    col("home_ownership"),
    col("address_state").alias("state_code"),
    col("total_acc").alias("total_account"),
    col("verification_status"),
    col("application_type")
).dropDuplicates(["borrowers_id_bk"])

# Surrogate key: row_number() over the business key is deterministic, so the
# keys joined into the fact table match the keys written to the dimension
# table. monotonically_increasing_id() depends on partitioning and this lazy
# frame is recomputed for every action, so its values could differ between
# the fact join and the dimension write.
# The un-partitioned window runs on a single partition; acceptable for a
# dimension, revisit if the borrower count grows very large.
borrower_key_order = Window.orderBy(col("borrowers_id_bk"))

# Surrogate key goes first; cast to long to keep the column type the same as
# the previous monotonically_increasing_id() key.
dim_borrowers = borrowers_distinct.select(
    row_number().over(borrower_key_order).cast("long").alias("borrowers_id_sk"),
    "borrowers_id_bk",
    "employment_title",
    "employment_length",
    "annual_income",
    "home_ownership",
    "state_code",
    "total_account",
    "verification_status",
    "application_type"
)

dim_borrowers.show()


In [13]:
%spark.pyspark
from pyspark.sql import Window
from pyspark.sql.functions import col, monotonically_increasing_id, row_number, when

# Distinct loan statuses found in the cleaned loan data.
status_values = financial_df.select(
    col("loan_status").alias("loan_status")
).dropDuplicates(["loan_status"])

# Deterministic surrogate key: row_number() over the status text gives the
# same key every time this lazy frame is recomputed, so the keys joined into
# the fact table match the ones written to the dimension table
# (monotonically_increasing_id() does not guarantee that).
status_order = Window.orderBy(col("loan_status"))

dim_status = status_values.select(
    # Cast to long to keep the key's previous column type.
    row_number().over(status_order).cast("long").alias("status_id_sk"),
    col("loan_status").alias("status_id"),
    # "Fully Paid" and "Current" count as Good; every other status
    # (including NULL) falls through to Bad.
    when(col("loan_status").isin("Fully Paid", "Current"), "Good")
    .otherwise("Bad")
    .alias("loan_status_category"),
)

dim_status.show()



In [14]:
%spark.pyspark
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# One row per sub_grade, carrying its parent grade.
credit_grades_distinct = financial_df.select(
    col("grade"),
    col("sub_grade")
).dropDuplicates(["sub_grade"])

# Deterministic surrogate key: row_number() over sub_grade yields the same key
# on every recomputation of this lazy frame, so the keys used by the fact
# join match the keys written to the dimension table
# (monotonically_increasing_id() does not guarantee that).
sub_grade_order = Window.orderBy(col("sub_grade"))

dim_credit_grade = credit_grades_distinct.select(
    # Cast to long to keep the key's previous column type.
    row_number().over(sub_grade_order).cast("long").alias("credit_grade_sk"),
    "grade",
    "sub_grade"
)
dim_credit_grade.show()

In [15]:
%spark.pyspark
from pyspark.sql import Window
from pyspark.sql.functions import (
    col,
    concat_ws,
    lit,
    monotonically_increasing_id,
    row_number,
)

# Step 1: collect the distinct loan terms as the "period" column.
loan_terms_distinct = financial_df.select(
    col("term").alias("period")
).dropDuplicates(["period"])

# Deterministic surrogate key: row_number() over period yields the same key on
# every recomputation of this lazy frame, so the keys used by the fact join
# match the keys written to the dimension table
# (monotonically_increasing_id() does not guarantee that).
period_order = Window.orderBy(col("period"))

# Step 2: add the human-readable term_description ("<period> months").
dim_loan_term = loan_terms_distinct.select(
    # Cast to long to keep the key's previous column type.
    row_number().over(period_order).cast("long").alias("loan_term_sk"),
    "period",
    concat_ws(" ", col("period"), lit("months")).alias("term_description"),
)

dim_loan_term.show()


In [16]:
%spark.pyspark
from datetime import date, datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.functions import col, year, month, date_format
from pyspark.sql.types import IntegerType

# Calendar range covered by the date dimension.
# NOTE(review): only 2021 is generated; any loan date outside this range gets
# a NULL date key in the fact table's left joins — confirm the range is wide
# enough for next_payment_date / last_credit_pull_date.
start_date = date(2021, 1, 1)
end_date = date(2021, 12, 31)

# Use date objects (not naive datetimes) so Spark infers DateType. A naive
# datetime becomes a timestamp whose calendar day depends on the driver and
# session time zones, which can shift Date_key and break the date joins.
date_list = [
    (start_date + timedelta(days=offset),)
    for offset in range((end_date - start_date).days + 1)
]

# Build the dimension in one projection, in its final column order:
# Date_key is the date as an integer in yyyyMMdd form.
df_dates = spark.createDataFrame(date_list, ["Date"]).select(
    F.date_format(col("Date"), "yyyyMMdd").cast(IntegerType()).alias("Date_key"),
    col("Date"),
    year(col("Date")).alias("Year"),
    month(col("Date")).alias("Month"),
    date_format(col("Date"), "MMMM").alias("Month_name"),
    F.quarter(col("Date")).alias("Quarter"),
)

df_dates.show(5)


In [17]:
%spark.pyspark
from pyspark.sql.functions import col, to_date, monotonically_increasing_id

# Source date column for each role the date dimension plays in the fact table.
date_roles = {
    "issue": "issue_date",
    "last_payment": "last_payment_date",
    "next_payment": "next_payment_date",
    "credit_pull": "last_credit_pull_date",
}

# Step 1: derive the *_dt date columns used to look up the date keys.
financial_df_with_dates = financial_df
for source_column in date_roles.values():
    financial_df_with_dates = financial_df_with_dates.withColumn(
        f"{source_column}_dt", to_date(col(source_column), "yyyy-MM-dd")
    )

# Join the date dimension once per role. Each copy is projected to uniquely
# named columns so later references are never ambiguous.
fact_loan_wip = financial_df_with_dates
for role, source_column in date_roles.items():
    role_dates = df_dates.select(
        col("Date").alias(f"{role}_calendar_date"),
        col("Date_key").alias(f"{role}_date_key"),
    )
    fact_loan_wip = fact_loan_wip.join(
        role_dates,
        col(f"{source_column}_dt") == col(f"{role}_calendar_date"),
        "left",
    )

# Step 2: join the remaining dimensions to pick up their surrogate keys.
# Only (natural key, surrogate key) is taken from each dimension, with the
# natural key renamed. The dimensions are derived from financial_df, so
# comparing e.g. financial_df.sub_grade with dim_credit_grade.sub_grade would
# compare two columns of the same lineage and rely on Spark's self-join
# disambiguation; it would also duplicate column names such as annual_income.
borrower_keys = dim_borrowers.select(
    col("borrowers_id_bk").alias("borrower_lookup_key"),
    col("borrowers_id_sk"),
)
status_keys = dim_status.select(
    col("status_id").alias("status_lookup_key"),
    col("status_id_sk"),
)
credit_grade_keys = dim_credit_grade.select(
    col("sub_grade").alias("sub_grade_lookup_key"),
    col("credit_grade_sk"),
)
loan_term_keys = dim_loan_term.select(
    col("period").alias("term_lookup_key"),
    col("loan_term_sk"),
)

fact_loan_wip = (
    fact_loan_wip
    .join(borrower_keys, col("member_id") == col("borrower_lookup_key"), "left")
    .join(status_keys, col("loan_status") == col("status_lookup_key"), "left")
    .join(credit_grade_keys, col("sub_grade") == col("sub_grade_lookup_key"), "left")
    .join(loan_term_keys, col("term") == col("term_lookup_key"), "left")
)

# Step 3: project the final fact columns, with the generated primary key first.
fact_loan = fact_loan_wip.select(
    # Primary key of the fact table (unique per row, not consecutive)
    monotonically_increasing_id().alias("loan_id_pk_sk"),

    # Business Key
    col("id").alias("loan_id_bk"),

    # Foreign Keys from Dimension Tables
    col("borrowers_id_sk").alias("borrowers_id_fk"),
    col("status_id_sk").alias("status_id_fk"),
    col("credit_grade_sk").alias("credit_grade_fk"),
    col("loan_term_sk").alias("loan_term_fk"),
    col("issue_date_key").alias("date_key_issue"),
    col("last_payment_date_key").alias("date_key_last_payment"),
    col("next_payment_date_key").alias("date_key_next_payment"),
    # Holds a date key even though the name lacks the date_key_ prefix;
    # kept as-is so the published table schema does not change.
    col("credit_pull_date_key").alias("last_credit_pull_date"),

    # Measures
    col("loan_amount"),
    col("dti").alias("DTI"),
    col("installment"),
    col("int_rate").alias("interest_rate"),
    col("total_payment"),
    col("purpose").alias("loan_purpose"),
)

fact_loan.show()

In [18]:
%spark.pyspark


In [19]:
%spark.pyspark


In [20]:
%spark.pyspark


In [21]:
%spark.pyspark


In [22]:
%spark.pyspark


In [23]:
%spark.pyspark
# Write every dimension into the "default" database as a Parquet-backed table,
# replacing any previous version of each table.
spark.sql("USE default")

dimension_tables = {
    "dim_borrowers": dim_borrowers,
    "dim_credit_grade": dim_credit_grade,
    "dim_status": dim_status,
    "dim_loan_term": dim_loan_term,
}

for table_name, dimension_df in dimension_tables.items():
    dimension_df.write.saveAsTable(table_name, format="parquet", mode="overwrite")


In [24]:
%spark.pyspark
# Persist the date dimension as the Parquet-backed table "dim_date",
# replacing any previous version.
df_dates.write.saveAsTable("dim_date", format="parquet", mode="overwrite")

In [25]:
%spark.pyspark
# Persist the fact table as the Parquet-backed table "fact_loan",
# replacing any previous version.
fact_loan.write.saveAsTable("fact_loan", format="parquet", mode="overwrite")


In [26]:
%spark.pyspark