In [1]:
from pathlib import Path
import sys

PROJECT_ROOT = Path.cwd().resolve().parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

In [2]:

import re
from pyspark.sql import functions as F
from pyspark.sql.types import (
    DecimalType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    ShortType,
    StringType,
)
from pyspark.ml.feature import Imputer
from spark_jobs.config import BRONZE_PATH, SILVER_PATH, ensure_data_dirs
from spark_jobs.spark_session_manager import get_spark_session


def normalize_columns(df):
    current = df
    for name in df.columns:
        new_name = re.sub(r"\s+", " ", name).strip().upper()
        if new_name != name:
            current = current.withColumnRenamed(name, new_name)
    return current


def drop_null_keys(df):
    missing = [c for c in ("STATE", "DISTRICT") if c not in df.columns]
    if missing:
        print(f"Warning: columns missing for null check: {missing}")
        return df
    return df.dropna(subset=["STATE", "DISTRICT"])


def impute_numerics(spark, df):
    numeric_cols = [
        field.name
        for field in df.schema.fields
        if isinstance(
            field.dataType,
            (DoubleType, IntegerType, FloatType, LongType, ShortType, DecimalType),
        )
        and field.name not in ["YEAR"]
    ]
    if not numeric_cols:
        print("No numeric columns to impute.")
        return df
    imputer = Imputer(
        inputCols=numeric_cols,
        outputCols=[f"{c}_imputed" for c in numeric_cols],
        strategy="median",
    )
    model = imputer.fit(df)
    df_imputed = model.transform(df)
    for c in numeric_cols:
        df_imputed = df_imputed.drop(c).withColumnRenamed(f"{c}_imputed", c)
    print("Numeric nulls filled with median.")
    return df_imputed


def fill_categoricals(df):
    text_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
    if not text_cols:
        return df
    print("String nulls filled with 'Unknown'.")
    return df.fillna("Unknown", subset=text_cols)


def remove_outliers_iqr(df, cols_to_check):
    cols = [c for c in cols_to_check if c in df.columns]
    if not cols:
        print("No numeric columns found for outlier removal.")
        return df
    quantiles_data = df.approxQuantile(cols, [0.25, 0.75], 0.05)
    filter_condition = F.lit(True)
    before = df.count()
    for idx, col_name in enumerate(cols):
        q1, q3 = quantiles_data[idx]
        iqr = q3 - q1
        lower = q1 - (1.5 * iqr)
        upper = q3 + (1.5 * iqr)
        col_filter = (F.col(col_name) >= lower) & (F.col(col_name) <= upper)
        filter_condition = filter_condition & col_filter
        print(f"Outlier bounds for {col_name}: [{lower:.2f}, {upper:.2f}]")
    filtered = df.filter(filter_condition)
    after = filtered.count()
    print(f"Outliers removed: {before - after}")
    return filtered


def standardize_and_filter(df):
    current = df
    for column_name in ("STATE", "DISTRICT", "CROP", "SEASON"):
        if column_name in current.columns:
            current = current.withColumn(column_name, F.trim(F.upper(F.col(column_name))))
    if "STATE" in current.columns:
        current = current.filter(F.col("STATE") != "STATE")
    if "STATE" in current.columns and "DISTRICT" in current.columns:
        current = current.withColumn(
            "STATE_DISTRICT", F.concat(F.col("STATE"), F.lit(" - "), F.col("DISTRICT"))
        )
    desired_crops = ["MAIZE", "RICE", "WHEAT", "BARLEY"]
    if "CROP" in current.columns:
        current = current.filter(F.col("CROP").isin(desired_crops))
    if "AREA" in current.columns:
        current = current.filter(F.col("AREA") > 0)
    if "YIELD" in current.columns:
        current = current.filter(F.col("YIELD") > 0)
    print(f"Records after standardization and filters: {current.count()}")
    return current


def rename_to_snake_case(df):
    def _snake(name: str) -> str:
        name = re.sub(r"[\s\-\/\.]+", "_", name.strip())
        name = re.sub(r"[^A-Za-z0-9_]", "", name)
        name = re.sub(r"_+", "_", name)
        return name.lower().strip("_")
    current = df
    for name in df.columns:
        new_name = _snake(name)
        if new_name != name:
            current = current.withColumnRenamed(name, new_name)
    return current


In [3]:
ensure_data_dirs()
spark = get_spark_session('Silver Layer Notebook')

bronze_path = Path(BRONZE_PATH)
if not bronze_path.exists():
    raise FileNotFoundError(f'Bronze parquet not found at {bronze_path}')

df_bronze = spark.read.parquet(str(bronze_path))
df_bronze.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- District : string (nullable = true)
 |-- Crop: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Area: double (nullable = true)
 |-- Production: integer (nullable = true)
 |-- Yield: double (nullable = true)
 |-- JAN: double (nullable = true)
 |-- FEB: double (nullable = true)
 |-- MAR: double (nullable = true)
 |-- APR: double (nullable = true)
 |-- MAY: double (nullable = true)
 |-- JUN: double (nullable = true)
 |-- JUL: double (nullable = true)
 |-- AUG: double (nullable = true)
 |-- SEP: double (nullable = true)
 |-- OCT: double (nullable = true)
 |-- NOV: double (nullable = true)
 |-- DEC: double (nullable = true)
 |-- ANN: double (nullable = true)
 |-- Jan-Feb: double (nullable = true)
 |-- Mar-May: double (nullable = true)
 |-- Jun-Sep: double (nullable = true)
 |-- Oct-Dec: double (nullable = true)
 |-- TEMP_ANNUAL: double (nullable = true)
 |-- TEMP_JAN_FEB: double (nullable 

In [4]:
df_norm = normalize_columns(df_bronze)
df_keys = drop_null_keys(df_norm)
df_imputed = impute_numerics(spark, df_keys)
df_filled = fill_categoricals(df_imputed)

outlier_columns = [c for c in ['PRODUCTION', 'AREA', 'YIELD'] if c in df_filled.columns]
df_no_outliers = remove_outliers_iqr(df_filled, outlier_columns)
df_standard = standardize_and_filter(df_no_outliers)

df_silver = rename_to_snake_case(df_standard).withColumn(
    'data_processamento', F.current_timestamp()
)

Numeric nulls filled with median.
String nulls filled with 'Unknown'.
Outlier bounds for PRODUCTION: [-8750.00, 14850.00]
Outlier bounds for AREA: [-5169.50, 8834.50]
Outlier bounds for YIELD: [-2.02, 4.75]
Outliers removed: 106951
Records after standardization and filters: 28908


In [5]:
df_silver.write.mode('overwrite').parquet(str(SILVER_PATH))
print(f'Silver saved to {SILVER_PATH}')
df_silver.select('state_district', 'crop', 'year', 'yield').show(10)
spark.stop()

Silver saved to /home/jovyan/work/data/silver/dados_limpos.parquet
+--------------------+-----+----+-----+
|      state_district| crop|year|yield|
+--------------------+-----+----+-----+
|ANDAMAN AND NICOB...|MAIZE|2009| 2.48|
|ANDAMAN AND NICOB...| RICE|2003| 1.73|
|ANDAMAN AND NICOB...| RICE|2004| 1.37|
|ANDHRA PRADESH - ...|MAIZE|2004|  2.2|
|ANDHRA PRADESH - ...|MAIZE|1999| 2.84|
|ANDHRA PRADESH - ...|MAIZE|2003| 1.72|
|ANDHRA PRADESH - ...|MAIZE|2004| 2.87|
|ANDHRA PRADESH - ...|MAIZE|1999| 2.84|
|ANDHRA PRADESH - ...|MAIZE|2003| 1.72|
|ANDHRA PRADESH - ...|MAIZE|2004| 2.87|
+--------------------+-----+----+-----+
only showing top 10 rows

