### Task 1: Handling Schema Mismatches using Spark
**Description**: Use Apache Spark to address schema mismatches by transforming data to match
the expected schema.

**Steps**:
1. Create Spark session
2. Load a DataFrame whose schema differs from the expected one
3. Define the expected schema
4. Handle schema mismatches (cast types, add missing columns, drop extra columns)
5. Show corrected data
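Step 4 applies three rules: cast each column to its expected type, add missing columns as nulls, and drop extra columns. None of these rules depends on Spark. The sketch below applies the same rules in plain Python, so you can check them without a JVM. It assumes records are plain dicts. `EXPECTED_SCHEMA`, `safe_cast`, and `conform` are hypothetical names, not part of PySpark.

```python
from typing import Any, Callable

# Hypothetical expected schema: column name -> converter for non-null values.
# It mirrors the Spark StructType: name (string), age (int), gender (string).
EXPECTED_SCHEMA: dict[str, Callable[[Any], Any]] = {
    "name": str,
    "age": int,
    "gender": str,
}


def safe_cast(value: Any, convert: Callable[[Any], Any]) -> Any:
    """Convert a value, returning None for nulls or unparseable input.

    Similar in spirit to a non-ANSI Spark cast, which yields null rather
    than failing; under ANSI mode Spark raises an error instead.
    """
    if value is None:
        return None
    try:
        return convert(value)
    except (TypeError, ValueError):
        return None


def conform(record: dict[str, Any], schema: dict[str, Callable[[Any], Any]]) -> dict[str, Any]:
    """Return a new record with exactly the schema's columns, in schema order."""
    # Iterating over the schema (not the record) drops extra keys such as
    # 'country' and fills absent keys such as 'gender' with None.
    return {name: safe_cast(record.get(name), convert) for name, convert in schema.items()}


rows = [
    {"name": "Alice", "age": "30", "country": "USA"},
    {"name": "Bob", "age": "40", "country": "UK"},
    {"name": "Carol", "age": None, "country": "India"},
]

for row in rows:
    print(conform(row, EXPECTED_SCHEMA))
# {'name': 'Alice', 'age': 30, 'gender': None}
# {'name': 'Bob', 'age': 40, 'gender': None}
# {'name': 'Carol', 'age': None, 'gender': None}
```

The expected schema drives the iteration, so adding a missing column and dropping an extra one happen in the same pass.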

In [4]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, lit

# Step 1: Create Spark Session
spark = SparkSession.builder \
    .appName("SchemaMismatchHandling") \
    .getOrCreate()

# Step 2: Load DataFrame with mismatched schema
# - 'age' is a string instead of int
# - 'gender' is missing
# - 'country' is an extra column
data = [
    Row(name="Alice", age="30", country="USA"),
    Row(name="Bob", age="40", country="UK"),
    Row(name="Carol", age=None, country="India")
]

df = spark.createDataFrame(data)

print("Original DataFrame:")
df.show()
df.printSchema()

# Step 3: Define expected schema
expected_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True)
])

# Step 4: Handle schema mismatches
# - Cast 'age' to Integer
# - Add 'gender' if missing
# - Select only expected columns

# Cast 'age' to Integer
df = df.withColumn("age", col("age").cast(IntegerType()))

# Add missing column 'gender' if not present
if 'gender' not in df.columns:
    df = df.withColumn("gender", lit(None).cast(StringType()))

# Keep only expected columns
expected_columns = [f.name for f in expected_schema.fields]
df = df.select(*expected_columns)

# Step 5: Show corrected DataFrame
print("Corrected DataFrame:")
df.show()
df.printSchema()


JAVA_HOME is not set


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [3]:
!pip install pyspark


Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark
  Downloading pyspark-4.0.0.tar.gz (434.1 MB)
     434.1/434.1 MB 1.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.9
  Downloading py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
     203.0/203.0 kB 30.2 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-4.0.0-py2.py3-none-any.whl size=434741245 sha256=837d8234ed09cbd7d82252082ed0cc4f88d2cf6071a53e7edb5bc8bc0b7d9450
  Stored in directory: /h
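Installing the `pyspark` package does not install a Java runtime. PySpark needs one to launch its JVM gateway, which is why the Spark cell fails with `JAVA_GATEWAY_EXITED`. Spark's `spark-class` launcher works as follows:

- If `JAVA_HOME` is set, it runs `$JAVA_HOME/bin/java`.
- Otherwise, it falls back to `java` on the `PATH`.
- If neither is available, it prints `JAVA_HOME is not set`.

The sketch below is a minimal pre-flight check that mirrors this lookup, meant to run before building the SparkSession. `locate_java` is a hypothetical helper. It only finds a candidate path; it does not check that the file exists or that the Java version is one your Spark release supports.

```python
import os
import shutil
from typing import Mapping, Optional


def locate_java(environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return the java executable Spark's launcher would try, or None.

    Assumption: JAVA_HOME takes precedence; otherwise `java` on the PATH is used.
    """
    java_home = environ.get("JAVA_HOME")
    if java_home:
        exe = "java.exe" if os.name == "nt" else "java"
        return os.path.join(java_home, "bin", exe)
    # Falls back to searching the current process PATH.
    return shutil.which("java")


java = locate_java()
if java is None:
    print("No Java found: install a JDK and set JAVA_HOME before creating the SparkSession.")
else:
    print(f"Java candidate: {java}")
```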

### Task 2: Detect and Correct Incomplete Data in ETL
**Description**: Use Python and pandas to detect incomplete data in an ETL process and fill
missing values with estimates (median, mean, mode, or a placeholder value).

**Steps**:
1. Detect incomplete data
2. Fill missing values
3. Report changes
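Per-column counts show how much data is missing but not where. The sketch below also lists the incomplete rows and the share of missing values per column. It assumes the same columns as the sample data in the cell below, and `find_incomplete` is a hypothetical helper.

```python
import pandas as pd


def find_incomplete(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.Series]:
    """Return rows with at least one missing value and the missing share per column."""
    missing = df.isna()                        # True where a cell is NaN/None
    incomplete_rows = df[missing.any(axis=1)]  # rows with any gap
    missing_share = missing.mean()             # fraction of missing cells per column
    return incomplete_rows, missing_share


# Same sample data as the cell below.
sample = pd.DataFrame({
    "EmployeeID": [101, 102, 103, 104, 105],
    "Name": ["Alice", "Bob", "Charlie", "David", None],
    "Age": [25, None, 30, 28, None],
    "Department": ["HR", "IT", None, "Finance", "HR"],
    "Salary": [50000, 60000, 55000, None, 52000],
})

incomplete, share = find_incomplete(sample)
print(incomplete["EmployeeID"].tolist())  # [102, 103, 104, 105]
print(share)
```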

In [5]:
# Write your code from here
import pandas as pd
import numpy as np

# Step 1: Create a Sample Dataset with Missing Values
data = {
    'EmployeeID': [101, 102, 103, 104, 105],
    'Name': ['Alice', 'Bob', 'Charlie', 'David', None],
    'Age': [25, None, 30, 28, None],
    'Department': ['HR', 'IT', None, 'Finance', 'HR'],
    'Salary': [50000, 60000, 55000, None, 52000]
}

df = pd.DataFrame(data)
print("üîç Original DataFrame with Missing Values:\n")
print(df)

# Step 2: Detect Incomplete Data
print("\nüìä Missing Values Count:\n")
print(df.isnull().sum())

# Step 3: Fill Missing Values
# Fill Age with median
df['Age'].fillna(df['Age'].median(), inplace=True)

# Fill Salary with mean
df['Salary'].fillna(df['Salary'].mean(), inplace=True)

# Fill Department and Name with mode
df['Department'].fillna(df['Department'].mode()[0], inplace=True)
df['Name'].fillna('Unknown', inplace=True)

# Step 4: Report Changes
print("\n‚úÖ Cleaned DataFrame After Filling Missing Values:\n")
print(df)

# Optional: Show what values were changed
changes_report = df.isnull().sum()
print("\nüìå Summary: Missing Values After Cleaning (should be 0):\n")
print(changes_report)


🔍 Original DataFrame with Missing Values:

   EmployeeID     Name   Age Department   Salary
0         101    Alice  25.0         HR  50000.0
1         102      Bob   NaN         IT  60000.0
2         103  Charlie  30.0       None  55000.0
3         104    David  28.0    Finance      NaN
4         105     None   NaN         HR  52000.0

📊 Missing Values Count:

EmployeeID    0
Name          1
Age           2
Department    1
Salary        1
dtype: int64

✅ Cleaned DataFrame After Filling Missing Values:

   EmployeeID     Name   Age Department   Salary
0         101    Alice  25.0         HR  50000.0
1         102      Bob  28.0         IT  60000.0
2         103  Charlie  30.0         HR  55000.0
3         104    David  28.0    Finance  54250.0
4         105  Unknown  28.0         HR  52000.0

📌 Summary: Missing Values After Cleaning (should be 0):

EmployeeID    0
Name          0
Age           0
Department    0
Salary        0
dtype: int64
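The final summary confirms that no gaps remain, but it cannot show which cells were changed, because `isnull()` after filling is all `False`. The sketch below builds a per-cell change report instead. It assumes the same fill rules as the cell above: median for `Age`, mean for `Salary`, mode for `Department`, and `'Unknown'` for `Name`. `fill_and_report` is a hypothetical helper.

```python
import pandas as pd


def fill_and_report(df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fill gaps column by column and list every cell that was filled."""
    # Fill values follow the cell above: median, mean, mode, placeholder.
    fills = {
        "Age": df["Age"].median(),
        "Salary": df["Salary"].mean(),
        "Department": df["Department"].mode()[0],
        "Name": "Unknown",
    }
    mask = df.isna()                  # capture gaps *before* filling
    cleaned = df.fillna(value=fills)  # dict maps column -> fill value; returns a copy
    records = [
        {"row": idx, "column": col, "filled_with": cleaned.at[idx, col]}
        for col in fills
        for idx in mask.index[mask[col].to_numpy()]
    ]
    report = pd.DataFrame(records, columns=["row", "column", "filled_with"])
    return cleaned, report


sample = pd.DataFrame({
    "EmployeeID": [101, 102, 103, 104, 105],
    "Name": ["Alice", "Bob", "Charlie", "David", None],
    "Age": [25, None, 30, 28, None],
    "Department": ["HR", "IT", None, "Finance", "HR"],
    "Salary": [50000, 60000, 55000, None, 52000],
})

cleaned, report = fill_and_report(sample)
print(report)
```

Capturing the mask before calling `fillna` is the key design choice: once values are filled, estimates are indistinguishable from original data.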
