In [None]:
# Configuration and Common Imports
#
# Shared setup for every later cell: PySpark imports, the Spark session option
# for reading parquet datetimes, and the module-level logger used for progress messages.

# Common imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, when, year, date_format
from pyspark.sql.functions import to_date, expr, substring, trim, regexp_replace
from pyspark.sql.types import StringType, IntegerType, DateType
import logging

# Spark 3.0+: "CORRECTED" reads parquet dates/timestamps as stored, without
# rebasing them from the legacy hybrid calendar.
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")

# INFO-level logging for this notebook's progress messages
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Announce the notebook version at start-up
logger.info("Starting Confirm asset data transformation - v1.0")

In [None]:
# Reference Tables (Cached)
#
# Small lookup tables from the Bronze lakehouse. Each one is marked for caching and
# registered as a temp view so the SQL transformations in later cells can join
# against it by name.

logger.info("Loading and caching reference tables")


def _cache_reference(view_name, query):
    """Run a Spark SQL query, mark the result for caching, and register it as a temp view.

    Args:
        view_name: Temp view name under which the DataFrame is registered.
        query: Spark SQL text to execute.

    Returns:
        The cached DataFrame. Spark's cache is lazy, so data is only materialised
        when the DataFrame is first computed.
    """
    cached = spark.sql(query).cache()
    cached.createOrReplaceTempView(view_name)
    return cached


# Feature type lookup
feature_types_df = _cache_reference("feature_types", """
    SELECT feature_type_code, feature_type_name 
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_feature_type
""")

# Area (suburb) lookup
suburbs_df = _cache_reference("suburbs", """
    SELECT area_code, area_name 
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_area
""")

# Park names: feature attribute notes for attribute type 'PKAN'
park_names_df = _cache_reference("park_names", """
    SELECT site_code, plot_number, feat_attrib_notes
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_feat_attrib_type
    WHERE attrib_type_code = 'PKAN'
""")

# Observation type lookup
obs_type_df = _cache_reference("obs_type", """
    SELECT observe_type_key, observe_type_code, observe_type_name, obs_parm_code
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_observe_type
""")

# Observation parameter lookup
obs_param_df = _cache_reference("obs_param", """
    SELECT obs_parm_code, obs_parm_name
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_observe_parameter
""")

# Observation parameter option lookup (grade names)
obs_param_options_df = _cache_reference("obs_param_options", """
    SELECT obs_parm_code, obs_parm_opt_code, obs_parm_opt_name
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_observe_parm_opt
""")

# Job type lookup
job_type_df = _cache_reference("job_type", """
    SELECT job_type_key, job_type_code, job_type_name
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_job_type
""")

# Job status lookup
job_status_df = _cache_reference("job_status", """
    SELECT status_code, status_name
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_job_status
""")

logger.info("Reference tables cached and registered as temp views")

In [None]:
# Core Filtering DataFrames (South Parks and All Parks)
#
# Two site/plot key sets that later cells join against to restrict fact tables:
#   * south_parks  - park features ('pk%' feature types) outside contract area 'pkn'
#   * park_filter  - every park feature, regardless of contract area

logger.info("Creating South Parks filtered DataFrame")

# A park counts as South unless its contract area is 'pkn'. A NULL contract area
# must count as South too. `lower(NULL) != 'pkn'` evaluates to NULL, which WHERE
# treats as false, so without the IS NULL branch those parks would be dropped here.
# The Parks table's CASE ... ELSE 'South' would still label them South.
south_parks_df = spark.sql("""
    SELECT site_code, plot_number, central_asset_id
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_feature
    WHERE lower(feature_type_code) LIKE 'pk%'
    AND (contract_area_code IS NULL OR lower(contract_area_code) != 'pkn')
""").cache()

# All parks, used by the Jobs and Measurements cells
logger.info("Creating Park filter DataFrame for all parks")

park_filter_df = spark.sql("""
    SELECT site_code, plot_number, central_asset_id
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_feature
    WHERE lower(feature_type_code) LIKE 'pk%'
""").cache()

# Register as temp views for SQL operations
south_parks_df.createOrReplaceTempView("south_parks")
park_filter_df.createOrReplaceTempView("park_filter")

logger.info("Core filtering DataFrames created and cached")

In [None]:
# Parks Table Transformation
#
# Park features ('pk%' feature types) enriched with type name, park name (PKAN
# attribute), suburb, and a service-area label. 'pkn' maps to North; everything
# else, including NULL, maps to South.

logger.info("Creating Parks table")

PARKS_SQL = """
    SELECT 
        f.site_code AS `Site code`,
        f.plot_number AS `Plot No.`,
        f.feature_deadflag AS `Dead flag`,
        f.contract_area_code AS `Contract area code`,
        f.feature_type_code AS `Park type code`,
        f.central_asset_id AS `Central asset ID`,
        f.feature_start_date AS `Park added`,
        CASE 
            WHEN lower(f.contract_area_code) = 'pkn' THEN 'North'
            ELSE 'South' 
        END AS `Service area`,
        ft.feature_type_name AS `Park type`,
        pn.feat_attrib_notes AS `Park name`,
        s.area_name AS `Suburb`,
        concat(f.site_code, '-', cast(f.plot_number as string)) AS `Site and plot`,
        year(f.feature_start_date) AS `Year added`
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_feature f
    LEFT JOIN feature_types ft ON f.feature_type_code = ft.feature_type_code
    LEFT JOIN park_names pn ON f.site_code = pn.site_code AND f.plot_number = pn.plot_number
    LEFT JOIN suburbs s ON f.area_code = s.area_code
    WHERE lower(f.feature_type_code) LIKE 'pk%'
"""

parks_df = spark.sql(PARKS_SQL)

# Preview the first rows as a quick sanity check
parks_preview_df = parks_df.limit(10)
display(parks_preview_df)

In [None]:
# Batch and Route Information
#
# Inspection routes of type 'PKPA' and the inspection batches run on them. Both are
# cached and exposed as temp views (`route`, `batch`) for the Inspection and
# Observations cells.

logger.info("Creating Route and Batch reference DataFrames")

ROUTE_SQL = """
    SELECT insp_route_code, insp_route_name, insp_type_code
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_inspection_route
    WHERE insp_type_code = 'PKPA'
"""

# Reads the `route` temp view, so that view must be registered before this runs.
BATCH_SQL = """
    SELECT 
        b.insp_batch_no, 
        b.insp_route_code, 
        b.insp_create_date,
        r.insp_route_name
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_inspection_batch b
    JOIN route r ON b.insp_route_code = r.insp_route_code
"""

# Routes first: the batch query depends on the `route` view
route_df = spark.sql(ROUTE_SQL).cache()
route_df.createOrReplaceTempView("route")

# Batches on those routes only (inner join), with the route name attached
batch_df = spark.sql(BATCH_SQL).cache()
batch_df.createOrReplaceTempView("batch")

logger.info("Route and Batch reference DataFrames created")

In [None]:
# Inspection Table Transformation
#
# Feature inspections that belong to a 'PKPA' route batch (`batch` view) and to a
# South park (`south_parks` view), with route details and composite keys.

logger.info("Creating Inspection table")

INSPECTION_SQL = """
    SELECT 
        i.insp_batch_no AS `Batch no.`,
        i.site_code AS `Site code`,
        i.plot_number AS `Plot no.`,
        i.feature_insp_date AS `Inspection datetime`,
        b.insp_route_code AS `Route code`,
        b.insp_route_name AS `Route name`,
        to_date(i.feature_insp_date) AS `Inspection date`,
        concat(i.site_code, '-', cast(i.plot_number as string)) AS `Site and Plot`,
        concat(cast(i.insp_batch_no as string), '-', i.site_code, '-', cast(i.plot_number as string)) AS `Batch site plot`
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_inspection_feature i
    JOIN batch b ON i.insp_batch_no = b.insp_batch_no
    JOIN south_parks sp ON i.site_code = sp.site_code AND i.plot_number = sp.plot_number
"""

inspection_df = spark.sql(INSPECTION_SQL)

# Preview the first rows as a quick sanity check
inspection_preview_df = inspection_df.limit(10)
display(inspection_preview_df)

In [None]:
# Observations Table Transformation
#
# Inspection condition observations for 'PKPA' batches at South parks, labelled with
# observation type, grade name, and an observation group derived from the type name.

logger.info("Creating Observations table")

# NOTE(review): this view is built and cached, but the observations query below
# joins `obs_type`, not `obs_type_with_param`, so obs_parm_name never reaches the
# output. It is kept because the cleanup cell unpersists obs_type_with_param_df.
OBS_TYPE_WITH_PARAM_SQL = """
    SELECT ot.*, op.obs_parm_name
    FROM obs_type ot
    JOIN obs_param op ON ot.obs_parm_code = op.obs_parm_code
"""

# NOTE(review): `Inspection datetime` here comes from the batch create date, while the
# Inspection table uses feature_insp_date. Confirm this difference is intended.
OBSERVATIONS_SQL = """
    SELECT 
        ic.insp_batch_no AS `Batch No.`,
        ic.site_code AS `Site code`,
        ic.plot_number AS `Plot No.`,
        ic.grade_code AS `Grade code`,
        ic.condition_notes AS `Condition notes`,
        ic.survey_obs_value AS `Obs value`,
        b.insp_create_date AS `Inspection datetime`,
        ot.observe_type_code AS `Obs type code`,
        ot.observe_type_name AS `Obs type`,
        opo.obs_parm_opt_name AS `Grade`,
        CASE 
            WHEN ot.observe_type_name LIKE 'Cleans%' THEN 'Cleansing'
            WHEN ot.observe_type_name LIKE 'Gene%' THEN 'General'
            WHEN ot.observe_type_name LIKE 'Horti%' THEN 'Horticulture'
            WHEN ot.observe_type_name LIKE 'Infrast%' THEN 'Infrastructure'
            WHEN ot.observe_type_name LIKE 'Tur%' THEN 'Turf'
            ELSE NULL
        END AS `Observation group`,
        concat(cast(ic.insp_batch_no as string), '-', ic.site_code, '-', cast(ic.plot_number as string)) AS `Batch site plot`
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_insp_condition ic
    JOIN batch b ON ic.insp_batch_no = b.insp_batch_no
    JOIN south_parks sp ON ic.site_code = sp.site_code AND ic.plot_number = sp.plot_number
    JOIN obs_type ot ON ic.observe_type_key = ot.observe_type_key
    LEFT JOIN obs_param_options opo ON ot.obs_parm_code = opo.obs_parm_code AND ic.grade_code = opo.obs_parm_opt_code
"""

obs_type_with_param_df = spark.sql(OBS_TYPE_WITH_PARAM_SQL).cache()
obs_type_with_param_df.createOrReplaceTempView("obs_type_with_param")

observations_df = spark.sql(OBSERVATIONS_SQL)

# Preview the first rows as a quick sanity check
observations_preview_df = observations_df.limit(10)
display(observations_preview_df)

In [None]:
# Job Table Transformation
#
# Jobs on any park (`park_filter` view), enriched with:
#   * job type
#   * current status, via the status-log row matching the job's job_log_number
#   * contract and contractor
#   * allocated officer
#   * the login on status-log entry 1 ("Logged by")

logger.info("Creating Jobs table")

# Status-log lookups. The full log is joined on (job_number, job_log_number) to get
# each job's current entry. The second frame keeps only entry 1, used for "Logged by".
job_status_log_current_df = spark.sql("""
    SELECT job_number, job_log_number, allocated_officer, status_code
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_job_status_log
""").cache()
job_status_log_current_df.createOrReplaceTempView("job_status_log")

# NOTE(review): assumes at most one row per job with job_log_number = 1; otherwise
# the LEFT JOIN below would duplicate job rows. Confirm against the source key.
job_status_log_when_logged_df = spark.sql("""
    SELECT job_number, login_name AS `Logged by`
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_job_status_log
    WHERE job_log_number = 1
""").cache()
job_status_log_when_logged_df.createOrReplaceTempView("job_status_log_when_logged")

# Contract and contractor
contractor_df = spark.sql("""
    SELECT contractor_code, contractor_name
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_contractor
""").cache()
contractor_df.createOrReplaceTempView("contractor")

# LEFT JOIN so a contract whose contractor_code has no match in confirm_contractor
# keeps its contract_name (with a NULL contractor_name). With an inner join it would
# vanish from this view and blank `Contract name` on every job that uses it.
contract_df = spark.sql("""
    SELECT c.contract_code, c.contract_name, c.contractor_code, co.contractor_name
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_contract c
    LEFT JOIN contractor co ON c.contractor_code = co.contractor_code
""").cache()
contract_df.createOrReplaceTempView("contract")

# Action officer
action_officer_df = spark.sql("""
    SELECT officer_code, officer_name
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_action_officer
""").cache()
action_officer_df.createOrReplaceTempView("action_officer")

# Jobs table. `last updated` is the completion datetime when present, otherwise the
# entry datetime (COALESCE returns its first non-NULL argument).
jobs_df = spark.sql("""
    SELECT 
        j.job_number AS `Job number`,
        j.site_code AS `Site code`,
        j.plot_number AS `Plot number`,
        j.job_entry_date AS `Job entry datetime`,
        j.job_notes AS `Job note`,
        j.job_location AS `Job location`,
        j.job_status_flag AS `Job status flag`,
        j.actual_comp_date AS `Job actual completion datetime`,
        j.actual_start_date AS `Job start datetime`,
        j.target_comp_date AS `Job target completion datetime`,
        j.parent_job_number AS `Parent job number`,
        jt.job_type_name AS `Job type name`,
        jsl.allocated_officer AS `Allocated officer code`,
        jsl.status_code AS `Job status code`,
        js.status_name AS `Job status`,
        c.contract_name AS `Contract name`,
        c.contractor_name AS `Contractor name`,
        ao.officer_name AS `Allocated officer name`,
        jsl_log.`Logged by`,
        to_date(j.actual_comp_date) AS `Job complete date`,
        to_date(j.job_entry_date) AS `Job created date`,
        COALESCE(j.actual_comp_date, j.job_entry_date) AS `last updated`,
        concat(j.site_code, '-', cast(j.plot_number as string)) AS `Site and plot`
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_job j
    JOIN park_filter pf ON j.site_code = pf.site_code AND j.plot_number = pf.plot_number
    LEFT JOIN job_type jt ON j.job_type_key = jt.job_type_key
    LEFT JOIN job_status_log jsl ON j.job_number = jsl.job_number AND j.job_log_number = jsl.job_log_number
    LEFT JOIN job_status js ON jsl.status_code = js.status_code
    LEFT JOIN contract c ON j.contract_code = c.contract_code
    LEFT JOIN action_officer ao ON jsl.allocated_officer = ao.officer_code
    LEFT JOIN job_status_log_when_logged jsl_log ON j.job_number = jsl_log.job_number
""")

# Display preview for debugging
display(jobs_df.limit(10))

In [None]:
# All Measurements Table Transformation
#
# Park measurements (measurement codes starting with 'pk', case-insensitive) for
# every park in the `park_filter` view.

logger.info("Creating All Measurements table")

# EFF_ST_DT is no longer available, so every measurement row is returned. Keeping
# only the "latest" measurement first requires identifying which column in the
# current schema indicates recency/version.

all_measurements_df = spark.sql("""
    SELECT 
        m.site_code, 
        m.plot_number, 
        m.measurement_code AS `Measurement code`, 
        m.feature_quantity AS `Value`,
        concat(m.site_code, '-', cast(m.plot_number as string)) AS `Site and plot`
    FROM DIMSU_Lakehouse_Bronze.assets.confirm_feat_measurement m
    JOIN park_filter pf ON m.site_code = pf.site_code AND m.plot_number = pf.plot_number
    WHERE lower(m.measurement_code) LIKE 'pk%'
""")

# Display preview for debugging
display(all_measurements_df.limit(10))

# Once a recency column is identified, select the latest measurement per
# site/plot/code with a window function. This example is kept as comments rather
# than a bare string literal: a string as a cell's last statement is echoed as the
# cell's output.
#
#   from pyspark.sql.window import Window
#   from pyspark.sql.functions import row_number, desc
#
#   window_spec = (
#       Window.partitionBy("site_code", "plot_number", "Measurement code")
#       .orderBy(desc("some_date_column"))
#   )
#   latest_measurements_df = (
#       all_measurements_df
#       .withColumn("row_num", row_number().over(window_spec))
#       .filter(col("row_num") == 1)
#       .drop("row_num")
#   )

In [None]:
# Clean Up Cached DataFrames
#
# Release every DataFrame this notebook marked with .cache(). Their temp views stay
# registered, so any later query against them recomputes from the source tables.

logger.info("Cleaning up cached DataFrames")

cached_dataframes = [
    # Reference tables
    feature_types_df,
    suburbs_df,
    park_names_df,
    obs_type_df,
    obs_param_df,
    obs_param_options_df,
    job_type_df,
    job_status_df,
    # Filtering DataFrames
    south_parks_df,
    park_filter_df,
    # Other intermediate DataFrames
    route_df,
    batch_df,
    obs_type_with_param_df,
    job_status_log_current_df,
    job_status_log_when_logged_df,
    contractor_df,
    contract_df,
    action_officer_df,
]

for cached_df in cached_dataframes:
    cached_df.unpersist()

logger.info("All cached DataFrames unpersisted")