### Task 1: Handling Schema Mismatches using Spark
**Description**: Use Apache Spark to address schema mismatches by transforming data to match
the expected schema.

**Steps**:
1. Create Spark session
2. Load dataframe
3. Define the expected schema
4. Handle schema mismatches
5. Show corrected data

In [3]:
# handle_schema_mismatches.py
#
# Repair schema mismatches in a CSV by casting every column to an expected
# schema. Values that cannot be converted to the target type become NULL and
# are reported separately instead of aborting the job.

import operator
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, BooleanType
from pyspark.sql.functions import col, expr


def _safe_cast(field):
    """Return a column expression that casts ``field.name`` to ``field.dataType``.

    Uses the Spark SQL ``try_cast`` function (Spark >= 3.2), which yields NULL for
    values that cannot be converted. A plain ``Column.cast`` instead raises
    ``CAST_INVALID_INPUT`` whenever ``spark.sql.ansi.enabled`` is true (the
    default since Spark 4.0), which would crash on values like "twenty-five".
    The result keeps the original column name.
    """
    # Backticks quote the identifier so unusual column names still resolve.
    return expr(
        f"try_cast(`{field.name}` AS {field.dataType.simpleString()})"
    ).alias(field.name)


# -----------------------------------------------------------------------------
# Step 1: Create Spark session
# -----------------------------------------------------------------------------
spark = SparkSession.builder \
    .appName("SchemaMismatchHandler") \
    .getOrCreate()

try:
    # -------------------------------------------------------------------------
    # Step 2: Simulate data with schema mismatches (Create CSV manually in code)
    # -------------------------------------------------------------------------
    # Sample CSV where some cells do not match the intended column types.
    csv_data = "\n".join([
        "id,name,age,salary,is_active",
        "1,Alice,30,70000.0,true",
        "2,Bob,twenty-five,55000.5,false",
        '3,Charlie,28,"sixty thousand",yes',
        "4,David,35,60000.0,true",
    ]) + "\n"

    with open("schema_mismatch.csv", "w", encoding="utf-8") as f:
        f.write(csv_data)

    # Load the data with automatic schema inference (may be wrong): any column
    # containing a non-numeric / non-boolean cell is inferred as a string.
    raw_df = spark.read.csv("schema_mismatch.csv", header=True, inferSchema=True)

    print("🚫 Raw Data with Schema Mismatches:")
    raw_df.show(truncate=False)
    raw_df.printSchema()

    # -------------------------------------------------------------------------
    # Step 3: Define the expected schema
    # -------------------------------------------------------------------------
    expected_schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("salary", FloatType(), True),
        StructField("is_active", BooleanType(), True)
    ])

    # -------------------------------------------------------------------------
    # Step 4: Handle schema mismatches (Transform types to match expected schema)
    # -------------------------------------------------------------------------
    # Drive the casts from expected_schema so the schema is the single source
    # of truth; output columns follow the schema's field order.
    corrected_df = raw_df.select(
        [_safe_cast(field) for field in expected_schema.fields]
    )

    # -------------------------------------------------------------------------
    # Step 5: Show corrected data
    # -------------------------------------------------------------------------
    print("✅ Corrected DataFrame:")
    corrected_df.show(truncate=False)
    corrected_df.printSchema()

    # Optional: Handle invalid rows. A NULL in any expected column after
    # casting means the raw value was missing or could not be converted.
    any_null = reduce(
        operator.or_,
        (col(field.name).isNull() for field in expected_schema.fields),
    )
    invalid_rows = corrected_df.filter(any_null)

    print("⚠️ Rows with unresolved schema mismatches (nulls after casting):")
    invalid_rows.show(truncate=False)

finally:
    # -------------------------------------------------------------------------
    # Stop Spark session (even if a step above failed)
    # -------------------------------------------------------------------------
    spark.stop()


JAVA_HOME is not set


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

### Task 2: Detect and Correct Incomplete Data in ETL
**Description**: Use Python and Pandas to detect incomplete data in an ETL process and fill
missing values with estimates.

**Steps**:
1. Detect incomplete data
2. Fill missing values
3. Report changes

In [None]:
# etl_detect_and_correct.py

import pandas as pd
import numpy as np

# -----------------------------------------------------------------------------
# Step 0: Create a sample CSV file with incomplete data
# -----------------------------------------------------------------------------
sample_csv = """\
OrderID,Customer,Quantity,UnitPrice,Region
1001,Alice,5,20.0,North
1002,Bob,,25.0,East
1003,Charlie,3,,West
1004,,2,15.0,South
1005,Eve,7,22.5,
"""

with open('etl_incomplete.csv', 'w', encoding='utf-8') as f:
    f.write(sample_csv)

# -----------------------------------------------------------------------------
# Step 1: Load data and detect incomplete entries
# -----------------------------------------------------------------------------
df = pd.read_csv('etl_incomplete.csv')
print("📥 Loaded Data:")
print(df, "\n")

# Snapshot which cells are missing *before* imputation. Step 3 reports from
# this mask instead of re-reading the CSV from disk once per column.
missing_mask = df.isnull()
missing_counts = missing_mask.sum()
print("❗ Missing values per column:")
print(missing_counts, "\n")

# -----------------------------------------------------------------------------
# Step 2: Fill missing values with estimates
# -----------------------------------------------------------------------------
# Compute every fill value up front, from the un-imputed data, so the exact
# same values can be applied and reported.
fills = {}

# Numeric: Quantity → fill with median
fills['Quantity'] = df['Quantity'].median()

# Numeric: UnitPrice → fill with mean
fills['UnitPrice'] = df['UnitPrice'].mean()

# Categorical: Customer → fill with a placeholder "Unknown"
fills['Customer'] = "Unknown"

# Categorical: Region → fill with the most frequent value. Series.mode()
# ignores NaN and returns its results sorted, so on a tie the alphabetically
# first value is chosen; "Unknown" is used only when the column has no
# non-missing values at all (mode() is then empty).
region_modes = df['Region'].mode()
fills['Region'] = region_modes.iloc[0] if not region_modes.empty else "Unknown"

# Apply all fills at once (a dict maps column name → fill value).
df = df.fillna(fills)

# -----------------------------------------------------------------------------
# Step 3: Report exactly which cells were imputed
# -----------------------------------------------------------------------------
print("🛠️ Imputation summary:")
# Loop variable is `column`, not `col`, so this cell does not shadow pyspark's
# `col` function when both tasks run in the same notebook kernel.
for column, fill_val in fills.items():
    null_idxs = missing_mask.index[missing_mask[column].to_numpy()].tolist()
    if null_idxs:
        print(f" • Column '{column}': filled {len(null_idxs)} missing value(s) at rows {null_idxs} with → {fill_val}")
    else:
        print(f" • Column '{column}': no missing values to fill")

# -----------------------------------------------------------------------------
# Show the corrected DataFrame
# -----------------------------------------------------------------------------
print("\n✅ Corrected DataFrame:")
print(df)

# Optionally, save the cleaned DataFrame
df.to_csv('etl_incomplete_cleaned.csv', index=False)
print("\n💾 Cleaned data saved to 'etl_incomplete_cleaned.csv'")


ModuleNotFoundError: No module named 'pyspark'