In [3]:
import os
from pyspark.sql import Window
import pyspark.sql.functions as F

def _snapshot_parquet_path(directory, table_prefix, date_suffix):
    """Return the path of ``<table_prefix>_<date_suffix>.parquet`` inside ``directory``.

    A "/" is inserted only when ``directory`` is non-empty and does not already
    end with a path separator, so directories given with a trailing slash
    produce exactly the same path as plain string concatenation would.
    """
    filename = f"{table_prefix}_{date_suffix}.parquet"
    if not directory or directory.endswith(("/", "\\")):
        return directory + filename
    return f"{directory}/{filename}"


def process_features_gold_table(snapshot_date_str,
                                silver_loan_daily_directory,
                                silver_clickstream_directory,
                                silver_attributes_directory,
                                silver_financials_directory,
                                gold_features_store_directory,
                                spark):
    """Build and persist the gold feature table for one snapshot date.

    Reads the four silver parquet datasets for the snapshot, left-joins
    attributes, clickstream and financials onto the loan rows, keeps a single
    row per (Customer_ID, snapshot_date) and writes the result as parquet.

    Args:
        snapshot_date_str: Snapshot date string; every "-" is replaced by "_"
            to form the file-name suffix (e.g. "2024-07-01" -> "2024_07_01").
        silver_loan_daily_directory: Directory holding
            ``silver_loan_daily_<date>.parquet``.
        silver_clickstream_directory: Directory holding
            ``silver_clickstream_<date>.parquet``.
        silver_attributes_directory: Directory holding
            ``silver_attributes_<date>.parquet``.
        silver_financials_directory: Directory holding
            ``silver_financials_<date>.parquet``.
        gold_features_store_directory: Directory that receives
            ``gold_table_<date>.parquet``.
        spark: Object whose ``read.parquet(path)`` returns a DataFrame
            (an active SparkSession in the notebook).

    Returns:
        The joined, de-duplicated DataFrame that was written to the gold path.

    Directory arguments are strings and may be given with or without a
    trailing path separator.
    """
    date_suffix = snapshot_date_str.replace("-", "_")

    # Resolve the per-snapshot input files.
    loan_path = _snapshot_parquet_path(silver_loan_daily_directory, "silver_loan_daily", date_suffix)
    click_path = _snapshot_parquet_path(silver_clickstream_directory, "silver_clickstream", date_suffix)
    attr_path = _snapshot_parquet_path(silver_attributes_directory, "silver_attributes", date_suffix)
    financial_path = _snapshot_parquet_path(silver_financials_directory, "silver_financials", date_suffix)

    # Load each silver parquet dataset as a DataFrame.
    loan_df = spark.read.parquet(loan_path)
    attr_df = spark.read.parquet(attr_path)
    click_df = spark.read.parquet(click_path)
    financial_df = spark.read.parquet(financial_path)

    # Left joins keep every loan row even when a customer has no matching
    # attributes / clickstream / financials record for this snapshot.
    # NOTE(review): all joined columns are carried through unchanged; the
    # notebook output shows this includes Name and SSN - confirm that is intended.
    join_keys = ["Customer_ID", "snapshot_date"]
    df = loan_df.join(attr_df, join_keys, "left")
    df = df.join(click_df, join_keys, "left")
    df = df.join(financial_df, join_keys, "left")

    # Keep one row per (Customer_ID, snapshot_date): the one with the highest
    # loan_id (the original author treats that as the latest loan).
    # NOTE(review): if several rows share the same loan_id within a partition,
    # which of them is kept is not determined by this ordering.
    window_spec = Window.partitionBy("Customer_ID", "snapshot_date").orderBy(F.desc("loan_id"))
    df = (
        df.withColumn("rank", F.row_number().over(window_spec))
        .filter(F.col("rank") == 1)
        .drop("rank")
    )

    # Persist the gold table; the file name carries the snapshot date.
    gold_path = _snapshot_parquet_path(gold_features_store_directory, "gold_table", date_suffix)
    df.write.mode("overwrite").parquet(gold_path)
    print(f"gold table saved to {gold_path}")

    return df


# Example invocation: obtains (or creates) a Spark session, then builds the
# gold table for each snapshot date in the list.

if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("FeatureProcessing").getOrCreate()

    # Example silver input directories, one per source table.
    silver_loan_daily_directory = "datamart/silver/loan_daily/"
    silver_loan_directory_clickstream = "datamart/silver/loan_clickstream/"
    silver_loan_directory_attributes = "datamart/silver/loan_attributes/"
    silver_loan_directory_financials = "datamart/silver/loan_financials/"

    # Gold output directory; created up front when it does not exist yet.
    gold_feature_store_directory = "datamart/gold/feature_store/"
    os.makedirs(gold_feature_store_directory, exist_ok=True)

    # Snapshot dates to process (single example date).
    dates_str_lst = ["2024-07-01"]

    for date_str in dates_str_lst:
        df = process_features_gold_table(
            snapshot_date_str=date_str,
            silver_loan_daily_directory=silver_loan_daily_directory,
            silver_clickstream_directory=silver_loan_directory_clickstream,
            silver_attributes_directory=silver_loan_directory_attributes,
            silver_financials_directory=silver_loan_directory_financials,
            gold_features_store_directory=gold_feature_store_directory,
            spark=spark,
        )
        # Preview the first rows of the table that was just written.
        df.show(5)


gold table saved to datamart/gold/feature_store/gold_table_2024_07_01.parquet
+-----------+-------------+--------------------+---------------+------+---------------+--------+-------+--------+-----------+-------+---+-------------------+-----------------+---+----+----+----+----------+----+----+----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------------+---------+-------------+---------------------+-----------------+---------------+-------------+-----------+------------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+---------------------+-------------------+-----------------------+-----------------+---------------+
|Customer_ID|snapshot_date|             loan_id|loan_start_date|tenure|installment_num|loan_amt|due_amt|paid_amt|overdue_amt|balance|mob|installments_missed|first_missed_date|dpd|Name| Age| SSN|Occupation|fe_1|fe_2|f

In [4]:
# List every column of the gold table, then preview two clickstream features
# next to the loan amount.
print(df.columns)
df.select(["fe_1", "fe_2", "loan_amt"]).show(n=5)


['Customer_ID', 'snapshot_date', 'loan_id', 'loan_start_date', 'tenure', 'installment_num', 'loan_amt', 'due_amt', 'paid_amt', 'overdue_amt', 'balance', 'mob', 'installments_missed', 'first_missed_date', 'dpd', 'Name', 'Age', 'SSN', 'Occupation', 'fe_1', 'fe_2', 'fe_3', 'fe_4', 'fe_5', 'fe_6', 'fe_7', 'fe_8', 'fe_9', 'fe_10', 'fe_11', 'fe_12', 'fe_13', 'fe_14', 'fe_15', 'fe_16', 'fe_17', 'fe_18', 'fe_19', 'fe_20', 'total_clicks', 'is_active', 'Annual_Income', 'Monthly_Inhand_Salary', 'Num_Bank_Accounts', 'Num_Credit_Card', 'Interest_Rate', 'Num_of_Loan', 'Type_of_Loan', 'Delay_from_due_date', 'Num_of_Delayed_Payment', 'Changed_Credit_Limit', 'Num_Credit_Inquiries', 'Credit_Mix', 'Outstanding_Debt', 'Credit_Utilization_Ratio', 'Credit_History_Age', 'Payment_of_Min_Amount', 'Total_EMI_per_month', 'Amount_invested_monthly', 'Payment_Behaviour', 'Monthly_Balance']
+----+----+--------+
|fe_1|fe_2|loan_amt|
+----+----+--------+
| 111|   0| 10000.0|
| 251| 156| 10000.0|
| 199| 209| 10000.0|
|