In [None]:
"""
notebooks/data_cleaning.ipynb
Performs cleaning on insurance data using PySpark.
Logs everything to logs/pipeline.log.
"""

# Parameters
input_path = None
output_path = None


from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, trim, lower, when, to_date, lit
from pyspark.sql.types import StringType
import logging
import os
import pandas as pd
import re
import yaml

# ------------------ Load Config ------------------
config_path = globals().get("config_path", None)

if config_path:
    config_file = config_path
else:
    # fallback to default (local testing)
    notebook_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.abspath(os.path.join(notebook_dir, "..", ".."))
    config_file = os.path.join(project_root, "config", "config.yaml")

with open(config_file, "r") as f:
    config = yaml.safe_load(f)

input_path = config["paths"]["raw_data"]
output_path = config["paths"]["cleaned_data"]
log_file = config["log_file"]


# ------------------ Logger Setup ------------------

os.makedirs(os.path.dirname(log_file), exist_ok=True)
logger = logging.getLogger("data_cleaning")
if not logger.hasHandlers():
    handler = logging.FileHandler(log_file)
    formatter = logging.Formatter("[CLEANING] %(asctime)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

# ------------------ Spark Session ------------------

spark = SparkSession.builder \
    .appName("InsuranceCleaner") \
    .config("spark.sql.shuffle.partitions", "2") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .config("spark.speculation", "false") \
    .getOrCreate()

logger.info("Spark session started.")

# ------------------ Read JSON ------------------
try:
    logger.info("Starting the file load from input folder")
    df = spark.read.json(input_path)
    logger.info("Successfully loaded input JSON file.")
except Exception as e:
    logger.error(f"Failed to load input file: {e}")
    raise  # or continue gracefully

# ------------------ Cleaning Functions ------------------
def clean_string_columns(df, columns):
    for col_name in columns:
        df = df.withColumn(col_name, trim(col(col_name)))
        df = df.withColumn(col_name, regexp_replace(col(col_name), r'\s+', ' '))
    return df

def standardize_gender(df):
    return df.withColumn("gender", 
        when(lower(col("gender")).isin("m", "male"), "Male")
       .when(lower(col("gender")).isin("f", "female"), "Female")
       .otherwise("Other")
    )

def standardize_dates(df, date_columns):
    for col_name in date_columns:
        df = df.withColumn(col_name, to_date(col(col_name), 'yyyy-MM-dd'))
    return df

def clean_email(df):
    return df.withColumn("email_address", lower(trim(col("email_address"))))

def normalize_booleans(df, bool_columns):
    for col_name in bool_columns:
        df = df.withColumn(col_name, 
            when(lower(col(col_name)).isin("yes", "y", "true"), "Yes")
           .when(lower(col(col_name)).isin("no", "n", "false"), "No")
           .otherwise("Unknown")
        )
    return df

def nullify_claim_status_if_id_missing(df):
    return df.withColumn(
        "claim_status",
        when(
            col("claim_id").isNull() |
            (lower(col("claim_id")) == "nan") |
            (lower(col("claim_id")) == "null") |
            (col("claim_id") == ""),
            lit(None)
        ).otherwise(col("claim_status"))
    )


# ------------------ Execute Cleaning ------------------
try:
    logger.info("Started cleaning string columns")
    df = clean_string_columns(df, [
        "address", "agent_name", "claim_reason", "claim_status",
        "customer_name", "education_level", "employment_status",
        "marital_status", "payment_frequency", "policy_status",
        "policy_type", "property_type", "region", "vehicle_type"
    ])
    logger.info("Cleaning string columns successful")

    logger.info("Started standardizing gender")
    df = standardize_gender(df)
    logger.info("Standardizing genders successful")
    
    logger.info("Started standardizing Date columns")
    df = standardize_dates(df, [
        "claim_date", "date_of_birth", "last_payment_date",
        "next_due_date", "policy_start_date", "policy_end_date"
    ])
    logger.info("Standardizing Dates successful")
    
    logger.info("Started cleaning EMAIL")
    df = clean_email(df)
    logger.info("Cleaning EMAIL successfull")
    

    
    logger.info("Started normalizing booleans")
    df = normalize_booleans(df, ["renewal_flag", "smoker"])
    logger.info("Normalizing booleans successful")
    
    logger.info("Started nullifying claim status if claim_id is missing")
    df = nullify_claim_status_if_id_missing(df)
    logger.info("Nullifying Claim Status successful")

    logger.info("Data cleaning completed successfully.")
except Exception as e:
    logger.error(f"Error during data cleaning: {e}")
    raise 

# ------------------ Save Cleaned Data ------------------
try:
    logger.info("Started saving the cleaned output file")
    output_path = output_path
    desired_order = [
        "policy_id", "customer_id", "customer_name", "gender", "date_of_birth", "age",
        "policy_type", "coverage_amount", "premium_amount", "payment_frequency",
        "policy_start_date", "policy_end_date",
        "claim_id", "claim_date", "claim_amount", "claim_reason", "claim_status",
        "agent_id", "agent_name", "region",
        "customer_income", "marital_status", "number_of_dependents",
        "vehicle_type", "vehicle_age",
        "property_type", "property_value",
        "health_condition", "smoker", "employment_status", "education_level",
        "policy_status", "last_payment_date", "next_due_date", "renewal_flag",
        "contact_number", "email_address", "address", "zip_code"
    ]

    # Reorder Spark DataFrame and convert to Pandas
    logger.info("Re-ordering column names per the input order")
    existing_columns = [c for c in desired_order if c in df.columns]
    df = df.select(*existing_columns)
    df_pd = df.toPandas()
    logger.info("Re-ordering successful")

    # Format datetime columns as string
    logger.info("Formatting DateTime columns")
    date_columns = [
        "claim_date", "date_of_birth", "last_payment_date",
        "next_due_date", "policy_start_date", "policy_end_date"
    ]
    for col_name in date_columns:
        if col_name in df_pd.columns:
            df_pd[col_name] = df_pd[col_name].astype(str)
    logger.info("Formatting DateTime columns successful")

    logger.info("Converting Age column to type:Integer")
    if "age" in df_pd.columns:
        df_pd["age"] = pd.to_numeric(df_pd["age"], errors="coerce").astype("Int64")
    logger.info("Converted successfully")

    # Save as JSON (one line per record)
    df_pd.to_json(output_path, orient="records", lines=True)

    logger.info(f"Saving data successful. Cleaned data written to {output_path} using Pandas.")
except Exception as e:
    logger.error(f"Failed to write cleaned data: {e}")
    raise




# ------------------ Stop Spark ------------------
spark.stop()
logger.info("Spark session stopped.")