In [None]:
import os
import logging
from datetime import datetime, date, timezone
from typing import Optional, Tuple

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F

from new_air.outbound.commons.cdc_loader import CDCLoader
from new_air.outbound.commons.load_status_delta import (
    PipelineHelperOutbound,
    PipelineLayerOutbound,
)
from new_air.outbound.commons.destination import DestinationType
from new_air.outbound.destinations.s3_helper.location_finder import LocationFinder
from new_air.outbound.destinations.s3 import S3Destination

# create logger instance
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# ensure root logger is configured correctly
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)

for handler in root_logger.handlers:
    handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))


In [None]:
# Read required parameters from Databricks job widgets.
# `dbutils` is supplied by the Databricks runtime; it is not imported in this notebook.
try:
    CATALOG_NAME = dbutils.widgets.get("catalog")
    SCHEMA_NAME = dbutils.widgets.get("schema")
    TABLE_NAME = dbutils.widgets.get("table_name")

    S3_BUCKET_KEYWORD = dbutils.widgets.get("s3_bucket_keyword")
    S3_PREFIX = dbutils.widgets.get("s3_prefix")
    S3_FILENAME_GENERATOR = dbutils.widgets.get("s3_filename_generator")
    S3_OUTPUT_FORMAT = dbutils.widgets.get("s3_output_format")

    AUTO_FIND_S3_BY_KEYWORD = dbutils.widgets.get("auto_find_s3_by_keyword")
    S3_BUCKET_FULL_URL = dbutils.widgets.get("s3_bucket_full_url")
except Exception as exc:
    # Chain explicitly so the underlying dbutils error (presumably naming the
    # missing widget) is reported as the direct cause.
    raise ValueError("Missing required job parameters. Ensure the job passes catalog, schema, table_name, s3_bucket_keyword, s3_prefix, s3_filename_generator, s3_output_format, auto_find_s3_by_keyword, s3_bucket_full_url") from exc


def _is_blank(value) -> bool:
    """Return True when ``value`` is None or only whitespace once converted to str."""
    return value is None or str(value).strip() == ""


# Parameters that are required no matter how the S3 bucket is located.
for _key, _val in {
    "catalog": CATALOG_NAME,
    "schema": SCHEMA_NAME,
    "table_name": TABLE_NAME,
    "s3_filename_generator": S3_FILENAME_GENERATOR,
    "s3_output_format": S3_OUTPUT_FORMAT,
}.items():
    if _is_blank(_val):
        raise ValueError(f"Parameter '{_key}' must be provided and non-empty")

# Only one bucket-location strategy is used per run, so only its input is required:
# keyword lookup when the flag is truthy, otherwise an explicit full URL.
_auto_flag = str(AUTO_FIND_S3_BY_KEYWORD or "").strip().lower()
if _auto_flag in ("true", "1", "yes"):
    if _is_blank(S3_BUCKET_KEYWORD):
        raise ValueError("'s3_bucket_keyword' must be provided when 'auto_find_s3_by_keyword' is true")
elif _is_blank(S3_BUCKET_FULL_URL):
    raise ValueError("'s3_bucket_full_url' must be provided when 'auto_find_s3_by_keyword' is false")



In [None]:
# Initialize Spark and environment context.
logger.info("Initializing Spark session and environment context")
spark = SparkSession.builder.getOrCreate()
# Published to the process environment; presumably read by the new_air helpers — TODO confirm.
os.environ["CATALOG_NAME"] = CATALOG_NAME

# Build names: the fully qualified source table and its bare table name
# (the bare name is the key used in the load-status table and as the output filename).
qualified_source_table = f"{CATALOG_NAME}.{SCHEMA_NAME}.{TABLE_NAME}"
source_table_only = qualified_source_table.split(".")[-1]

# Prepare the version-based load-status helper. The layer is looked up by the schema
# name, so the schema must match a PipelineLayerOutbound member name.
try:
    layer = PipelineLayerOutbound[SCHEMA_NAME]
except KeyError as exc:
    # A bare KeyError('<schema>') is cryptic in job logs; say what was expected.
    raise ValueError(
        f"Schema '{SCHEMA_NAME}' does not match any PipelineLayerOutbound member; "
        "the 'schema' job parameter must name a supported outbound layer"
    ) from exc
helper = PipelineHelperOutbound(layer=layer)
helper.create_version_based_table()



In [None]:
# Read last processed version and last run time from the status table.
# Produces `last_processed_version_saved` (int or None) and `last_run_date` (date or None);
# both stay None when no status row exists yet for this table and the S3 destination.
status_df = helper.get_load_status_data_version_based()

_filtered = (
    status_df.where(F.col("table_name") == F.lit(source_table_only))
    .where(F.col("destination_type") == F.lit(DestinationType.S3.value))
    .select("last_processed_version", "last_run_time")
    # limit(1) without an ordering returns an arbitrary row if duplicates ever exist;
    # prefer the most advanced checkpoint so the choice is deterministic.
    .orderBy(F.col("last_processed_version").desc_nulls_last())
    .limit(1)
    .collect()
)

last_processed_version_saved = None
last_run_date = None
if _filtered:
    _lpv = _filtered[0]["last_processed_version"]
    last_processed_version_saved = int(_lpv) if _lpv is not None else None
    _lrt = _filtered[0]["last_run_time"]
    if _lrt is not None:
        last_run_date = _lrt.date()

# Naive UTC timestamp: the same value datetime.utcnow() returned, without its
# Python 3.12 deprecation.
this_run_time = datetime.now(timezone.utc).replace(tzinfo=None)
this_run_date = this_run_time.date()



In [None]:
# Compute the inclusive [min_version, max_version] window to export, and derive
# last_run_date when the status table did not provide one.
PROCESSING_REQUIRED = True
min_version = None
max_version = None

# Query the table history once; both the first-run and incremental paths use it.
history_df = spark.sql(f"DESCRIBE HISTORY {qualified_source_table}")
# Aggregating an empty history yields one row of NULLs, so "no history" shows up
# as NULL bounds. No separate head(1) probe is needed.
history_bounds = history_df.agg(
    F.min(F.col("version")).alias("min_ver"),
    F.max(F.col("version")).alias("max_ver"),
).collect()[0]

if history_bounds["min_ver"] is None or history_bounds["max_ver"] is None:
    logger.info("No history found for source table; nothing to process.")
    PROCESSING_REQUIRED = False
elif last_processed_version_saved is None:
    # First run: export every version still present in the table history.
    min_version = int(history_bounds["min_ver"])
    max_version = int(history_bounds["max_ver"])
elif int(history_bounds["max_ver"]) <= int(last_processed_version_saved):
    logger.info("No new versions detected; nothing to process.")
    PROCESSING_REQUIRED = False
else:
    # Incremental run: everything committed after the last exported version.
    min_version = int(last_processed_version_saved) + 1
    max_version = int(history_bounds["max_ver"])
    if min_version < int(history_bounds["min_ver"]):
        logger.warning(
            f"Next version to export ({min_version}) is older than the earliest retained "
            f"history version ({int(history_bounds['min_ver'])}); those versions may no longer be readable."
        )

# Without a recorded last run time, use the earliest commit timestamp in the export
# window as the start date (falling back to now if none is found). This applies to
# both paths, not just the first run.
if PROCESSING_REQUIRED and last_run_date is None:
    min_ts = (
        history_df.where(F.col("version").between(min_version, max_version))
        .agg(F.min(F.col("timestamp")).alias("min_ts"))
        .collect()[0]["min_ts"]
    )
    last_run_date = (min_ts or datetime.now(timezone.utc).replace(tzinfo=None)).date()

logger.info(f"PROCESSING_REQUIRED={PROCESSING_REQUIRED}, min_version={min_version}, max_version={max_version}, last_run_date={last_run_date}, this_run_date={this_run_date}")



In [None]:
# CDC extraction by version: load the changes in [min_version, max_version] into
# `staging_df`. On failure the downstream cells are skipped via PROCESSING_REQUIRED.
if not PROCESSING_REQUIRED:
    logger.info("Skipping CDC extraction as no processing is required.")
else:
    cdc_loader = CDCLoader(spark)
    try:
        staging_df = cdc_loader.process_table_changes_by_version(
            table_name=qualified_source_table,
            min_version=int(min_version),
            max_version=int(max_version),
        )
        logger.info("CDC extraction completed.")
    except Exception as e:
        # logger.exception keeps the traceback, which logging str(e) alone drops.
        # NOTE(review): the failure is only logged and no exception propagates, so the
        # notebook run itself still completes; confirm this is intended for alerting.
        logger.exception(f"CDC extraction by version failed: {e}")
        PROCESSING_REQUIRED = False



In [None]:
# Resolve the S3 target location (`s3_url`) and, when available, the `environment`
# passed to the filename generator. Either the bucket is looked up by keyword, or
# the full URL supplied by the job is used as-is.
if not PROCESSING_REQUIRED:
    logger.info("Skipping S3 location resolution as no processing is required.")
else:
    try:
        environment = None
        _auto_flag = str(AUTO_FIND_S3_BY_KEYWORD or "").strip().lower()
        if _auto_flag in ("true", "1", "yes"):
            loc_finder = LocationFinder(dbutils_instance=dbutils, spark_instance=spark)
            write_location = loc_finder.get_write_location_for_keyword(S3_BUCKET_KEYWORD)
            s3_bucket = write_location["bucket"]
            environment = write_location.get("environment") or loc_finder.get_environment()
            # Strip slashes so "/exports/" and "exports" both yield s3://bucket/exports.
            # This matches the trailing-slash normalisation of the full-URL branch and
            # avoids "//" in the key.
            _prefix = (S3_PREFIX or "").strip("/")
            s3_url = f"s3://{s3_bucket}/{_prefix}" if _prefix else f"s3://{s3_bucket}"
        else:
            s3_url = S3_BUCKET_FULL_URL.rstrip("/")
            try:
                loc_finder = LocationFinder(dbutils_instance=dbutils, spark_instance=spark)
                environment = loc_finder.get_environment()
            except Exception as env_exc:
                # Best-effort: environment only feeds the filename generator params,
                # so continue without it, but leave a trace instead of swallowing silently.
                logger.warning(f"Could not determine environment; continuing without it: {env_exc}")
                environment = None
        logger.info(f"Resolved S3 URL: {s3_url}")
    except Exception as e:
        logger.exception(f"Failed to resolve S3 location: {e}")
        PROCESSING_REQUIRED = False



In [None]:
# Write the CDC staging data to S3. Only after a successful write, advance the
# version-based load status, so a failed write is retried on the next run.
if not PROCESSING_REQUIRED:
    logger.info("Skipping write and status update as no processing is required.")
else:
    s3_dest = S3Destination(
        spark=spark,
        s3_options={
            "url": s3_url,
            "format": S3_OUTPUT_FORMAT,
            "filename_generator_type": S3_FILENAME_GENERATOR,
            "filename_generator_params": ({"environment": environment} if environment else {}),
        },
    )

    base_params = {
        "start_date": last_run_date,
        "end_date": this_run_date,
    }

    try:
        write_ok = s3_dest.write_data(
            df=staging_df,
            output_filename=source_table_only,
            base_params=base_params,
        )
    except Exception as e:
        logger.exception(f"Write to S3 via destination failed: {e}")
        write_ok = False

    if write_ok:
        try:
            last_processed_version = s3_dest.compute_last_processed_version(staging_df)
            if last_processed_version is None:
                # NOTE(review): presumably None means staging_df held no change rows. If None
                # were persisted as-is, the next run would read it back as "no saved version"
                # and re-export the whole history, so record the window just processed instead.
                logger.warning(
                    f"Could not compute last processed version from staging data; using max_version={max_version}"
                )
                last_processed_version = max_version
            s3_dest.update_version_based_status(
                helper,
                table_name=source_table_only,
                last_processed_version=last_processed_version,
                last_run_time=this_run_time,
            )
            logger.info("Version-based load status updated successfully.")
        except Exception as e:
            logger.exception(f"Failed to update version-based load status: {e}")
    else:
        logger.error("Write to S3 failed; version-based load status not updated.")

