In [0]:
# ==============================================================
# Banking Lakehouse POC – Glue + Iceberg (Incremental Load + Alerts)
# ==============================================================
# This Glue job:
# 1. Reads daily CSV files from S3 (Bronze layer)
# 2. Transforms and enriches the data
# 3. Loads data incrementally into an Iceberg table using MERGE
# 4. Sends SNS email alerts on START, SUCCESS, and FAILURE
# ==============================================================

from datetime import datetime, timezone
import sys
import traceback
import boto3

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

from pyspark.sql.functions import col, to_date
from pyspark.sql.types import DecimalType

# --------------------------------------------------------------
# 0. Global Configuration
# --------------------------------------------------------------
SNS_TOPIC_ARN = "arn:aws:sns:eu-north-1:088939728901:bank-app-analytics-topic"
# SNS is regional: Publish must be called from a client in the topic's own
# region. Take the region from the ARN (arn:aws:sns:<region>:<account>:<name>)
# instead of relying on whatever default region the Glue job runs in.
sns = boto3.client("sns", region_name=SNS_TOPIC_ARN.split(":")[3])

def sanitize_sns_subject(subject, max_length=99):
    """Return *subject* in a form Amazon SNS accepts as a message Subject.

    SNS rejects a Publish request whose Subject contains line breaks or
    control characters, or is 100 characters or longer. Non-printable
    characters are replaced with spaces, surrounding whitespace is
    stripped, and the result is truncated to *max_length* characters.
    """
    cleaned = "".join(ch if ch.isprintable() else " " for ch in subject)
    return cleaned.strip()[:max_length]


def send_email(subject, message):
    """Publish an alert to the SNS topic ``SNS_TOPIC_ARN``.

    Email subscribers of the topic receive *message* as the body and
    *subject* as the subject line.

    Args:
        subject: Alert subject. It is passed through
            ``sanitize_sns_subject`` so that a long job name cannot make
            the publish call fail.
        message: Plain-text alert body.

    Any error raised by ``sns.publish`` propagates to the caller.
    """
    sns.publish(
        TopicArn=SNS_TOPIC_ARN,
        Subject=sanitize_sns_subject(subject),
        Message=message
    )

# --------------------------------------------------------------
# 1. Glue & Spark Session Initialization
# --------------------------------------------------------------
# Reuse the active SparkContext if there is one (getOrCreate). Wrap it in a
# GlueContext, whose Spark session is used for every CSV read and SQL
# statement later in this script.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# --------------------------------------------------------------
# 2. Handle Job Parameters (Optional LOAD_DATE)
# --------------------------------------------------------------
# getResolvedOptions treats every name it is given as required. So LOAD_DATE
# is only requested when it was actually passed on the command line.
# Unlike a blanket try/except, this cannot silently fall back to today's
# date when LOAD_DATE was supplied but could not be parsed.
if "--LOAD_DATE" in sys.argv:
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "LOAD_DATE"])
    load_date = args["LOAD_DATE"]
else:
    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    # Default: today's date in UTC.
    load_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

job_name = args["JOB_NAME"]

# load_date becomes part of the S3 input paths (yyyy-mm-dd folders). Validate
# it up front and normalise it (e.g. "2024-1-5" -> "2024-01-05"). A malformed
# value raises ValueError here, before the START notification is sent.
load_date = datetime.strptime(load_date, "%Y-%m-%d").strftime("%Y-%m-%d")

job = Job(glueContext)
job.init(job_name, args)

start_time = datetime.now(timezone.utc)

print(f"Processing data for date: {load_date}")

# --------------------------------------------------------------
# 3. Send Job START Notification
# --------------------------------------------------------------
# Announce the run before the pipeline below touches any data.
# The body is assembled line by line; the empty entry produces the blank
# line that separates the headline from the details.
send_email(
    subject=f"Glue Job STARTED: {job_name}",
    message="\n".join(
        [
            "Glue job has started.",
            "",
            f"Job Name : {job_name}",
            f"Load Date: {load_date}",
            f"Start Time: {start_time}",
        ]
    ),
)

try:
    # ----------------------------------------------------------
    # 4. Define Input Paths (Bronze Layer)
    # ----------------------------------------------------------
    # s3://bank-app-data-landing-area/raw/
    #   ├── customers/yyyy-mm-dd/
    #   ├── accounts/yyyy-mm-dd/
    #   └── transactions/yyyy-mm-dd/

    base_path = "s3://bank-app-data-landing-area/raw"

    cust_path = f"{base_path}/customers/{load_date}/"
    acc_path  = f"{base_path}/accounts/{load_date}/"
    tx_path   = f"{base_path}/transactions/{load_date}/"

    # ----------------------------------------------------------
    # 5. Read Bronze CSV Data
    # ----------------------------------------------------------
    # No schema inference, so every CSV column arrives as STRING. That is
    # why amount is cast explicitly below and timestamp is stored as STRING.
    cust_df = spark.read.option("header", True).csv(cust_path)
    acc_df  = spark.read.option("header", True).csv(acc_path)
    tx_df   = spark.read.option("header", True).csv(tx_path)

    print("Bronze data loaded successfully")

    # ----------------------------------------------------------
    # 6. Data Transformations
    # ----------------------------------------------------------
    tx_df = (
        tx_df.withColumnRenamed("txn_id", "transaction_id")
             .withColumn("amount", col("amount").cast(DecimalType(12, 2)))
             .withColumn("txn_date", to_date("timestamp"))
    )

    # Columns of analytics_db.enriched_transactions (created in section 7).
    # The MERGE source is narrowed to exactly these.
    target_columns = [
        "transaction_id",
        "account_id",
        "customer_id",
        "amount",
        "timestamp",
        "txn_date",
    ]

    df_enriched = (
        # Inner joins: transactions without a matching account and customer
        # row in this load date's files are dropped.
        tx_df.join(acc_df, "account_id")
             .join(cust_df, "customer_id")
             .select(*target_columns)
             # MERGE needs at most one source row per transaction_id.
             # Iceberg fails when several source rows match one target row,
             # and unmatched duplicates would be inserted twice. Repeated
             # keys in the daily files, or join fan-out, are collapsed here.
             # If duplicates disagree, which row survives is arbitrary.
             .dropDuplicates(["transaction_id"])
             # Cached so count() and the MERGE don't each re-read the CSVs
             # and recompute the joins.
             .cache()
    )

    record_count = df_enriched.count()

    if record_count == 0:
        raise RuntimeError("No records found after transformation")

    print(f"Transformations completed. Records processed: {record_count}")

    # ----------------------------------------------------------
    # 7. Ensure Iceberg Database & Table Exist
    # ----------------------------------------------------------
    spark.sql("CREATE DATABASE IF NOT EXISTS analytics_db")

    spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics_db.enriched_transactions (
        transaction_id STRING,
        account_id      STRING,
        customer_id     STRING,
        amount          DECIMAL(12,2),
        timestamp       STRING,
        txn_date        DATE
    )
    USING iceberg
    """)

    print("Iceberg database and table verified")

    # ----------------------------------------------------------
    # 8. Incremental Load using MERGE INTO
    # ----------------------------------------------------------
    # Upsert keyed on transaction_id: existing rows are overwritten and new
    # ones are inserted, so re-running the same load date is safe.
    df_enriched.createOrReplaceTempView("tmp_enriched")

    spark.sql("""
    MERGE INTO analytics_db.enriched_transactions t
    USING tmp_enriched s
    ON t.transaction_id = s.transaction_id

    WHEN MATCHED THEN
      UPDATE SET
        t.account_id = s.account_id,
        t.customer_id = s.customer_id,
        t.amount = s.amount,
        t.timestamp = s.timestamp,
        t.txn_date = s.txn_date

    WHEN NOT MATCHED THEN
      INSERT (
        transaction_id,
        account_id,
        customer_id,
        amount,
        timestamp,
        txn_date
      )
      VALUES (
        s.transaction_id,
        s.account_id,
        s.customer_id,
        s.amount,
        s.timestamp,
        s.txn_date
      )
    """)

    print("MERGE operation completed")

    # Commit the Glue job run before announcing success. A SUCCESS alert is
    # then never sent for a run whose commit did not complete.
    job.commit()

except Exception:
    # ----------------------------------------------------------
    # 9. FAILURE Notification
    # ----------------------------------------------------------
    error_message = traceback.format_exc()

    # Best-effort alert: if publishing fails, log it and fall through.
    # The bare `raise` below then re-raises the ORIGINAL pipeline error
    # instead of the SNS error.
    try:
        send_email(
            subject=f"Glue Job FAILED: {job_name}",
            message=(
                f"Glue job failed.\n\n"
                f"Job Name : {job_name}\n"
                f"Load Date: {load_date}\n"
                f"Failure Time: {datetime.now(timezone.utc)}\n\n"
                f"Error Details:\n{error_message}"
            )
        )
    except Exception as notify_error:
        print(f"Failed to send failure notification: {notify_error}")

    print("Job failed")
    raise

else:
    # ----------------------------------------------------------
    # 10. SUCCESS Notification
    # ----------------------------------------------------------
    # Runs only when every step in the try body succeeded. It sits outside
    # the try, so an alert error propagates as-is. It does not trigger a
    # FAILURE alert for a load that already completed.
    send_email(
        subject=f"Glue Job SUCCESS: {job_name}",
        message=(
            f"Glue job completed successfully.\n\n"
            f"Job Name : {job_name}\n"
            f"Load Date: {load_date}\n"
            f"Records Processed: {record_count}\n"
            f"End Time: {datetime.now(timezone.utc)}"
        )
    )


#### Instructions to run the above script:
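- Create a new S3 bucket for the Iceberg tables. The warehouse path configured below is `s3://bank-app-lakehouse-iceberg/warehouse/`.
- Create another S3 bucket for the input files. The script reads `s3://bank-app-data-landing-area/raw/`, with one folder per entity and date: `customers/yyyy-mm-dd/`, `accounts/yyyy-mm-dd/`, and `transactions/yyyy-mm-dd/`.
- Create the SNS topic referenced by `SNS_TOPIC_ARN` and subscribe an email address to it. This address receives the START / SUCCESS / FAILURE alerts.
- In AWS Glue, create a new job using the script editor.
- Copy and paste the script above.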
- Add the following job parameters (key / value):

  - Key: `--conf`
    Value: `spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=s3://bank-app-lakehouse-iceberg/warehouse/ --conf spark.sql.defaultCatalog=glue_catalog`
  - Key: `--datalake-formats`
    Value: `iceberg`
  - (Optional) Key: `--LOAD_DATE`
    Value: the date folder to process, in `yyyy-mm-dd` format. If omitted, the current UTC date is used.

- Save and run the job.