In [1]:
import yaml
from dotenv import load_dotenv
import os
from datetime import date

# Step 0: Activate findspark
import findspark
findspark.init()

# Step 1: Import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_utc_timestamp, col, from_unixtime, date_format, hour
from pyspark.sql.functions import isnan, when, count, col, sum
from pyspark.sql import functions as F
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import sha2, concat_ws
from pyspark.sql.functions import input_file_name
from datetime import datetime
from pyspark.sql import Row
import logging
import uuid
from uuid import UUID
import sys 

In [2]:
# Load ETL / Iceberg settings from config.yaml in the current working directory and
# derive the absolute paths and fully-qualified Iceberg table names used by later cells.
with open('config.yaml', 'r', encoding='utf-8') as config_file:
    config = yaml.safe_load(config_file)

# safe_load returns None for an empty file; fail here with a clear message rather than
# with "'NoneType' object is not subscriptable" on the first lookup below.
if not isinstance(config, dict):
    raise ValueError("config.yaml is empty or is not a YAML mapping")

project_path = os.getcwd()

# Subscript (not chained .get()) so a missing section/key raises a KeyError naming it.
etl_cfg = config['etl']
iceberg_cfg = config['iceberg']

# Relative paths in config.yaml are resolved against the project (working) directory.
raw_data_path = os.path.join(project_path, etl_cfg['raw_data_path'])
clean_data_path = os.path.join(project_path, etl_cfg['cleansed_data_path'])
iceberg_jar = iceberg_cfg['jar']
warehouse_path = os.path.join(project_path, iceberg_cfg['warehouse_location'])

# Fully-qualified identifiers: <catalog>.<database> and <catalog>.<database>.<table>.
# str.join (like the original `+`) rejects None / non-string config values with a TypeError.
iceberg_schema = ".".join([iceberg_cfg['catalog_name'], iceberg_cfg['database_name']])
logs_table = ".".join([iceberg_schema, iceberg_cfg['logs_table']])
error_table = ".".join([iceberg_schema, iceberg_cfg['error_table']])

In [3]:
# Build (or reuse) the Spark session used for validation, wired to an Iceberg catalog.
# NOTE: if a session is already active in this kernel, getOrCreate() returns it and
# static settings such as spark.jars are not re-applied — restart the kernel after
# changing them.
spark = (
    SparkSession.builder
    .appName("spark_etl")
    # Iceberg runtime jar (path from config.yaml) added to the driver/executor classpath.
    .config("spark.jars", iceberg_jar)
    # NOTE(review): presumably disables Hadoop native IO on this host — confirm the key is honoured.
    .config("spark.hadoop.hadoop.native.io", "false")
    # Back the built-in `spark_catalog` name with Iceberg's SparkCatalog, using a
    # Hadoop-type catalog whose warehouse root is warehouse_path.
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", warehouse_path)
    .getOrCreate()
)
    

In [4]:
# Run id to inspect; matched against the `uuid` column of the logs table.
# Round-tripping through UUID() raises ValueError for a malformed or mistyped id before
# it is spliced into the quoted SQL literal below (a stray quote would otherwise break
# the query, and a typo would silently return no rows). It also normalizes braced,
# hyphen-less, or upper-case input to the canonical lower-case hyphenated form.
# NOTE(review): this rebinds `uuid`, shadowing the `uuid` module imported in cell 1;
# the name is kept because the error-table cell below reads this string value.
uuid = str(UUID('e1daebdc-9ac6-4088-945c-d768b039ceb9'))
sql_validate = f"select * from {logs_table} where uuid = '{uuid}' order by ts_start asc"

spark.sql(sql_validate).show()

+--------------------+--------------------+--------------------+----------------+------------+--------------------+-----------------+-------------+-------------+
|                uuid|            ts_start|              ts_end|duration_seconds|process_name|    sub_process_name|table_destination|record_insert|column_length|
+--------------------+--------------------+--------------------+----------------+------------+--------------------+-----------------+-------------+-------------+
|e1daebdc-9ac6-408...|2025-11-17 18:59:...|2025-11-17 18:59:...|        1.718924|    ETL ADLS|load csv from source|  spark dataframe|        18383|            5|
+--------------------+--------------------+--------------------+----------------+------------+--------------------+-----------------+-------------+-------------+



In [5]:
# Error-log rows recorded for the same run id (`uuid`, set in the previous cell),
# oldest first. To inspect every recorded error regardless of run, drop the
# `where uuid = ...` filter from the query.
sql_error = "select * from {} where uuid = '{}' order by timestamp asc".format(error_table, uuid)
spark.sql(sql_error).show()

+--------------------+--------------------+------------+--------------------+-----------------+--------------------+
|           timestamp|                uuid|process_name|    sub_process_name|table_destination|       error_message|
+--------------------+--------------------+------------+--------------------+-----------------+--------------------+
|2025-11-17T18:59:...|e1daebdc-9ac6-408...|    ETL ADLS|check missing_col...|  spark dataframe|Missing columns: ...|
+--------------------+--------------------+------------+--------------------+-----------------+--------------------+



In [7]:
# Shut down the session created above (stops the underlying SparkContext).
spark.stop()