In [0]:
%run ./01-config

In [0]:
%run ./02-setup

In [0]:
from datetime import datetime, timezone

import requests
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp, input_file_name, sha2, concat_ws

class Bronze:
    """Ingest NYC open-data API extracts into Bronze-layer Delta tables.

    Two phases:
      1. ``run_api_method`` downloads each configured API dataset and lands it
         as JSON under ``<bronze folder>data_file/``.
      2. ``load_stream_into_bronze`` reads those JSON files back and upserts
         them into ``<catalog>.bronze_db.<table>``, keyed on a SHA-256 hash of
         the row's columns.

    ``bronze_layer_execution`` creates the database and then runs both phases.

    Relies on names this cell does not define: ``spark`` (the notebook's
    SparkSession) and ``Config`` / ``Setup``, presumably provided by the
    ``%run ./01-config`` and ``%run ./02-setup`` cells.
    """

    def __init__(self):
        """Load API URLs, storage paths and the catalog name from ``Config``."""

        print("Loading Bronze Layer configuration settings...")
        conf = Config()
        self.nyc_population_by_community_url = conf.nyc_population_by_community_url
        self.nyc_restaurant_inspection_url = conf.nyc_restaurant_inspection_url
        # NOTE(review): plain concatenation assumes conf.storage_account ends
        # with "/" — confirm in the config notebook.
        self.nyc_restaurant_inspection_bronze_path = conf.storage_account + "bronze_db/nyc_restaurant_inspection_raw/"
        self.nyc_population_bronze_path = conf.storage_account + "bronze_db/nyc_population_by_community_raw/"
        self.catalog_name = conf.catalog_name
        self.db_name = "bronze_db"
        print("✅ Configuration Loaded:")
        print(f"   - NYC Restaurant Inspection URL: {self.nyc_restaurant_inspection_url}")
        print(f"   - NYC Population by Community URL: {self.nyc_population_by_community_url}")
        print(f"   - Storage Path for Restaurant Inspection: {self.nyc_restaurant_inspection_bronze_path}")
        print(f"   - Storage Path for Population Data: {self.nyc_population_bronze_path}")
        print(f"   - Catalog Name: {self.catalog_name}")
        print(f"   - Database Name: {self.db_name}")
        print("🚀 Bronze Ingestion Initialized Successfully! 🎯\n")

    def fetch_and_store_api_data(self, api_url, folder_path, timeout=(30, 600)):
        """Download every row of an API dataset and land it in storage as JSON.

        A first request asks the API for ``count(*)``. A second request
        fetches that many rows via ``$limit``. The rows become a Spark
        DataFrame, get a ``source_file`` column holding the URL, and are
        written (overwrite mode) as JSON to ``<folder_path>data_file/``.

        Args:
            api_url: Dataset endpoint. Query parameters are added through
                ``requests``' ``params`` so an existing query string is kept.
            folder_path: Storage folder for the dataset; must end with "/".
            timeout: ``requests`` timeout (seconds, or a ``(connect, read)``
                tuple) applied to both calls so a stalled API cannot hang
                the job indefinitely.

        Raises:
            Exception: if either request returns a non-200 status, or the API
                reports / returns no rows.
        """

        # Kept as instance attributes for backward compatibility; the method
        # itself only uses the local parameters.
        self.api_url = api_url
        self.folder_path = folder_path

        print(f"📡 Fetching data from API {api_url}...")
        count_response = requests.get(api_url, params={"$select": "count(*)"}, timeout=timeout)
        if count_response.status_code != 200:
            raise Exception(f"Error fetching Row Count: {count_response.status_code}")
        total_rows = int(count_response.json()[0]["count"])

        # Nothing to fetch — fail the same way an empty payload would, without
        # sending a "$limit=0" request.
        if total_rows == 0:
            raise Exception(f"⚠️ No Data Returned from API {api_url}.")

        # NOTE(review): a single unpaged request for all rows relies on the
        # endpoint accepting a large $limit; consider $offset paging if the
        # datasets grow.
        response = requests.get(api_url, params={"$limit": total_rows}, timeout=timeout)
        if response.status_code != 200:
            raise Exception(f"❌ API Error: {response.status_code}")

        data = response.json()
        if not data:
            raise Exception(f"⚠️ No Data Returned from API {api_url}.")

        print(f"✅ Successfully fetched {len(data)} records from API {api_url}.")

        df = spark.createDataFrame(data).withColumn("source_file", lit(api_url))

        # Save as JSON to storage; batch_writer reads it back from here.
        output_path = folder_path + "data_file/"
        df.write.mode("overwrite").json(output_path)

        print(f"🎯 Saved the json file at location {output_path}")

    def run_api_method(self):
        """Fetch both configured API datasets into their Bronze storage folders."""

        print("🔹 Starting data ingestion for NYC Restaurant Inspection...")
        self.fetch_and_store_api_data(self.nyc_restaurant_inspection_url, self.nyc_restaurant_inspection_bronze_path)

        print("🔹 Starting data ingestion for NYC Population by Community...")
        self.fetch_and_store_api_data(self.nyc_population_by_community_url, self.nyc_population_bronze_path)

    def enable_schema_evolution(self):
        """Enable Delta schema auto-merge for MERGE operations.

        This sets a session-wide Spark conf, so it also affects any other
        MERGE run later in the same session.
        """

        spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
        print("✅ Schema evolution enabled")

    def batch_writer(self, folder_path, table_name):
        """Upsert the JSON landed under ``<folder_path>data_file/`` into a Delta table.

        Each row is stamped with ``load_date_time`` (the current UTC instant,
        one value per call). Each row also gets ``unique_hash``, the SHA-256 of
        every other column joined with "|". Rows with duplicate hashes are
        dropped.

        If ``<catalog>.<db>.<table_name>`` already exists, the batch is
        MERGEd on ``unique_hash``: update on match, insert otherwise, with
        schema auto-merge enabled. Otherwise a new Delta table is created from
        the batch. Errors during the MERGE propagate to the caller; they never
        trigger a table overwrite.

        Args:
            folder_path: Storage folder for the dataset; must end with "/".
            table_name: Unqualified table name inside ``self.db_name``.
        """

        # Kept as instance attributes for backward compatibility.
        self.folder_path = folder_path
        self.table_name = table_name

        # Timezone-aware on purpose. Spark converts *naive* datetimes using the
        # driver's local timezone, so a naive utcnow() value was only correct
        # on UTC drivers.
        ingestion_timestamp = datetime.now(timezone.utc)

        # The JSON reader always infers the schema, so no inferSchema option
        # is needed.
        df_batch = (spark.read
                    .format("json")
                    .load(folder_path + "data_file/")
                    .withColumn("load_date_time", lit(ingestion_timestamp))
                    )

        key_columns = [c for c in df_batch.columns if c != "load_date_time"]

        # Generate Unique Hash Column using SHA-256 over all non-audit columns.
        # NOTE(review): concat_ws skips NULLs, so e.g. ("a", NULL) and
        # (NULL, "a") hash identically. Adding a source column also changes
        # every hash. The recipe is left unchanged because altering it would
        # re-key rows already stored in the table; revisit together with a
        # backfill.
        df_batch = (df_batch
                    .withColumn("unique_hash", sha2(concat_ws("|", *[df_batch[c] for c in key_columns]), 256))
                    .dropDuplicates(["unique_hash"])
                    )

        bronze_table_path = f"{self.catalog_name}.{self.db_name}.{table_name}"

        # Decide create-vs-merge with an explicit existence check. The old
        # bare `except:` around the MERGE sent *any* merge failure into the
        # create branch, which overwrote the existing table with only the
        # current batch.
        if spark.catalog.tableExists(bronze_table_path):
            bronze_table = DeltaTable.forName(spark, bronze_table_path)
            print(f"✅ Found existing table: {bronze_table_path}")

            print(f"🔄 Merging new data into {bronze_table_path}...")
            self.enable_schema_evolution()

            # A hash match means all hashed columns are equal (barring hash
            # collisions), so the update effectively refreshes load_date_time.
            (bronze_table.alias("bronze")
                .merge(df_batch.alias("new_data"), "bronze.unique_hash = new_data.unique_hash")
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )

            print(f"✅ Merge operation completed successfully for {bronze_table_path}.")
        else:
            print(f"⚠️ Table {bronze_table_path} does not exist. Creating a new Delta table.")
            (df_batch.write
                .format("delta")
                .option("mergeSchema", "true")
                .mode("overwrite")
                .saveAsTable(bronze_table_path)
            )
            print(f"✅ Created new table: {bronze_table_path}")

    def load_stream_into_bronze(self):
        """Upsert both landed datasets into their Bronze Delta tables (batch, not streaming)."""

        targets = (
            ("NYC Restaurant Inspection", self.nyc_restaurant_inspection_bronze_path, "nyc_restaurant_inspection_raw"),
            ("NYC Borough Population", self.nyc_population_bronze_path, "nyc_population_by_community_raw"),
        )
        for label, folder_path, table_name in targets:
            print(f"🔹 Loading data into Bronze Layer table for {label}...")
            self.batch_writer(folder_path=folder_path, table_name=table_name)
            print(f"✅ Data successfully loaded into {self.catalog_name}.{self.db_name}.{table_name}")

    def bronze_layer_execution(self):
        """Run the full Bronze pipeline: create the database, fetch APIs, upsert tables."""

        setup = Setup("bronze_db")
        setup.create_db()

        self.run_api_method()
        self.load_stream_into_bronze()

        print("🎯 Data processing for Bronze completed successfully.\n")
