SILVER LAYER – CLEANING & TRANSFORMATION

In [0]:
%sql
-- Select the working catalog and schema for this session before any table is read or written.
USE CATALOG main;
USE SCHEMA agriculture_data;

Bronze Crop Production Data

In [0]:

# Load raw crop production data from the bronze layer.
# The table name is unqualified, so it is looked up in the session's current
# catalog and schema.
crop_bronze_df = spark.table("bronze_crop_production")


Validate and Clean Crop Production Data

In [0]:
from pyspark.sql.functions import trim, lower, avg, col, count, sum, max, min, lit, when, round, isnull

# Core data quality rules for crop production.
# A row is kept only when every rule below holds:
#   - cultivated area is strictly positive (this also rejects a null area)
#   - production is present
#   - the dimension columns state / district / crop / year are all present

crop_clean_df = crop_bronze_df.where(
    (col("area_hectares") > 0)
    & col("production_tonnes").isNotNull()
    & col("state").isNotNull()
    & col("district").isNotNull()
    & col("crop").isNotNull()
    & col("year").isNotNull()
)


In [0]:
# Data Completeness Check
#
# crop_clean_df was built by already requiring state / district / crop / year
# to be non-null, so the "missing keys" figure here is a guard on that step
# and is expected to be 0.

# Row count after cleaning
total_records = crop_clean_df.count()

# Rows where at least one key column is null, i.e. NOT all of them present
null_key_records = crop_clean_df.where(
    ~(
        col("state").isNotNull()
        & col("district").isNotNull()
        & col("crop").isNotNull()
        & col("year").isNotNull()
    )
).count()

# Report both figures
print(f"Total records: {total_records}")
print(f"Records with missing keys: {null_key_records}")


Total records: 10000
Records with missing keys: 0


Standardize Text Columns

In [0]:

# Lower-case and trim the free-text columns so that values differing only in
# letter case or surrounding spaces collapse to a single spelling.
crop_standardized_df = crop_clean_df
for text_column in ("state", "district", "crop", "season"):
    crop_standardized_df = crop_standardized_df.withColumn(
        text_column,
        trim(lower(col(text_column))),
    )


Derive Yield Metric

In [0]:
# Yield = tonnes produced per hectare cultivated.
# area_hectares was filtered to be > 0 during cleaning, so the divisor is
# never zero here.
crop_enriched_df = crop_standardized_df.withColumn(
    "yield_tonnes_per_hectare",
    col("production_tonnes") / col("area_hectares"),
)

# Yield Validation
# With a positive area, a yield <= 0 can only come from zero or negative
# production; count those rows so they are visible in the run output.
invalid_yield_records = (
    crop_enriched_df
    .where(col("yield_tonnes_per_hectare") <= 0)
    .count()
)

# Report the number of non-positive yields
print(f"Invalid yield records (<=0): {invalid_yield_records}")



Invalid yield records (<=0): 0


Silver – Logging – crop_production

In [0]:
from datetime import datetime, timezone

from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import (
    StructType, StructField,
    StringType, LongType, TimestampType
)


In [0]:
# Schema of one pipeline log row. Every field is nullable, so a row can leave
# record_count or error_message empty depending on the outcome being logged.
log_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("pipeline_name", StringType()),
        ("layer", StringType()),
        ("table_name", StringType()),
        ("status", StringType()),
        ("record_count", LongType()),
        ("start_time", TimestampType()),
        ("end_time", TimestampType()),
        ("error_message", StringType()),
    )
])


In [0]:
# Write silver_crop_production and append one row to the pipeline log.
# On success the row carries the record count; on failure it carries the error
# text and the exception is re-raised so the orchestrating task fails too.

# Logging metadata
pipeline_name = "agriculture_pipeline"
layer = "SILVER"
table_name = "silver_crop_production"

# Capture the start on the driver BEFORE any Spark work runs. Filling both
# timestamp columns with current_timestamp() after the write made start_time
# and end_time identical (it is evaluated once per query), so the log row
# carried no usable duration.
start_time = datetime.now(timezone.utc)

try:
    # Record count after silver transformation
    record_count = crop_enriched_df.count()

    # Write silver table
    crop_enriched_df.write.format("delta") \
        .mode("overwrite") \
        .saveAsTable(table_name)

    # Success log row; end_time is taken once the table write has returned
    log_df = spark.createDataFrame(
        [(pipeline_name, layer, table_name, "SUCCESS", record_count,
          start_time, datetime.now(timezone.utc), None)],
        schema=log_schema
    )

    # Append log entry with schema merge enabled
    log_df.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .saveAsTable("agriculture_data.pipeline_logs")

except Exception as e:
    # Failure log row; same start_time, end_time is the moment of failure
    log_df = spark.createDataFrame(
        [(pipeline_name, layer, table_name, "FAILED", None,
          start_time, datetime.now(timezone.utc), str(e))],
        schema=log_schema
    )

    # Append failure log
    log_df.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .saveAsTable("agriculture_data.pipeline_logs")

    # Fail task for Airflow
    raise


Bronze Weather Data

In [0]:
# Load raw weather data from the bronze layer.
# The table name is unqualified, so it is looked up in the session's current
# catalog and schema.
weather_bronze_df = spark.table("bronze_weather")


Clean and Validate Weather Data

In [0]:
from pyspark.sql.functions import avg

# Apply weather data validation rules and normalise the region keys
weather_clean_df = (
    weather_bronze_df
    # Valid rainfall values
    # NOTE(review): a strict "> 0" also discards rows reporting exactly 0 mm
    # (and rows with null rainfall) - confirm zero is not a legitimate reading.
    .filter(col("rainfall_mm") > 0)
    # Mandatory dimension fields should not be null
    .filter(
        col("state").isNotNull() &
        col("district").isNotNull() &
        col("year").isNotNull()
    )
    # Normalise region keys exactly as the crop production data does
    # (trimmed, lower case). Without this, casing or padding variants of one
    # region fall into separate groups when rainfall is aggregated, and the
    # keys do not line up with those in silver_crop_production.
    .withColumn("state", trim(lower(col("state"))))
    .withColumn("district", trim(lower(col("district"))))
)


Handle Duplicate Weather Records

In [0]:
# Collapse repeated readings: one row per (state, district, year) holding the
# mean of that group's rainfall values.
weather_aggregated_df = weather_clean_df.groupBy(
    "state", "district", "year"
).agg(
    avg(col("rainfall_mm")).alias("avg_rainfall_mm")
)


Silver – Logging – weather

In [0]:
# Write silver_weather and append one row to the pipeline log.
# On success the row carries the record count; on failure it carries the error
# text and the exception is re-raised so the run is marked as failed.

# Logging metadata
pipeline_name = "agriculture_pipeline"
layer = "SILVER"
table_name = "silver_weather"

# Capture the start on the driver BEFORE any Spark work runs. Filling both
# timestamp columns with current_timestamp() after the write made start_time
# and end_time identical (it is evaluated once per query), so the log row
# carried no usable duration.
start_time = datetime.now(timezone.utc)

try:
    # Record count after aggregation
    record_count = weather_aggregated_df.count()

    # Write silver weather table
    weather_aggregated_df.write.format("delta") \
        .mode("overwrite") \
        .saveAsTable(table_name)

    # Success log row; end_time is taken once the table write has returned
    log_df = spark.createDataFrame(
        [(pipeline_name, layer, table_name, "SUCCESS", record_count,
          start_time, datetime.now(timezone.utc), None)],
        schema=log_schema
    )

    # Append log entry
    log_df.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .saveAsTable("agriculture_data.pipeline_logs")

except Exception as e:
    # Failure log row; same start_time, end_time is the moment of failure
    log_df = spark.createDataFrame(
        [(pipeline_name, layer, table_name, "FAILED", None,
          start_time, datetime.now(timezone.utc), str(e))],
        schema=log_schema
    )

    # Append failure log
    log_df.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .saveAsTable("agriculture_data.pipeline_logs")

    # Re-raise so the failure is not swallowed
    raise


Bronze Soil Health Data

In [0]:
# Load the raw soil health dataset from the bronze layer.
# The table name is unqualified, so it is looked up in the session's current
# catalog and schema.
soil_bronze_df = spark.table("bronze_soil_health")


Clean and Validate Soil Health Data

In [0]:
# Apply soil quality validation rules and normalise the region keys

soil_clean_df = (
    soil_bronze_df
    # pH is constrained to the accepted band of 5.5 to 8.5 (both ends
    # inclusive); rows with a null pH are dropped by the same test.
    .filter(col("ph_level").between(5.5, 8.5))
    # Required fields present
    .filter(
        col("state").isNotNull() &
        col("district").isNotNull()
    )
    # Normalise region keys exactly as the crop production data does
    # (trimmed, lower case). Without this, casing or padding variants of one
    # region count as different regions when duplicates are removed, and the
    # keys do not line up with those in silver_crop_production.
    .withColumn("state", trim(lower(col("state"))))
    .withColumn("district", trim(lower(col("district"))))
)


Deduplicate Soil Records

In [0]:
# Keep one soil health record per region, where a region is a distinct
# (state, district) pair.
# NOTE(review): no ordering is applied before the call, so which of several
# rows for a region is kept is presumably arbitrary - confirm that is acceptable.
soil_dedup_df = soil_clean_df.dropDuplicates(["state", "district"])


Silver – Logging – soil_health

In [0]:
# Write silver_soil_health and append one row to the pipeline log.
# On success the row carries the record count; on failure it carries the error
# text and the exception is re-raised so the run is marked as failed.

# Logging metadata
pipeline_name = "agriculture_pipeline"
layer = "SILVER"
table_name = "silver_soil_health"

# Capture the start on the driver BEFORE any Spark work runs. Filling both
# timestamp columns with current_timestamp() after the write made start_time
# and end_time identical (it is evaluated once per query), so the log row
# carried no usable duration.
start_time = datetime.now(timezone.utc)

try:
    # Record count after soil cleaning
    record_count = soil_dedup_df.count()

    # Write silver soil table
    soil_dedup_df.write.format("delta") \
        .mode("overwrite") \
        .saveAsTable(table_name)

    # Success log row; end_time is taken once the table write has returned
    log_df = spark.createDataFrame(
        [(pipeline_name, layer, table_name, "SUCCESS", record_count,
          start_time, datetime.now(timezone.utc), None)],
        schema=log_schema
    )

    # Append log entry
    log_df.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .saveAsTable("agriculture_data.pipeline_logs")

except Exception as e:
    # Failure log row; same start_time, end_time is the moment of failure
    log_df = spark.createDataFrame(
        [(pipeline_name, layer, table_name, "FAILED", None,
          start_time, datetime.now(timezone.utc), str(e))],
        schema=log_schema
    )

    # Append failure log
    log_df.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .saveAsTable("agriculture_data.pipeline_logs")

    # Re-raise so the failure is not swallowed
    raise


Bronze Market Prices Data

In [0]:
# Load raw market prices data from the bronze layer.
# The table name is unqualified, so it is looked up in the session's current
# catalog and schema.
market_bronze_df = spark.table("bronze_market_prices")


Clean and Validate Market Prices

In [0]:
# Validate market price records.
# A row is kept only when the price per quintal is strictly positive (which
# also rejects a null price) and both crop and year are present.
market_clean_df = market_bronze_df.where(
    (col("market_price_per_quintal") > 0)
    & col("crop").isNotNull()
    & col("year").isNotNull()
)


Standardize Crop Names in Market Data

In [0]:
# Bring crop names to trimmed lower case - the same normalisation the crop
# production data receives - so one crop has one spelling.
market_standardized_df = market_clean_df.withColumn(
    "crop",
    trim(lower(col("crop"))),
)


Silver – Logging – market_prices

In [0]:
# Write silver_market_prices and append one row to the pipeline log.
# On success the row carries the record count; on failure it carries the error
# text and the exception is re-raised so the run is marked as failed.

# Logging metadata
pipeline_name = "agriculture_pipeline"
layer = "SILVER"
table_name = "silver_market_prices"

# Capture the start on the driver BEFORE any Spark work runs. Filling both
# timestamp columns with current_timestamp() after the write made start_time
# and end_time identical (it is evaluated once per query), so the log row
# carried no usable duration.
start_time = datetime.now(timezone.utc)

try:
    # Record count after market price cleaning
    record_count = market_standardized_df.count()

    # Write silver market prices table
    market_standardized_df.write.format("delta") \
        .mode("overwrite") \
        .saveAsTable(table_name)

    # Success log row; end_time is taken once the table write has returned
    log_df = spark.createDataFrame(
        [(pipeline_name, layer, table_name, "SUCCESS", record_count,
          start_time, datetime.now(timezone.utc), None)],
        schema=log_schema
    )

    # Append log entry
    log_df.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .saveAsTable("agriculture_data.pipeline_logs")

except Exception as e:
    # Failure log row; same start_time, end_time is the moment of failure
    log_df = spark.createDataFrame(
        [(pipeline_name, layer, table_name, "FAILED", None,
          start_time, datetime.now(timezone.utc), str(e))],
        schema=log_schema
    )

    # Append failure log
    log_df.write.format("delta") \
        .option("mergeSchema", "true") \
        .mode("append") \
        .saveAsTable("agriculture_data.pipeline_logs")

    # Re-raise so the failure is not swallowed
    raise


Data Quality Validation Checks

In [0]:
# Final sanity figures for the enriched crop data.

# Number of rows in the enriched crop dataset
total_records = crop_enriched_df.count()

# Number of rows for which the derived yield could not be computed (null)
null_yield_count = (
    crop_enriched_df
    .where(col("yield_tonnes_per_hectare").isNull())
    .count()
)

# Report both figures
print(f"Total records after cleaning: {total_records}")
print(f"Records with null yield: {null_yield_count}")


Total records after cleaning: 10000
Records with null yield: 0
