In [1]:
import os
import sys
import time
from datetime import datetime

from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import col



# Make the project's `src` directory importable from this notebook.
# The location is resolved relative to the current working directory:
# three levels up, then into `src`.
current_dir = os.getcwd()
project_root = os.path.abspath(os.path.join(current_dir, "..", "..", "..", "src"))

# Only append when the entry is missing, so re-running this cell does not
# keep adding duplicate entries to sys.path.
if project_root not in sys.path:
    sys.path.append(project_root)

from logging_utils import log_task_status, TaskLogger
from unikargo_utils import add_pipeline_metadata
from config import get_log_config

from io_utils import write_log

Loaded tables config from: C:\Users\Dele\Documents\D. Professional Registration\IT\DATA-EnGR\00_data_engr_projects\unicargo\unicargo_dab\tables.yaml


In [2]:
# Create widgets (required for ADF → Databricks integration).
# Each entry maps a widget name to the default used when no value is supplied.
widget_defaults = {
    "pipeline_id": "",
    "run_id": "",
    "task_id": "",
    "processed_timestamp": "",
    "catalog": "unikargo_dev",  # Requires an ADF variable for ADF runs
}
for widget_name, default_value in widget_defaults.items():
    dbutils.widgets.text(widget_name, default_value)

# Read the widget values back into notebook variables
pipeline_id = dbutils.widgets.get("pipeline_id")
run_id = dbutils.widgets.get("run_id")
task_id = dbutils.widgets.get("task_id")
processed_timestamp = dbutils.widgets.get("processed_timestamp")
catalog = dbutils.widgets.get("catalog")

# Run context shared by the logging calls in the cells below
pipeline_name = "airlines_ingestion"
environment = "dev"
log_type = "task"
start_time = datetime.now()
rows_processed = 0

# Input file, and the log target resolved from config for this log type / environment
source_path = "abfss://medallion@adlsunikarrgodev.dfs.core.windows.net/raw/volumes/airlines.csv"
target_path = get_log_config(log_type, environment=environment)

To use databricks widgets interactively in your notebook, please install databricks sdk using:
	pip install 'databricks-sdk[notebook]'
Falling back to default_value_only implementation for databricks widgets.


In [3]:
# Schema for the airlines CSV: two nullable string columns, in file order.
airlines_schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in ("iata_code", "airline")
    ]
)

In [9]:
# --- Task 1: read the airlines CSV inside a TaskLogger context ---
with TaskLogger(
    "read_airlines_csv",
    pipeline_name=pipeline_name,
    source_path=source_path,
    target_path=target_path,
    log_running=False,  # keep this False unless you explicitly want a "RUNNING" entry
) as logger:
    # Read with the explicit schema; header=true marks the first line as a header row.
    airlines_df = spark.read.schema(airlines_schema).option("header", "true").csv(source_path)

    # Row count reported as the metric for this task
    rows_processed = airlines_df.count()

    # Hand the metric to the logger before the context exits
    logger.set_metrics(rows=rows_processed)

In [7]:
# --- Task 1: read the airlines CSV and record the outcome via log_task_status
try:
    # Taken first so that both the success and the failure log call can pass it on
    start_time = time.time()

    # Read with the explicit schema; header=true marks the first line as a header row
    airlines_df = spark.read.schema(airlines_schema).option("header", "true").csv(source_path)
    rows_processed = airlines_df.count()

    # SUCCESS entry for the task log
    log_task_status(
        status="SUCCESS",
        operation="read_airlines_csv",
        rows=rows_processed,
        start_time=start_time,
        pipeline_name=pipeline_name,
        source_path=source_path,
        target_path=target_path,
        pipeline_id=None,
    )
except Exception as read_error:
    # FAILED entry for the task log, under the same operation name.
    # Logging is best-effort: a logging problem is printed, not raised.
    try:
        log_task_status(
            status="FAILED",
            operation="read_airlines_csv",
            error=read_error,
            start_time=start_time,
            pipeline_name=pipeline_name,
            source_path=source_path,
            pipeline_id=None,
        )
    except Exception as logging_error:
        print(f"Failed to write task log: {logging_error}")

    # Propagate the original read error so the cell still fails
    raise

In [None]:
# --- Task 2: attach the pipeline, run and task identifiers to the dataframe
try:
    airlines_df = add_pipeline_metadata(airlines_df, pipeline_id, run_id, task_id)

    # Row count after the transformation, reported in the task log
    rows_processed = airlines_df.count()

    # SUCCESS entry for the task log
    log_task_status(
        status="SUCCESS",
        rows=rows_processed,
        message="Metadata column added successfully",
        pipeline_name=pipeline_name,
        pipeline_id=None,
        file_format="delta",
    )
except Exception as transform_error:
    # FAILED entry for the task log; a logging problem is printed, not raised
    try:
        log_task_status(
            status="FAILED",
            message=str(transform_error),
            pipeline_name=pipeline_name,
            pipeline_id=None,
            file_format="delta",
        )
    except Exception as logging_error:
        print(f"Failed to write task log: {logging_error}")

    # Propagate the original error so the cell still fails
    raise

In [None]:
# --- Task 3: Write to bronze
try:
    # Row count is taken before the write so it can be reported in the task log
    rows_processed = airlines_df.count()

    # Replace the bronze table; overwriteSchema lets the table schema be replaced too
    (
        airlines_df.write
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(f"`{catalog}`.`01_bronze`.`unikargo_airlines_bronze`")
    )

    log_task_status(
        status="SUCCESS",
        rows=rows_processed,
        message="Airlines data written successfully",
        pipeline_name=pipeline_name,
        pipeline_id=None,
        file_format="delta",
    )

except Exception as e:
    # Log FAILURE with the same pipeline context as the SUCCESS entry.
    # Any logging error is caught (not just AnalysisException), so a logging
    # problem can never replace the original write error raised below.
    try:
        log_task_status(
            status="FAILED",
            message=str(e),
            pipeline_name=pipeline_name,
            pipeline_id=None,
            file_format="delta",
        )
    except Exception as log_e:
        print(f"Failed to log task event: {log_e}")
    raise  # re-raise original error

In [None]:
#  Log read task for debugging
# try:
#     logs_df = spark.read.format("delta").load(LOG_PATH_TASK)
#     logs_df.orderBy(col("timestamp").desc()).show(10, truncate=False)
# except Exception:
#     print(f"No task logs found yet at {LOG_PATH_TASK}")

In [10]:
# Environment whose task logs should be inspected: dev / staging / prod
environment = "dev"

# Task-log location for each environment
log_paths_by_environment = {
    "dev": "abfss://medallion@adlsunikarrgodev.dfs.core.windows.net/logs/dev_task_logs",
    "staging": "abfss://medallion@adlsunikargostg.dfs.core.windows.net/logs/staging_task_logs",
    "prod": "abfss://medallion@adlsunikargoprd.dfs.core.windows.net/logs/prod_task_logs",
}
log_base_path = log_paths_by_environment[environment]

# Load the logs as Delta and print the 20 newest entries without truncating columns
logs_df = spark.read.format("delta").load(log_base_path)
logs_df.orderBy("timestamp", ascending=False).show(20, truncate=False)




+------------------------------------+------------------+-----------+--------------------+----------+-----------------+-------+----+-----------------+--------------------------------------------------------------------------------+--------------------------------------------------------------------------+----------+-------------+--------------------------+----------+
|pipeline_id                         |pipeline_name     |environment|run_id              |task_id   |operation        |status |rows|execution_time_ms|source_path                                                                     |target_path                                                               |error_type|error_message|timestamp                 |log_date  |
+------------------------------------+------------------+-----------+--------------------+----------+-----------------+-------+----+-----------------+--------------------------------------------------------------------------------+-----------------------------

In [None]:
# --- Task 1: Read (with simulated failure)
try:
    # Simulate a failure by pointing to a non-existent file.
    # The result goes into a scratch name so that this experiment can never
    # replace the real `airlines_df` used by the pipeline cells.
    missing_df = (
        spark.read
        .schema(airlines_schema)
        .option("header", "true")
        .csv("abfss://medallion@adlsunikarrgodev.dfs.core.windows.net/raw/volumes/non_existent_file.csv")
    )

    rows_processed = missing_df.count()

    # Log SUCCESS (only reached if the read unexpectedly works)
    log_task_status(
        status="SUCCESS",
        rows=rows_processed,
        message="Airlines data read successfully",
        pipeline_name=pipeline_name,
        pipeline_id=None,
        file_format="delta",
    )

except Exception as e:
    # Log FAILURE; report honestly whether the log entry was actually written
    try:
        log_task_status(
            status="FAILED",
            message=str(e),
            pipeline_name=pipeline_name,
            pipeline_id=None,
            file_format="delta",
        )
    except Exception as log_error:
        print(f"Failed to write task log: {log_error}")
        print("Simulated failure caught. Task log NOT written.")
    else:
        print("Simulated failure caught. Task log written.")

    # Optionally continue without raising
    # raise  # Uncomment to still crash the notebook


In [None]:
# --- Run several read tasks in sequence; one of them points at a missing file

tasks = [
    {"name": "read_airlines", "path": "abfss://medallion@adlsunikarrgodev.dfs.core.windows.net/raw/volumes/airlines.csv"},
    {"name": "read_invalid", "path": "abfss://medallion@adlsunikarrgodev.dfs.core.windows.net/raw/volumes/missing_file.csv"},
    {"name": "read_flights", "path": "abfss://medallion@adlsunikarrgodev.dfs.core.windows.net/raw/volumes/flights.csv"}
]

for task in tasks:
    try:
        print(f"Running task: {task['name']}")

        # Attempt to read CSV
        df = (
            spark.read
            .schema(airlines_schema)  # or flights_schema for other tasks
            .option("header", "true")
            .csv(task["path"])
        )
        rows_processed = df.count()

        # Log success
        log_task_status(
            status="SUCCESS",
            rows=rows_processed,
            message=f"{task['name']} completed successfully",
            pipeline_name=pipeline_name,
            pipeline_id=None,
            file_format="delta",
        )

    except Exception as e:
        # Log failure but continue. The logging call is guarded as well:
        # without the guard, an error raised while logging would escape the
        # handler and stop the loop before the remaining tasks run.
        try:
            log_task_status(
                status="FAILED",
                message=str(e),
                pipeline_name=pipeline_name,
                pipeline_id=None,
                file_format="delta",
            )
        except Exception as log_error:
            print(f"Failed to write task log: {log_error}")

        print(f"Task {task['name']} failed: {e}")
        # Do NOT raise, continue to next task


In [None]:
# Run the read tasks in sequence; tasks flagged with fail_transform fail on purpose
tasks = [
    {"name": "read_airlines", "path": "abfss://medallion@adlsunikarrgodev.dfs.core.windows.net/raw/volumes/airlines.csv", "fail_transform": False},
    {"name": "read_flights", "path": "abfss://medallion@adlsunikarrgodev.dfs.core.windows.net/raw/volumes/flights.csv", "fail_transform": True}
]

for task in tasks:
    try:
        print(f"Running task: {task['name']}")

        # Read CSV
        df = (
            spark.read
            .schema(airlines_schema)  # or flights_schema for flights
            .option("header", "true")
            .csv(task["path"])
        )

        # Simulate a runtime error during transformation.
        # A derived column such as iata_code / 0 is only a lazy expression and
        # count() does not have to evaluate it, so it cannot be relied on to
        # fail; raise explicitly so the FAILED path is always exercised.
        if task.get("fail_transform"):
            raise RuntimeError(f"Simulated transformation failure in task '{task['name']}'")

        rows_processed = df.count()

        # Log success
        log_task_status(
            status="SUCCESS",
            rows=rows_processed,
            message=f"{task['name']} completed successfully",
            pipeline_name=pipeline_name,
            pipeline_id=None,
            file_format="delta",
        )

    except Exception as e:
        # Log failure but continue. The logging call is guarded as well:
        # without the guard, an error raised while logging would escape the
        # handler and stop the loop before the remaining tasks run.
        try:
            log_task_status(
                status="FAILED",
                message=str(e),
                pipeline_name=pipeline_name,
                pipeline_id=None,
                file_format="delta",
            )
        except Exception as log_error:
            print(f"Failed to write task log: {log_error}")

        print(f"Task {task['name']} failed: {e}")
        # Continue to next task without stopping notebook


In [None]:
# Run the read tasks in sequence; failures are logged with rows=0 and the loop continues
tasks = [
    {"name": "read_airlines", "path": "abfss://medallion@adlsunikarrgodev.dfs.core.windows.net/raw/volumes/airlines.csv", "fail_transform": False},
    {"name": "read_flights", "path": "abfss://medallion@adlsunikarrgodev.dfs.core.windows.net/raw/volumes/flights.csv", "fail_transform": True}
]

for task in tasks:
    try:
        print(f"Running task: {task['name']}")

        # Read CSV
        df = (
            spark.read
            .schema(airlines_schema)  # adjust schema per dataset
            .option("header", "true")
            .csv(task["path"])
        )

        # Simulate a runtime error during transformation.
        # A derived column such as iata_code / 0 is only a lazy expression and
        # count() does not have to evaluate it, so it cannot be relied on to
        # fail; raise explicitly so the FAILED path is always exercised.
        if task.get("fail_transform"):
            raise RuntimeError(f"Simulated transformation failure in task '{task['name']}'")

        rows_processed = df.count()

        # Log success
        log_task_status(
            status="SUCCESS",
            rows=rows_processed,
            message=f"{task['name']} completed successfully",
            pipeline_name=pipeline_name,
            pipeline_id=None,
            file_format="delta",
        )

    except Exception as e:
        # Log failure with rows_processed = 0; a logging problem is printed, not raised
        try:
            log_task_status(
                status="FAILED",
                rows=0,  # set to 0 on failure
                message=str(e),
                pipeline_name=pipeline_name,
                pipeline_id=None,
                file_format="delta",
            )
        except Exception as log_error:
            print(f"Failed to write task log: {log_error}")

        print(f"Task {task['name']} failed: {e}")
        # Continue to next task
