In [0]:
# %run /Workspace/Shared/etl_helpers

In [0]:
%restart_python
%pip install boto3
import boto3
import os
from botocore.exceptions import NoCredentialsError
from pyspark.sql import functions as F
import datetime
import sys
sys.path.insert(0, '/Workspace/Shared')
import etl_helpers 
from pyspark.sql.functions import collect_list, concat_ws

# ---------------------------------------------------------------------------
# Incremental load of default.factRequestDocumentsDetails (source: FOIMOD).
#
# Steps:
#   1. Register a run cycle and determine the start of the incremental window
#      (start time of the last successful run, or a fixed default).
#   2. Collect ministry requests created/updated after that point.
#   3. Soft-delete (activeflag = 'N') the existing FOIMOD fact rows for the
#      affected foirequest ids.
#   4. Recompute page statistics for the affected requests and append them as
#      the new active rows.
#   5. Close the run cycle as success ('t') or failure ('f').
#
# Relies on the notebook-provided `spark` session and on `etl_helpers`
# (start_run_cycle / end_run_cycle).
# ---------------------------------------------------------------------------
PACKAGE_NAME = "factRequestDocumentsDetails"
DEFAULT_LOAD_START = "2025-07-11 00:00:00"  # used when no successful run is recorded

# Soft delete of the rows that are about to be replaced.
# DISTINCT is required: temp_requests is distinct on the pair
# (foiministryrequestid, foirequest_id), so a foirequest_id can appear several
# times, and Delta MERGE fails when more than one source row updates the same
# target row.
# NOTE(review): the match is on foirequest_id only, so fact rows belonging to
# unchanged sibling ministry requests of the same foirequest are deactivated
# too — confirm this is intended.
SOFT_DELETE_SQL = """
    MERGE INTO default.factRequestDocumentsDetails dd
    USING (SELECT DISTINCT foirequest_id FROM temp_requests) AS temp
    ON temp.foirequest_id = dd.foirequestid
    WHEN MATCHED AND dd.sourceoftruth = 'FOIMOD' THEN
        UPDATE SET dd.activeflag = 'N'
"""

# Page statistics for every changed ministry request.
#   LatestRequests  - highest `version` row per changed ministry request.
#   PageFlags       - number of page-flag entries per request, skipping
#                     documents whose file path falls under a deleted path.
#                     SIZE() yields -1 (default settings) or NULL (ANSI) when
#                     FROM_JSON returns NULL, so it is clamped with GREATEST
#                     to stop bad/empty JSON from reducing the total.
#   Fees            - most recent fee record per request (estimated pages).
#   UniqueDocuments - lowest documentid per rank1hash (the "canonical" copy).
#   PageStats       - total page count and page count of canonical documents.
query = """
WITH LatestRequests AS (
    SELECT foiministryrequestid, requeststatuslabel, created_at, updated_at, foirequest_id
    FROM (
        SELECT fmr.*, 
               ROW_NUMBER() OVER (PARTITION BY fmr.foiministryrequestid ORDER BY version DESC) AS rn
        FROM foi_mod.FOIMinistryRequests fmr
        INNER JOIN temp_requests tr ON fmr.foiministryrequestid = tr.foiministryrequestid
    ) WHERE rn = 1
),
PageFlags AS (
    SELECT
        dpf.foiministryrequestid,
        SUM(GREATEST(SIZE(FROM_JSON(pageflag, 'ARRAY<STRUCT<flagid: INT, page: INT, programareaid: ARRAY<INT>, other: ARRAY<STRING>>>')), 0)) AS reviewed_count
    FROM docreviewer.documentpageflags dpf
    INNER JOIN temp_requests tr ON tr.foiministryrequestid = dpf.foiministryrequestid
    INNER JOIN docreviewer.documents d ON dpf.documentid = d.documentid
    INNER JOIN docreviewer.documentmaster dm ON dm.documentmasterid = d.documentmasterid
    WHERE NOT EXISTS (
        SELECT 1 FROM docreviewer.documentdeleted dd 
        WHERE dm.ministryrequestid = dd.ministryrequestid AND dm.filepath >= dd.filepath AND dm.filepath < CONCAT(dd.filepath, 'z')
    )
    GROUP BY dpf.foiministryrequestid
),
Fees AS (
    SELECT 
        ministryrequestid,
        est_elec,
        est_hard
    FROM (
        SELECT 
            ministryrequestid,
            get_json_object(feedata, '$.estimatedelectronicpages') AS est_elec,
            get_json_object(feedata, '$.estimatedhardcopypages') AS est_hard,
            ROW_NUMBER() OVER (PARTITION BY ministryrequestid ORDER BY updated_at DESC, version DESC) AS rn
        FROM foi_mod.foirequestcfrfees
    ) WHERE rn = 1
),
UniqueDocuments AS (
    SELECT dhc.rank1hash, MIN(dhc.documentid) AS canonical_doc_id
    FROM docreviewer.documenthashcodes dhc
    GROUP BY dhc.rank1hash
),
PageStats AS (
    SELECT 
        d.foiministryrequestid,
        SUM(d.pagecount) AS total_pagecount,
        SUM(CASE WHEN ud.canonical_doc_id IS NOT NULL THEN d.pagecount ELSE 0 END) AS dedupe_pagecount
    FROM docreviewer.documents d
    INNER JOIN temp_requests tr ON d.foiministryrequestid = tr.foiministryrequestid
    LEFT JOIN UniqueDocuments ud ON d.documentid = ud.canonical_doc_id
    GROUP BY d.foiministryrequestid
)
SELECT
    lr.foiministryrequestid,
    COALESCE(pf.reviewed_count, 0) AS reviewed_count,
    CASE WHEN lr.requeststatuslabel = 'closed' THEN COALESCE(pf.reviewed_count, 0) ELSE 0 END AS pagesreleased,
    lr.created_at,
    lr.updated_at,
    lr.foirequest_id,
    f.est_elec,
    f.est_hard,
    ps.total_pagecount,
    ps.dedupe_pagecount
FROM LatestRequests lr
LEFT JOIN PageFlags pf ON lr.foiministryrequestid = pf.foiministryrequestid
LEFT JOIN Fees f ON lr.foiministryrequestid = f.ministryrequestid
LEFT JOIN PageStats ps ON lr.foiministryrequestid = ps.foiministryrequestid
"""

runcycleid = etl_helpers.start_run_cycle(PACKAGE_NAME)

os.makedirs("/dbfs/foi/dataload", exist_ok=True)  # make sure directory exists

try:
    # Performance Optimizations
    spark.conf.set("spark.sql.shuffle.partitions", "auto")
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

    # Get last successful run time (single Spark job: first() returns None
    # when there is no matching row, so no separate count() is needed).
    df_lastrun = spark.sql(f"""
        SELECT runcyclestartat as createddate 
        FROM dimruncycle 
        WHERE packagename = '{PACKAGE_NAME}' AND success = 't' 
        ORDER BY runcycleid DESC LIMIT 1
    """)
    last_run_row = df_lastrun.first()

    if last_run_row is not None and last_run_row.createddate is not None:
        maxcreatedate_str = last_run_row.createddate.strftime("%Y-%m-%d %H:%M:%S")
    else:
        maxcreatedate_str = DEFAULT_LOAD_START

    # To force a backfill, override maxcreatedate_str here before running.
    print(f"Incremental Load Start Date: {maxcreatedate_str}")

    # Identify changed requests and Checkpoint.
    # updated_at is compared as a TIMESTAMP: casting it to DATE discards the
    # time of day, so rows updated later on the same calendar day as the last
    # run start would never satisfy the filter and would be skipped for good.
    # localCheckpoint() breaks the lineage, preventing the Shuffle Metadata error
    df_existing = spark.sql(f"""
        SELECT DISTINCT foiministryrequestid, foirequest_id
        FROM foi_mod.foiministryrequests
        WHERE created_at > '{maxcreatedate_str}' OR try_cast(updated_at AS TIMESTAMP) > '{maxcreatedate_str}'
    """)

    df_existing = df_existing.localCheckpoint()
    df_existing.createOrReplaceTempView("temp_requests")

    change_count = df_existing.count()
    print(f"Records to process: {change_count}")

    if change_count == 0:
        # Nothing changed since the last run: still a successful cycle.
        print("No changes to process.")
        etl_helpers.end_run_cycle(runcycleid, 't', PACKAGE_NAME)
    else:
        # Set existing records to inactive (Soft Delete)
        spark.sql(SOFT_DELETE_SQL)

        df = spark.sql(query)

        # Map to Target Schema.
        # NOTE(review): insertInto() resolves columns by position, not by
        # name, so this order must match the target table's column order.
        df_mapped = df.selectExpr(
            "foirequest_id AS foirequestid",
            f"{runcycleid} as runcycleid",
            "reviewed_count AS noofpagesreviewed",
            "pagesreleased AS noofpagesreleased",
            "dedupe_pagecount AS noofpagesdeduplicated",
            "total_pagecount AS noofpagesintherequest",
            "est_elec AS electronicpageestimate",
            "est_hard AS physicalpageestimate",
            "'' AS noofpagesinreviewlog",
            "'' AS noofpagesinredactionlayer",
            "'Y' AS activeflag",
            "'FOIMOD' AS sourceoftruth"
        )

        # Write Data
        df_mapped.write.format("delta").mode("append").insertInto("factrequestdocumentsdetails")
        etl_helpers.end_run_cycle(runcycleid, 't', PACKAGE_NAME)
        print("ETL Job Successful")

except Exception as e:
    # Record the failure on the run cycle, then fail the notebook while
    # keeping the original exception chained for diagnosis.
    error_msg = str(e)
    print(f"An error occurred: {error_msg}")    
    etl_helpers.end_run_cycle(runcycleid, 'f', PACKAGE_NAME, error_msg)
    raise Exception("Notebook failed") from e