In [5]:
import pyspark
from pyspark.sql import SparkSession
import logging
from datetime import datetime
from pyspark.sql import functions as func
from configs import configs
from functions import functions as F
from dotenv import load_dotenv
import os

# Populate os.environ from a local .env file (if present), then expose the
# connection settings used below as module-level constants. Any variable that
# is not set resolves to None.
load_dotenv()

# MinIO / PostgreSQL host shared by the S3A endpoint and the JDBC URL.
HOST_ADDRESS = os.environ.get('HOST_ADDRESS')
# MinIO (S3A) credentials.
MINIO_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY')
MINIO_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY')
# PostgreSQL credentials for the JDBC reader.
USER_POSTGRES = os.environ.get('USER_POSTGRES')
PASSWORD_POSTGRES = os.environ.get('PASSWORD_POSTGRES')

def configure_spark():
    """Build (or reuse) the SparkSession for the AdventureWorks bronze load.

    The session is configured for:
      * MinIO through the S3A connector: endpoint ``http://HOST_ADDRESS:9000``,
        path-style access, static access/secret-key credentials;
      * the Hive metastore at ``thrift://metastore:9083``;
      * Delta Lake SQL extensions and the Delta catalog.

    Returns:
        The SparkSession produced by ``getOrCreate()`` (an already-active
        session is returned if one exists).
    """
    # Applied in this exact order; values are passed through unchanged.
    session_options = (
        ("spark.hadoop.fs.s3a.endpoint", f"http://{HOST_ADDRESS}:9000"),
        ("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY),
        ("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY),
        ("spark.hadoop.fs.s3a.path.style.access", True),
        ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
        ("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),
        ("hive.metastore.uris", "thrift://metastore:9083"),
        ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
        ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
    )

    builder = SparkSession.builder.appName("ELT Incremental Landing to Bronze AdventureWorks")
    for option_name, option_value in session_options:
        builder = builder.config(option_name, option_value)
    return builder.getOrCreate()

def _read_max_modified_date(spark, delta_path):
    """Return the largest ``modifieddate`` stored in the Delta table at *delta_path*.

    Returns None when the column has no non-null values (e.g. an empty table),
    because SQL ``max`` over no values is NULL. Whatever Spark raises when
    *delta_path* is not a readable Delta table (e.g. a first run before the
    table exists) propagates to the caller.
    """
    return (
        spark.read.format("delta")
        .load(delta_path)
        .select(func.max("modifieddate").alias("max_modifieddate"))
        .collect()[0]["max_modifieddate"]
    )


def _build_incremental_query(table, max_modified_date):
    """Return the SELECT that pulls rows of *table* newer than *max_modified_date*.

    With no watermark (None) the whole table is selected. Formatting None into
    the WHERE clause would otherwise yield ``modifieddate > 'None'``, which is
    not a valid date/timestamp literal, and the table could never be loaded.
    """
    if max_modified_date is None:
        return f"SELECT * FROM {table}"
    # NOTE(review): the watermark is rendered with str(); confirm the Spark
    # session timezone matches how PostgreSQL stores modifieddate so the
    # comparison is not shifted.
    return f"""
        SELECT * FROM {table}
        WHERE modifieddate > '{max_modified_date}'
    """


def _read_postgres(spark, query):
    """Return a lazy DataFrame over *query* against the Adventureworks PostgreSQL DB."""
    return (
        spark.read.format("jdbc")
        .option("url", f"jdbc:postgresql://{HOST_ADDRESS}:5435/Adventureworks")
        .option("user", USER_POSTGRES)
        .option("password", PASSWORD_POSTGRES)
        # The query is wrapped as a derived table so the JDBC source can select from it.
        .option("dbtable", f"({query}) as filtered")
        .option("driver", "org.postgresql.Driver")
        .option("fetchsize", "10000")
        .load()
    )


def _ingest_table(spark, table, table_name, delta_path):
    """Append the rows of PostgreSQL *table* newer than the Delta watermark to *delta_path*."""
    max_modified_date_delta = _read_max_modified_date(spark, delta_path)
    if max_modified_date_delta is None:
        logging.info(f"No modifieddate watermark found in {delta_path}; loading full table {table}.")

    df_input_data = _read_postgres(spark, _build_incremental_query(table, max_modified_date_delta))

    # Persist so the count, the write and the post-write count all use one
    # snapshot: otherwise each action re-runs the JDBC query, tripling the load
    # on PostgreSQL and letting the reported counts drift from what was written.
    df_input_data.persist()
    try:
        input_data_count = df_input_data.count()
        logging.info(f"Number of rows processed for table {table_name}: {input_data_count}")

        if input_data_count == 0:
            logging.info(f"No new data to process for table {table_name}.")
            return

        # Project helpers; per the original comments they add update-date
        # metadata and a month_key column derived from modifieddate.
        df_with_update_date = F.add_metadata(df_input_data)
        df_with_month_key = F.add_month_key(df_with_update_date, 'modifieddate')

        df_with_month_key.write.format("delta").mode("append").partitionBy('month_key').save(delta_path)

        num_rows_written = df_with_month_key.count()
        logging.info(f"Table {table_name} successfully processed and saved to Delta Lake: {delta_path}. {num_rows_written} rows written.")
    finally:
        df_input_data.unpersist()


def ingest_data(spark=None):
    """Incrementally copy every configured AdventureWorks table from PostgreSQL to Delta.

    For each table in ``configs.tables_postgres_adventureworks``, read the
    current ``max(modifieddate)`` of its bronze Delta table. Then pull the newer
    PostgreSQL rows (or the whole table when there is no watermark) and append
    them, partitioned by ``month_key``. A failure on one table is logged with
    its traceback, and the remaining tables are still processed.

    Args:
        spark: SparkSession to use. Defaults to ``configure_spark()``, whose
            ``getOrCreate()`` returns the already-active session if one exists.
    """
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    if spark is None:
        spark = configure_spark()

    logging.info("Starting Delta ingestion...")

    output_prefix_layer_name = configs.prefix_layer_name['1']  # bronze layer
    storage_output = configs.lake_path['bronze']

    for table in configs.tables_postgres_adventureworks.values():
        table_name = F.convert_table_name(table)
        delta_path = f'{storage_output}{output_prefix_layer_name}{table_name}'

        try:
            _ingest_table(spark, table, table_name, delta_path)
        except Exception as e:
            # Best-effort per table: record the failure (with traceback) and continue.
            logging.exception(f"Error processing table {table_name}: {str(e)}")

    logging.info("Delta ingestion completed!")


if __name__ == "__main__":
    # Kept as a module-level name so other code in this module/notebook can reuse it.
    spark = configure_spark()
    ingest_data(spark)


2024-08-20 16:26:39,584 - INFO - Starting Delta ingestion...
2024-08-20 16:26:39,782 - INFO - Number of rows processed for table sales_countryregioncurrency: 0
2024-08-20 16:26:39,783 - INFO - No new data to process for table sales_countryregioncurrency.
2024-08-20 16:26:40,049 - INFO - Number of rows processed for table sales_creditcard: 0
2024-08-20 16:26:40,049 - INFO - No new data to process for table sales_creditcard.
2024-08-20 16:26:40,240 - INFO - Number of rows processed for table sales_currency: 0
2024-08-20 16:26:40,240 - INFO - No new data to process for table sales_currency.
2024-08-20 16:26:40,476 - INFO - Number of rows processed for table humanresources_department: 0
2024-08-20 16:26:40,477 - INFO - No new data to process for table humanresources_department.
2024-08-20 16:26:40,678 - INFO - Number of rows processed for table humanresources_employee: 0
2024-08-20 16:26:40,679 - INFO - No new data to process for table humanresources_employee.
2024-08-20 16:26:40,936 - INF