# Annualized RechargeCost
- Loads both source tables (`FFR_RechargeCost` and `FFR_Fact_Recharge_CCH_x_PYR`)
- Cleans and deduplicates the fact/mapping table
- Performs a LEFT JOIN:
  - r.OHR_PeriodEndDate_x → f.CCH_PeriodEndDate_x  
  - r.PYR_EeRef → f.CCH_OpenHRNo (both sides trimmed)
- Overrides OHR_Grade_HR to "Partner" when OHR_Partner_x is "Yes" (compared case-insensitively after trimming)
- Creates updated_df containing enriched and corrected records
- Aggregates the updated dataset by OHR_Grade_HR and OHR_Team_Code:
  - SUM(PYR_RechargeCost_x)
  - COUNT DISTINCT(PYR_EeRef)
- Produces annualized_df
- Saves results to both Silver and Gold Delta paths with overwriteSchema = true

In [None]:
%%configure
{
    "defaultLakehouse": { 
        "name": "V6_SE_Silver"
    }
}

In [None]:
%run SharedFunctions

In [None]:
import pyspark.sql.functions as F

# Lakehouse root paths, resolved through the helpers brought in by
# `%run SharedFunctions`. The `se_*` variants are resolved first, then the
# unprefixed ones.
se_pewter_path, se_silver_path, se_gold_path = (
    get_se_pewter_path(),
    get_se_silver_path(),
    get_se_gold_path(),
)
pewter_path, silver_path, gold_path = (
    get_pewter_path(),
    get_silver_path(),
    get_gold_path(),
)

# Notebook logger, also created by a SharedFunctions helper.
logger = setup_logger('SE_Firm_Financials_Recharge')

In [None]:
# Input tables: both paths are built under se_silver_path.
source_table = "FFR_RechargeCost"
source_path = f"{se_silver_path}/{source_table}"

source_table1 = "FFR_Fact_Recharge_CCH_x_PYR"
source_path1 = f"{se_silver_path}/{source_table1}"

# Output table: one table name, with a path under se_silver_path and a second
# path under se_gold_path.
dest_table = "FFR_Annualized_RechargeCost"
dest_path = f"{se_silver_path}/{dest_table}"
dest_path1 = f"{se_gold_path}/{dest_table}"

# Main dataset: read the FFR_RechargeCost Delta table and keep only the
# columns listed here.
recharge_columns = [
    "PYR_EeID",
    "PYR_RechargeCost_x",
    "OHR_Team_Code",
    "OHR_Job_Title",
    "OHR_Grade_HR",
    "PYR_EeRef",
    "OHR_PeriodEndDate_x",
]

recharge_df = spark.read.format("delta").load(source_path).select(*recharge_columns)

# Fact/mapping table: read FFR_Fact_Recharge_CCH_x_PYR and reduce it to at most
# one row per join key (CCH_PeriodEndDate_x, trimmed CCH_OpenHRNo).
#
# Why one row per key: the join further down matches on exactly these two
# values. Deduplicating on all four selected columns would still leave several
# rows for a key whose partner flag or department differs (or whose id differs
# only by surrounding whitespace), and every such extra row would duplicate the
# matching recharge rows and inflate SUM(PYR_RechargeCost_x).
#
# All column functions are taken from the `F` namespace imported at the top of
# the notebook, so this statement does not depend on bare `col` / `trim` names
# being in scope.
from pyspark.sql import Window

# 0 for rows whose partner flag is "yes" after trim/lower, 1 for everything
# else (including NULL), so partner rows sort first inside each key.
# NOTE(review): preferring the partner row when a key has conflicting flags is
# a judgment call - confirm with the data owner.
_partner_rank = F.when(
    F.lower(F.trim(F.col("OHR_Partner_x"))) == "yes", 0
).otherwise(1)

# Department and raw flag are only tie-breakers that make the pick deterministic.
_fact_key_window = Window.partitionBy(
    "CCH_PeriodEndDate_x", "CCH_OpenHRNo"
).orderBy(
    _partner_rank,
    F.col("SHP_Ee_Department_RollUp_x"),
    F.col("OHR_Partner_x"),
)

fact_df = (
    spark.read.format("delta").load(source_path1)
        .select(
            "CCH_PeriodEndDate_x",
            "CCH_OpenHRNo",
            "OHR_Partner_x",
            "SHP_Ee_Department_RollUp_x"
        )
        # Keep only rows where CCH_OpenHRNo and SHP_Ee_Department_RollUp_x are
        # both non-null and non-empty after trimming.
        .where(
            F.col("CCH_OpenHRNo").isNotNull() &
            (F.trim(F.col("CCH_OpenHRNo")) != "") &
            F.col("SHP_Ee_Department_RollUp_x").isNotNull() &
            (F.trim(F.col("SHP_Ee_Department_RollUp_x")) != "")
        )
        # Normalise the id so whitespace variants collapse into the same key.
        .withColumn("CCH_OpenHRNo", F.trim(F.col("CCH_OpenHRNo")))
        # Keep the first-ranked row of every key and drop the helper column.
        .withColumn("_fact_row_no", F.row_number().over(_fact_key_window))
        .where(F.col("_fact_row_no") == 1)
        .drop("_fact_row_no")
)


from pyspark.sql.functions import col, trim

# LEFT JOIN with recharge_df ("r") on the left and fact_df ("f") on the right.
# A row pair matches when the period end dates are equal AND the employee
# reference equals the OpenHR number once both are trimmed.
join_condition = (
    (F.col("r.OHR_PeriodEndDate_x") == F.col("f.CCH_PeriodEndDate_x"))
    & (F.trim(F.col("r.PYR_EeRef")) == F.trim(F.col("f.CCH_OpenHRNo")))
)

joined_df = recharge_df.alias("r").join(
    fact_df.alias("f"),
    on=join_condition,
    how="left",
)


# Override OHR_Grade_HR: rows whose matched OHR_Partner_x equals "yes" after
# trimming and lower-casing get the grade "Partner"; every other row (including
# rows with no match in the fact table, where the flag is NULL) keeps the grade
# from the recharge table.
#
# `when` / `lower` / `trim` / `col` are referenced through the `F` namespace
# imported at the top of the notebook, so no bare function names need to be in
# scope for this statement to run.
updated_df = joined_df.withColumn(
    "OHR_Grade_HR",
    F.when(F.lower(F.trim(F.col("f.OHR_Partner_x"))) == "yes", "Partner")
    .otherwise(F.col("r.OHR_Grade_HR")),
)


# Aggregate updated_df per (OHR_Grade_HR, OHR_Team_Code):
#   - PYR_Annualized_RechargeCost_x = SUM(PYR_RechargeCost_x)
#   - PYR_EmployeeID_Count_x        = COUNT(DISTINCT PYR_EeRef)
group_keys = ["OHR_Grade_HR", "OHR_Team_Code"]

total_recharge_cost = F.sum("PYR_RechargeCost_x").alias("PYR_Annualized_RechargeCost_x")
distinct_employee_count = F.countDistinct("PYR_EeRef").alias("PYR_EmployeeID_Count_x")

annualized_df = (
    updated_df
    .groupBy(*group_keys)
    .agg(total_recharge_cost, distinct_employee_count)
    # Pin the output column order explicitly.
    .select(
        *group_keys,
        "PYR_Annualized_RechargeCost_x",
        "PYR_EmployeeID_Count_x",
    )
)


# Write the aggregated result to both destination paths as Delta, replacing
# existing data and schema (mode "overwrite" + overwriteSchema = "true").
#
# The DataFrame is cached first so the second write can reuse the result
# materialised by the first one instead of re-running the reads, join and
# aggregation; that also keeps the two copies from diverging if a source table
# changes between the writes. The cache is released afterwards even when a
# write fails.
annualized_df.cache()
try:
    for target_path in (dest_path, dest_path1):
        (
            annualized_df.write
            .mode("overwrite")
            .format("delta")
            .option("overwriteSchema", "true")
            .save(target_path)
        )
finally:
    annualized_df.unpersist()


print("Annualized recharge cost table created successfully.")
