### Task 1: Handling Schema Mismatches using Spark
**Description**: Use Apache Spark to address schema mismatches by transforming data to match
the expected schema.

**Steps**:
1. Create Spark session
2. Load dataframe
3. Define the expected schema
4. Handle schema mismatches
5. Show corrected data

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, BooleanType
from pyspark.sql.functions import col

def create_spark_session(app_name="SchemaMismatchHandling"):
    """Return a SparkSession named *app_name*, reusing an existing one if present.

    Args:
        app_name: Application name passed to the session builder.

    Returns:
        The SparkSession produced by ``getOrCreate()``.
    """
    builder = SparkSession.builder.appName(app_name)
    return builder.getOrCreate()

def load_data_with_checks(spark, file_path):
    """Read a CSV file with a header row into a Spark DataFrame.

    The path is checked on the local filesystem before Spark is asked to
    read it, and a DataFrame with no rows is rejected.

    Args:
        spark: Session object whose ``read`` attribute is used to load the CSV.
        file_path: Path to the CSV file on the local filesystem.

    Returns:
        The loaded DataFrame.

    Raises:
        FileNotFoundError: If *file_path* is not an existing regular file.
        RuntimeError: If reading fails or the DataFrame has no rows. The
            underlying exception is attached as ``__cause__``.
    """
    import os

    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    try:
        df = spark.read.option("header", True).csv(file_path)
        if df.rdd.isEmpty():
            raise ValueError("Dataframe is empty.")
    except Exception as e:
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise RuntimeError(f"Error loading data: {e}") from e
    return df

def cast_and_validate_schema(df, expected_schema):
    """Coerce *df* to the columns and types described by *expected_schema*.

    Columns present in *df* are cast to the expected type. Columns missing
    from *df* are added as NULL columns of the expected type. The result
    contains exactly the expected columns, in schema order; any extra
    columns in *df* are dropped by the final ``select``.

    Args:
        df: Input Spark DataFrame.
        expected_schema: ``StructType`` whose fields give the target column
            names and data types.

    Returns:
        A DataFrame with the expected columns. A warning is printed for any
        column whose resulting dtype still differs from the expected one.
    """
    # Local import: the surrounding file only imports `col` from this module.
    from pyspark.sql.functions import lit

    present = set(df.columns)
    for field in expected_schema.fields:
        if field.name in present:
            source = col(field.name)
        else:
            # Referencing a non-existent column fails analysis, so
            # synthesize a NULL literal and cast it to the expected type.
            source = lit(None)
        df = df.withColumn(field.name, source.cast(field.dataType))
    df = df.select([field.name for field in expected_schema.fields])

    # Post-casting validation
    actual_types = dict(df.dtypes)
    for field in expected_schema.fields:
        dtype = actual_types[field.name]
        expected_type_name = field.dataType.simpleString()
        if dtype != expected_type_name:
            print(f"Warning: Column '{field.name}' dtype '{dtype}' does not match expected '{expected_type_name}'")
    return df

def main_spark_task(file_path):
    """Load a CSV, coerce it to the expected schema, and display the result.

    A load failure is reported on stdout and the function returns without
    raising. The Spark session is always stopped, including when casting
    or displaying the data raises.

    Args:
        file_path: Path to the CSV file to process.
    """
    spark = create_spark_session()
    try:
        try:
            df = load_data_with_checks(spark, file_path)
        except Exception as e:
            print(f"Failed to load data: {e}")
            return

        expected_schema = StructType([
            StructField("id", IntegerType(), True),
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True),
            StructField("email", StringType(), True),
            StructField("is_active", BooleanType(), True),
            StructField("balance", FloatType(), True),
        ])

        corrected_df = cast_and_validate_schema(df, expected_schema)
        corrected_df.show()
    finally:
        # Release the session on every exit path, not only on success.
        spark.stop()

# Uncomment to test (replace 'your_data.csv' with your path)
# main_spark_task('your_data.csv')


### Task 2: Detect and Correct Incomplete Data in ETL
**Description**: Use Python and Pandas to detect incomplete data in an ETL process and fill
missing values with estimates.

**Steps**:
1. Detect incomplete data
2. Fill missing values
3. Report changes

In [6]:
import pandas as pd

def detect_and_correct_incomplete_data(file_path):
    """Fill missing values in a CSV file and save the corrected copy.

    Numeric columns are filled with the column mean; other columns are
    filled with the most frequent value. A report of missing values before
    and after filling is printed. The result is written next to the input
    file with a ``corrected_`` prefix on the file name.

    Load errors and empty files are reported on stdout; the function then
    returns without writing anything.

    Args:
        file_path: Path to the input CSV file.
    """
    import os

    try:
        df = pd.read_csv(file_path)
    except Exception as e:
        print(f"Error loading CSV: {e}")
        return
    if df.empty:
        print("CSV file is empty.")
        return

    print("Initial missing values per column:")
    missing_before = df.isnull().sum()
    print(missing_before[missing_before > 0])

    for column in df.columns:
        if not df[column].isnull().any():
            continue
        if pd.api.types.is_numeric_dtype(df[column]):
            mean_val = df[column].mean()
            if pd.isna(mean_val):
                # Every value is missing, so there is nothing to estimate from.
                print(f"Cannot fill '{column}': mean not available (empty column).")
                continue
            # Assign back rather than fillna(inplace=True) on df[column]:
            # the chained in-place form is deprecated and does not update
            # the frame under pandas Copy-on-Write.
            df[column] = df[column].fillna(mean_val)
            print(f"Filled missing numeric '{column}' with mean: {mean_val}")
        else:
            mode_series = df[column].mode()
            if not mode_series.empty:
                mode_val = mode_series.iloc[0]
                df[column] = df[column].fillna(mode_val)
                print(f"Filled missing categorical '{column}' with mode: {mode_val}")
            else:
                print(f"Cannot fill '{column}': mode not found (empty column).")

    missing_after = df.isnull().sum()
    print("\nMissing values after filling:")
    print(missing_after[missing_after > 0] if missing_after.sum() > 0 else "No missing values remaining.")

    # Prefix only the file name so inputs with a directory component are
    # written alongside the source instead of to an invalid path.
    directory, filename = os.path.split(file_path)
    corrected_file_path = os.path.join(directory, "corrected_" + filename)
    df.to_csv(corrected_file_path, index=False)
    print(f"\nCorrected data saved to '{corrected_file_path}'.")

# Uncomment to test (replace 'your_data.csv' with your path)
# detect_and_correct_incomplete_data('your_data.csv')
