### Task 1: Handling Schema Mismatches using Spark
**Description**: Use Apache Spark to address schema mismatches by transforming data to match
the expected schema.

**Steps**:
1. Create Spark session
2. Load dataframe
3. Define the expected schema
4. Handle schema mismatches
5. Show corrected data

In [1]:
# Write your code from here
import pandas as pd
import numpy as np
import os
import unittest

# Optional: Spark setup
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
    from pyspark.sql.functions import col
    spark = SparkSession.builder.appName("DQ_Task_Q2").getOrCreate()
    spark_available = True
except ImportError:
    print("PySpark not available. Spark functionality skipped.")
    spark_available = False

# Step 2: Define a custom exception for validation errors
class DataValidationError(Exception):
    """Custom exception for data validation errors."""
    pass

# Step 3: Define function to load CSV safely
def load_csv_safe(path):
    if not os.path.exists(path):
        raise FileNotFoundError(f"File not found: {path}")
    try:
        df = pd.read_csv(path)
        return df
    except Exception as e:
        raise ValueError(f"Error loading CSV: {str(e)}")

# Step 4: Simulate ingestion with missing values
data = {
    'customer_id': [1, 2, np.nan, 4, 5],
    'amount': [100.0, None, 200.0, 300.0, None],
    'transaction_date': ['2024-01-01', '2024-01-02', '2024-01-03', None, '2024-01-05']
}
df = pd.DataFrame(data)

# Add a new column for the data source
df['source'] = 'API'

print("Raw Ingested Data:")
display(df)

# Step 5: Schema validation
required_cols = ['customer_id', 'amount', 'transaction_date', 'source']
def check_schema(df, required_cols):
    for col in required_cols:
        if col not in df.columns:
            raise DataValidationError(f"Missing expected column: {col}")
    if not np.issubdtype(df['customer_id'].dropna().dtype, np.number):
        raise TypeError("customer_id must be numeric")

try:
    check_schema(df, required_cols)
    print("Schema validation successful.")
except Exception as e:
    print(f"Schema validation failed: {e}")

# Step 6: Report missing values
def missing_report(df):
    report = df.isnull().sum().to_frame('Missing Count')
    report['Missing %'] = (report['Missing Count'] / len(df)) * 100
    return report[report['Missing Count'] > 0]

print("Missing Value Report:")
display(missing_report(df))

# Step 7: Handle missing values
def clean_missing_data(df):
    df_cleaned = df.dropna(subset=['customer_id'])  # Drop rows with missing IDs
    df_cleaned['amount'].fillna(df_cleaned['amount'].mean(), inplace=True)  # Impute amount
    df_cleaned['transaction_date'].fillna(method='ffill', inplace=True)  # Fill forward dates
    return df_cleaned

df_clean = clean_missing_data(df)
print("Cleaned Data:")
display(df_clean)

# Step 8: Simulate schema mismatch in PySpark (optional)
if spark_available:
    print("Simulating Spark schema mismatch:")
    try:
        schema = StructType([
            StructField("customer_id", IntegerType(), True),
            StructField("amount", DoubleType(), True),
            StructField("transaction_date", StringType(), True)
        ])
        # Simulated bad input
        bad_data = [("1", "invalid_amount", "2024-01-01")]
        spark_df = spark.createDataFrame(bad_data, schema=schema)
        spark_df.show()
    except Exception as e:
        print(f"Spark schema mismatch error handled: {e}")

# Step 9: Unit tests
class TestMissingValueHandling(unittest.TestCase):

    def test_amount_imputation(self):
        test_df = pd.DataFrame({'amount': [100.0, None, 200.0]})
        mean_val = test_df['amount'].mean()
        test_df['amount'].fillna(mean_val, inplace=True)
        self.assertEqual(test_df.isnull().sum().sum(), 0)

    def test_drop_missing_ids(self):
        test_df = pd.DataFrame({'customer_id': [1, None, 3]})
        df_cleaned = test_df.dropna(subset=['customer_id'])
        self.assertEqual(len(df_cleaned), 2)

print("Running Unit Tests...")
unittest.main(argv=[''], exit=False)

# Step 10: Save cleaned data
df_clean.to_csv("cleaned_output.csv", index=False)
print("Cleaned data saved to cleaned_output.csv")

PySpark not available. Spark functionality skipped.
Raw Ingested Data:


Unnamed: 0,customer_id,amount,transaction_date,source
0,1.0,100.0,2024-01-01,API
1,2.0,,2024-01-02,API
2,,200.0,2024-01-03,API
3,4.0,300.0,,API
4,5.0,,2024-01-05,API


Schema validation successful.
Missing Value Report:


Unnamed: 0,Missing Count,Missing %
customer_id,1,20.0
amount,2,40.0
transaction_date,1,20.0


Cleaned Data:


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_cleaned['amount'].fillna(df_cleaned['amount'].mean(), inplace=True)  # Impute amount
  df_cleaned['transaction_date'].fillna(method='ffill', inplace=True)  # Fill forward dates
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_cleaned['transaction_date'].fillna(method='ffill', inplace=True)  # Fill forward dates


Unnamed: 0,customer_id,amount,transaction_date,source
0,1.0,100.0,2024-01-01,API
1,2.0,200.0,2024-01-02,API
3,4.0,300.0,2024-01-02,API
4,5.0,200.0,2024-01-05,API


..
----------------------------------------------------------------------
Ran 2 tests in 0.003s

OK


Running Unit Tests...
Cleaned data saved to cleaned_output.csv


### Task 2: Detect and Correct Incomplete Data in ETL
**Description**: Use Python and Pandas to detect incomplete data in an ETL process and fill
missing values with estimates.

**Steps**:
1. Detect incomplete data
2. Fill missing values
3. Report changes

In [None]:
# Write your code from here