# Lakeflow Declarative Pipeline

This Lakeflow Declarative Pipeline definition is executed by the pipeline defined in `resources/education_benchmarking_and_insights.pipeline.yml`.

In [0]:
import sys

# Make modules under the bundle's source directory importable; falls back to
# the current directory when "bundle.sourcePath" is not set in the pipeline
# configuration.
sys.path.append(spark.conf.get("bundle.sourcePath", "."))

# `dlt` must be imported explicitly: every @dlt.view / @dlt.table decorator and
# dlt.* call in this file refers to it. (`from dlt import read_sql` was removed:
# it bound only `read_sql`, which is not part of the documented dlt Python API,
# so the import itself failed.)
import dlt
from pyspark.sql.functions import col, expr

# Two-part (catalog.schema) prefixes for the source datasets; table/view names
# are appended to these to form fully qualified names.
BRONZE_BFR_CATALOG_PATH = "catalog_30_bronze.bfr"
COPPER_FBIT_CATALOG_PATH = "catalog_40_copper_financial_benchmarking_insights.financial_benchmarking_insights"
BRONZE_AAR_CATALOG_PATH = "catalog_30_bronze.aar"

# Reporting year used to build the year-suffixed dataset names below.
# Override it by setting the "current_year" key in the pipeline configuration;
# when the key is absent the previous hard-coded value (2024) is used.
# spark.conf.get returns a string, hence the int() conversion.
CURRENT_YEAR = int(spark.conf.get("current_year", "2024"))

@dlt.view
def bfr_sofa_current_year():
    """
    Load the current-year BFR Sofa materialized view.

    Reads ``mv_BFR_Sofa_<CURRENT_YEAR>`` from the copper FBIT schema. The source
    is addressed by a fully qualified catalog.schema.table name, so it is read
    with spark.read.table(); dlt.read() is meant for datasets referenced by
    their name inside this pipeline.
    """
    return spark.read.table(f"{COPPER_FBIT_CATALOG_PATH}.mv_BFR_Sofa_{CURRENT_YEAR}")

@dlt.view
def bfr_three_year_forecast_current_year():
    """
    Load the current-year BFR Three Year Forecast materialized view.

    Reads ``mv_BFR_Three_Year_Forecast_<CURRENT_YEAR>`` from the copper FBIT
    schema. The source is addressed by a fully qualified catalog.schema.table
    name, so it is read with spark.read.table() rather than dlt.read().
    """
    return spark.read.table(
        f"{COPPER_FBIT_CATALOG_PATH}.mv_BFR_Three_Year_Forecast_{CURRENT_YEAR}"
    )

@dlt.view
def academies_current_year():
    """
    Load the current-year academies data.

    Reads ``vw_academies_<CURRENT_YEAR>`` from the bronze AAR schema. The source
    is addressed by a fully qualified catalog.schema.table name, so it is read
    with spark.read.table() rather than dlt.read().
    """
    return spark.read.table(f"{BRONZE_AAR_CATALOG_PATH}.vw_academies_{CURRENT_YEAR}")

# DLT calls @dlt.view / @dlt.table functions with no arguments, and dlt.read()
# takes only a dataset name, so a view cannot be parameterised by a year
# offset. The historical readers below are therefore plain helper functions
# that the per-year tables call directly.


def bfr_sofa_historical(year_offset: int):
    """
    Load historical BFR Sofa data for ``CURRENT_YEAR - year_offset``.

    Reads ``vw_BFR_Sofa_<year>`` from the bronze BFR schema, keeps only the
    TrustUPIN, EFALineNo, Y1P2 and Y2P2 columns, and renames TrustUPIN to
    "Trust UPIN".

    Args:
        year_offset: Number of years before CURRENT_YEAR to load.
    """
    historical_year = CURRENT_YEAR - year_offset
    # NOTE(review): "Trust UPIN" contains a space; materialising it in a Delta
    # table may require column mapping to be enabled — confirm table settings.
    return (
        spark.read.table(f"{BRONZE_BFR_CATALOG_PATH}.vw_BFR_Sofa_{historical_year}")
        .select("TrustUPIN", "EFALineNo", "Y1P2", "Y2P2")
        .withColumnRenamed("TrustUPIN", "Trust UPIN")
    )


def academies_historical(year_offset: int):
    """
    Load historical academies data for ``CURRENT_YEAR - year_offset``.

    Args:
        year_offset: Number of years before CURRENT_YEAR to load.
    """
    historical_year = CURRENT_YEAR - year_offset
    return spark.read.table(f"{BRONZE_AAR_CATALOG_PATH}.vw_academies_{historical_year}")


# EFA line numbers retained by the processed BFR Sofa tables.
# Placeholder values for demonstration: these are meant to come from
# config.SOFA_TRUST_REVENUE_RESERVE_EFALINE and config.SOFA_PUPIL_NUMBER_EFALINE.
DESIRED_EFA_LINES = ["100", "200"]


def _bfr_sofa_filtered(year_offset: int):
    """Return historical BFR Sofa rows whose EFALineNo is in DESIRED_EFA_LINES."""
    return bfr_sofa_historical(year_offset).filter(
        col("EFALineNo").isin(DESIRED_EFA_LINES)
    )


@dlt.table
def bfr_sofa_y1_processed():
    """BFR Sofa data for CURRENT_YEAR - 1, restricted to DESIRED_EFA_LINES."""
    return _bfr_sofa_filtered(1)


@dlt.table
def bfr_sofa_y2_processed():
    """BFR Sofa data for CURRENT_YEAR - 2, restricted to DESIRED_EFA_LINES."""
    return _bfr_sofa_filtered(2)


@dlt.table
def academies_y1_processed():
    """Academies data for CURRENT_YEAR - 1."""
    return academies_historical(1)


@dlt.table
def academies_y2_processed():
    """Academies data for CURRENT_YEAR - 2."""
    return academies_historical(2)