### BRONZE LAYER LOAD



**Purpose:**  
This notebook performs **Bronze layer ingestion**, generating synthetic stand-ins for raw farm datasets covering soil, crop, market, pest, and rainfall. These tables serve as the foundation for the Silver and Gold layers, the AI advisory, and downstream analytics.

**Workflow:**  
1. **Generate synthetic data** for:
   - Soil: moisture, temperature, humidity, precipitation, city/state/country.
   - Crop: crop health, NDVI, leaf moisture, growth stage, city/state/country.
   - Market: crop prices, city/state/country.
   - Pest: pest risk, local weather conditions (temperature, humidity, precipitation), city/state/country.
   - Rainfall: mm, type, anomaly, city/state/country.
2. **Country and city-level adjustments** for price, reflecting realistic variations.
3. **Write Bronze tables** to Delta Lake (overwrite mode).
4. **Audit logging**:
   - Records the workflow job ID, task name, start/end timestamps, status (SUCCESS/FAILED), and a message (the truncated traceback on failure) in the Delta audit table `error_reporting_audit` in the audit schema.

**Notes:**  
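- Update `catalog_name`, `schema_name`, and `audit_schema_name` (and the matching hard-coded names in the `%sql` setup cell) to match your environment; `workflow_job_id` is read from the `workflow_job_id` notebook widget / job parameter.  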
- In production, this can be replaced with ingestion from real IoT devices, weather stations, and market feeds.  
- These Bronze tables feed directly into the **Silver layer transformation** for cleaning and standardization.


In [0]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
import pandas as pd
import numpy as np
import traceback
import time

In [0]:
%sql
-- One-time environment setup. The catalog has to exist before its schemas can be created.
-- NOTE(review): these names are hard-coded; keep them in sync with catalog_name,
-- schema_name and audit_schema_name in the Python configuration cell.
CREATE CATALOG IF NOT EXISTS databricks_free_edition;

-- Bronze data tables and pipeline audit tables live in separate schemas.
CREATE SCHEMA IF NOT EXISTS databricks_free_edition.databricks_bronze;
CREATE SCHEMA IF NOT EXISTS databricks_free_edition.audit_logs;


In [0]:
# -------------------------------
# ⚙️ Runtime configuration
# -------------------------------
spark = SparkSession.builder.getOrCreate()

# Target locations; these must match the names created by the %sql setup cell.
catalog_name = "databricks_free_edition"
schema_name = "databricks_bronze"
audit_schema_name = "audit_logs"

# Declare the widget (empty default) before reading it: dbutils.widgets.get raises
# when the widget does not exist, which breaks interactive runs. When the notebook
# runs as a job task, a job parameter named "workflow_job_id" overrides this default.
dbutils.widgets.text("workflow_job_id", "")
workflow_job_id = dbutils.widgets.get("workflow_job_id")

In [0]:

# -------------------------------
# 🧾 Audit Table Setup
# -------------------------------
def write_audit(workflow_job_id, task, status, start_ts, end_ts, message=""):
    """Append one run record to the ``error_reporting_audit`` Delta table.

    The table lives at ``{catalog_name}.{audit_schema_name}.error_reporting_audit``
    (module-level globals, resolved at call time) and is written in append mode,
    so saveAsTable creates it on first use if it does not already exist.

    Args:
        workflow_job_id: Identifier of the workflow/job run being audited.
        task: Name of the pipeline task (e.g. "bronze_ingest").
        status: Outcome label; this notebook passes "SUCCESS" or "FAILED".
        start_ts: Task start timestamp.
        end_ts: Task end timestamp.
        message: Free-text detail (callers pass a traceback on failure).
    """
    audit_schema = (
        "workflow_job_id STRING,task STRING, status STRING, "
        "start_time TIMESTAMP, end_time TIMESTAMP, message STRING"
    )
    record = (workflow_job_id, task, status, start_ts, end_ts, message)
    audit_df = spark.createDataFrame([record], schema=audit_schema)

    audit_table = f"{catalog_name}.{audit_schema_name}.error_reporting_audit"
    audit_df.write.format("delta").mode("append").saveAsTable(audit_table)

# write_audit() appends to error_reporting_audit, so declare that table explicitly,
# with the same columns write_audit() sends, instead of relying on it being created
# implicitly by the first append.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {catalog_name}.{audit_schema_name}.error_reporting_audit (
        workflow_job_id STRING,
        task STRING,
        status STRING,
        start_time TIMESTAMP,
        end_time TIMESTAMP,
        message STRING
    ) USING delta
""")

# Older audit table without a workflow_job_id column. Nothing in this notebook writes
# to it; it is still created so any other task that relies on it keeps working.
# NOTE(review): confirm whether any consumer still uses pipeline_audit before removing.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {catalog_name}.{audit_schema_name}.pipeline_audit (
        task STRING,
        status STRING,
        start_time TIMESTAMP,
        end_time TIMESTAMP,
        message STRING
    ) USING delta
""")

# -------------------------------
# 🌾 Bronze ETL Start
# -------------------------------
task_name = "bronze_ingest"
start_ts = spark.sql("SELECT current_timestamp()").collect()[0][0]

try:
    print(f"🚀 Starting {task_name} ...")

    # Seed the global NumPy RNG before the FIRST random draw so every generated
    # column -- including crop_names and the prices derived from them -- is
    # reproducible across runs.
    np.random.seed(42)

    n = 1825  # ~5 years of daily records
    # The records stand in for past observations, so the daily range ENDS today
    # (inclusive) rather than running ~5 years into the future.
    dates = pd.date_range(end=pd.Timestamp.now().normalize(), periods=n, freq="D")

    countries = ["India", "USA", "Brazil", "China", "Australia", "Nigeria", "France", "Argentina"]
    world_locations = {
        "India": ["Chennai", "Delhi", "Mumbai", "Kolkata", "Bangalore", "Hyderabad"],
        "USA": ["California", "Texas", "Iowa", "Kansas", "Nebraska", "Florida"],
        "Brazil": ["São Paulo", "Curitiba", "Brasília", "Salvador", "Rio de Janeiro"],
        "China": ["Beijing", "Shanghai", "Guangzhou", "Chengdu", "Wuhan"],
        "Australia": ["Sydney", "Melbourne", "Brisbane", "Perth", "Adelaide"],
        "Nigeria": ["Lagos", "Abuja", "Kano", "Ibadan", "Port Harcourt"],
        "France": ["Paris", "Lyon", "Marseille", "Toulouse", "Bordeaux"],
        "Argentina": ["Buenos Aires", "Rosario", "Córdoba", "Mendoza", "La Plata"],
    }

    crops = [
        "Rice", "Wheat", "Maize", "Sugarcane", "Cotton", "Soybean",
        "Barley", "Coffee", "Cocoa", "Palm Oil", "Sorghum", "Potato", "Banana",
    ]
    # (low, high) base price range per crop, sampled uniformly per record.
    base_crop_prices = {
        "Rice": (1800, 2500),
        "Wheat": (1600, 2200),
        "Maize": (1200, 1800),
        "Sugarcane": (300, 400),
        "Cotton": (5500, 7500),
        "Soybean": (3500, 4800),
        "Barley": (1500, 2100),
        "Coffee": (12000, 20000),
        "Cocoa": (9000, 15000),
        "Palm Oil": (8000, 12000),
        "Sorghum": (1400, 2000),
        "Potato": (800, 1300),
        "Banana": (600, 900),
    }

    # Country-level price multipliers (reflecting local economics)
    country_price_factor = {
        "India": 1.0,
        "USA": 1.5,
        "Brazil": 1.2,
        "China": 1.1,
        "Australia": 1.4,
        "Nigeria": 0.8,
        "France": 1.6,
        "Argentina": 1.1,
    }

    def city_variance(city):
        """Return a random price multiplier in [0.9, 1.1).

        NOTE(review): `city` is not used, so this is independent per-record noise
        rather than a stable per-city factor.
        """
        return 1 + np.random.uniform(-0.1, 0.1)

    crop_names = np.random.choice(crops, n)
    chosen_countries = np.random.choice(countries, n)
    chosen_cities = [np.random.choice(world_locations[c]) for c in chosen_countries]
    # Placeholder regions cycling Region-0..Region-4; not tied to the chosen country.
    chosen_states = [f"Region-{i%5}" for i in range(n)]

    # -------------------------------
    # 🌱 Generate Synthetic Tables
    # -------------------------------

    soil_df = pd.DataFrame({
        "date": dates,
        "soil_moisture": np.round(np.random.uniform(10, 80, n), 2),
        "temperature": np.round(np.random.uniform(10, 45, n), 1),
        "humidity": np.round(np.random.uniform(20, 95, n), 1),
        "precipitation_intensity": np.round(np.random.uniform(0, 20, n), 2),
        "country": chosen_countries,
        "state_or_region": chosen_states,
        "city": chosen_cities,
    })

    crop_df = pd.DataFrame({
        "date": dates,
        "crop_name": crop_names,
        "crop_health_score": np.random.randint(50, 100, n),
        "ndvi_index": np.round(np.random.uniform(0.3, 0.9, n), 3),
        "leaf_moisture": np.round(np.random.uniform(25, 90, n), 1),
        "growth_stage": np.random.choice(["Germination", "Vegetative", "Flowering", "Maturity"], n),
        "country": chosen_countries,
        "state_or_region": chosen_states,
        "city": chosen_cities,
    })

    # Price per quintal = base price for the crop x country factor x city jitter.
    # The two draws per row stay interleaved (base price, then jitter) in row order.
    price_per_quintal = []
    for crop, country, city in zip(crop_df["crop_name"], crop_df["country"], crop_df["city"]):
        base_low, base_high = base_crop_prices[crop]
        base_price = np.random.uniform(base_low, base_high)
        adjusted_price = base_price * country_price_factor[country] * city_variance(city)
        price_per_quintal.append(round(adjusted_price, 2))

    crop_df["crop_price_per_quintal"] = price_per_quintal

    market_df = pd.DataFrame({
        "date": dates,
        "crop_name": np.random.choice(crops, n),
        # Random walk starting at 80; independent of crop_name and of crop_df prices.
        "crop_price": np.round(80 + np.cumsum(np.random.normal(0, 1, n)), 2),
        "country": chosen_countries,
        "state_or_region": chosen_states,
        "city": chosen_cities,
    })

    pest_df = pd.DataFrame({
        "date": dates,
        "pest_risk": np.random.randint(0, 100, n),
        "temperature": np.round(np.random.uniform(15, 40, n), 1),
        "humidity": np.round(np.random.uniform(30, 90, n), 1),
        "precipitation_intensity": np.round(np.random.uniform(0, 10, n), 2),
        "country": chosen_countries,
        "state_or_region": chosen_states,
        "city": chosen_cities,
    })

    rainfall_df = pd.DataFrame({
        "date": dates,
        "rainfall_mm": np.round(np.random.uniform(0, 250, n), 1),
        "rainfall_type": np.random.choice(["Drizzle", "Light Rain", "Heavy Rain", "Storm", "None"], n),
        "rainfall_anomaly": np.round(np.random.uniform(-10, 10, n), 2),
        "country": chosen_countries,
        "state_or_region": chosen_states,
        "city": chosen_cities,
    })

    # -------------------------------
    # 💾 Write Bronze Tables
    # -------------------------------
    # Insertion order of this dict fixes the write order.
    bronze_tables = {
        "bronze_soil": soil_df,
        "bronze_crop": crop_df,
        "bronze_market": market_df,
        "bronze_pest": pest_df,
        "bronze_rainfall": rainfall_df,
    }
    for table_name, pdf in bronze_tables.items():
        spark.createDataFrame(pdf).write.format("delta").mode("overwrite").saveAsTable(
            f"{catalog_name}.{schema_name}.{table_name}"
        )

    # -------------------------------
    # 🧾 Audit Logging
    # -------------------------------
    end_ts = spark.sql("SELECT current_timestamp()").collect()[0][0]
    write_audit(workflow_job_id, task_name, "SUCCESS", start_ts, end_ts, "Bronze tables created successfully 🌎")

    print("✅ Bronze Ingest Completed Successfully!")

except Exception:
    tb = traceback.format_exc()
    try:
        end_ts = spark.sql("SELECT current_timestamp()").collect()[0][0]
        write_audit(workflow_job_id, task_name, "FAILED", start_ts, end_ts, tb[:4000])
    except Exception as audit_err:
        # Never let a failing audit write replace the real ingest error re-raised below.
        print(f"⚠️ Could not write FAILED audit record: {audit_err!r}")
    print("❌ Bronze ingest failed. See audit table for details.")
    # Bare raise re-raises the original ingest exception (the inner handler has finished).
    raise
