#项目：HR_PROJECT
##这是用来处理汇总层的数据，最终用来把数据开放给终端用户

In [0]:

from delta.tables import *
from pyspark.sql.functions import col, current_timestamp

# Load the registered source tables. No SparkSession is created here because
# Databricks already provides the `spark` session.
# The employee fact table is read twice so it can be joined to itself:
# t1 plays the employee side and t2 the manager side.
t1, t2, t3, t4, t5 = (
  spark.table(table_name)
  for table_name in (
    "uc_data001.uc_schema001.fct_hr_employees",  # t1: employees
    "uc_data001.uc_schema001.fct_hr_employees",  # t2: managers (self-join side)
    "uc_data001.uc_schema001.dim_hr_employees_departments",  # t3: departments
    "uc_data001.uc_schema001.dim_hr_employees_locations",  # t4: locations
    "uc_data001.uc_schema001.dim_hr_employees_countries",  # t5: countries
  )
)

# Build the wide employee DataFrame by chaining left joins from the employee
# rows (t1) out to manager (t2), department (t3), location (t4) and country (t5).
# Every join is a left join, so a t1 row is kept even when a lookup finds no match.

# Columns taken as-is from the employee side.
_employee_columns = [
  col(f"t1.{name}")
  for name in (
    "EMPLOYEE_ID",
    "FIRST_NAME",
    "LAST_NAME",
    "EMAIL",
    "PHONE_NUMBER",
    "HIRE_DATE",
    "JOB_ID",
    "SALARY",
  )
]

# Manager names come from the self-joined side and are renamed so they do not
# collide with the employee's own FIRST_NAME / LAST_NAME.
_manager_columns = [
  col("t2.FIRST_NAME").alias("MANAGER_FIRST_NAME"),
  col("t2.LAST_NAME").alias("MANAGER_LAST_NAME"),
]

# Descriptive attributes from the department, location and country tables.
_lookup_columns = [
  col("t3.DEPARTMENT_NAME"),
  col("t4.STREET_ADDRESS"),
  col("t4.POSTAL_CODE"),
  col("t4.CITY"),
  col("t4.STATE_PROVINCE"),
  col("t5.COUNTRY_NAME"),
]

# Audit columns: both are stamped with the current timestamp of this run.
_audit_columns = [
  current_timestamp().alias("DB_CREATED_DATE"),
  current_timestamp().alias("DB_UPDATED_DATE"),
]

# employee -> manager: the manager is the employee row whose id equals MANAGER_ID
_joined = t1.alias("t1").join(
  t2.alias("t2"), col("t2.EMPLOYEE_ID") == col("t1.MANAGER_ID"), "left"
)
# employee -> department
_joined = _joined.join(
  t3.alias("t3"), col("t1.DEPARTMENT_ID") == col("t3.DEPARTMENT_ID"), "left"
)
# department -> location
_joined = _joined.join(
  t4.alias("t4"), col("t4.LOCATION_ID") == col("t3.LOCATION_ID"), "left"
)
# location -> country
_joined = _joined.join(
  t5.alias("t5"), col("t4.COUNTRY_ID") == col("t5.COUNTRY_ID"), "left"
)

result_df = _joined.select(
  *_employee_columns,
  *_manager_columns,
  *_lookup_columns,
  *_audit_columns,
)

# Upsert result_df into the wide employee table, matching rows on EMPLOYEE_ID.
#   * matched rows:   every carried column is overwritten from the source row and
#                     DB_UPDATED_DATE is set to the current timestamp;
#                     DB_CREATED_DATE is left untouched.
#   * unmatched rows: inserted with the carried columns plus both audit
#                     timestamps taken from the source row.
# NOTE(review): the matched clause has no condition, so every matched row is
# rewritten and its DB_UPDATED_DATE refreshed on each run, even when no value
# changed - confirm this is intended.
dm_hr_employees_wide_df = DeltaTable.forName(spark, "uc_data001.uc_schema001.dm_hr_employees_wide")

# Columns copied one-to-one from source alias "s" in both merge clauses.
_carried_columns = (
  "EMPLOYEE_ID",
  "FIRST_NAME",
  "LAST_NAME",
  "EMAIL",
  "PHONE_NUMBER",
  "HIRE_DATE",
  "JOB_ID",
  "SALARY",
  "MANAGER_FIRST_NAME",
  "MANAGER_LAST_NAME",
  "DEPARTMENT_NAME",
  "STREET_ADDRESS",
  "POSTAL_CODE",
  "CITY",
  "STATE_PROVINCE",
  "COUNTRY_NAME",
)
# Maps each target column to the same-named source expression, e.g. "s.EMAIL".
_source_values = {name: f"s.{name}" for name in _carried_columns}

(
  dm_hr_employees_wide_df.alias("t")
  .merge(result_df.alias("s"), "t.EMPLOYEE_ID = s.EMPLOYEE_ID ")
  .whenMatchedUpdate(set={
    **_source_values,
    "DB_UPDATED_DATE": current_timestamp(),
  })
  .whenNotMatchedInsert(values={
    **_source_values,
    "DB_CREATED_DATE": "s.DB_CREATED_DATE",
    "DB_UPDATED_DATE": "s.DB_UPDATED_DATE",
  })
  .execute()
)