# PySpark

## Installing PySpark and OpenJDK and Setting Up Environment

In [1]:
# Install PySpark
# !pip install pyspark==3.5.0 -q
# Install Java (Spark requires Java)
# !apt-get install openjdk-11-jdk-headless -qq > /dev/null
# Set Java environment variable
import os, sys

os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-17" # For windows, for mac keep commented
print("PySpark and Java installed successfully!")
from pyspark.sql import SparkSession
from pyspark import SparkConf

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
print(sys.executable)

# Create a SparkSession configured for data preprocessing
spark = (
    SparkSession.builder.appName("SCC454-DataPreprocessing")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
# Get the underlying SparkContext
sc = spark.sparkContext
print(f"Spark Version: {spark.version}")
print(f"Application Name: {spark.sparkContext.appName}")
print(f"Master: {spark.sparkContext.master}")
print("\nSpark Session ready for data preprocessing!")

PySpark and Java installed successfully!
d:\Lancaster University Coursework\Term 2\SSC 454 - Large scale platforms for AI and Data Analysis\Labs\venv\Scripts\python.exe
Spark Version: 3.5.0
Application Name: SCC454-DataPreprocessing
Master: local[*]

Spark Session ready for data preprocessing!


## Spark Data Cleaning Capacities

In [2]:
# Import commonly used functions for data preprocessing
from pyspark.sql.functions import (
    col,
    lit,
    isnan,
    isnull,
    coalesce,
    lower,
    upper,
    trim,
    ltrim,
    rtrim,
    length,
    split,
    concat,
    concat_ws,
    substring,
    replace,
    regexp_extract,
    regexp_replace,
    to_date,
    to_timestamp,
    count,
    avg,
    sum as spark_sum,
    mean,
    stddev,
    min as spark_min,
    max as spark_max,
    when,
    expr,
    year,
    month,
    dayofmonth,
    datediff,
    date_format,
    monotonically_increasing_id,
    round as spark_round,
)
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    FloatType,
    DoubleType,
    DateType,
    TimestampType,
    BooleanType,
)

print("All preprocessing functions imported successfully!")

All preprocessing functions imported successfully!


## Handling Missing Data

In [3]:
# Create a DataFrame with various missing value patterns
customer_data = [
    (1, "John Smith", 28, "john@email.com", 75000.0, "New York", "2023-01-15"),
    (2, "Jane Doe", None, "jane@email.com", 82000.0, "Los Angeles", "2023-02-20"),
    (3, "Bob Wilson", 35, None, 65000.0, "Chicago", "2023-03-10"),
    (4, "Alice Brown", 42, "alice@email.com", None, "Houston", None),
    (5, None, 31, "unknown@email.com", 70000.0, "Phoenix", "2023-05-01"),
    (6, "Charlie Lee", 29, "", 58000.0, "", "2023-06-15"),
    (7, "Diana Prince", None, "diana@email.com", 95000.0, None, "2023-07-20"),
    (8, "Edward Kim", 38, "N/A", "N/A", "Boston", "2023-08-25"),
    (9, "Fiona Garcia", 45, "fiona@email.com", 88000.0, "Seattle", "2023-09-30"),
    (10, "George Hall", None, None, None, None, None),
    (11, "Hannah White", 33, "hannah@email.com", 72000.0, "Denver", "2023-11-10"),
    (12, "Ivan Torres", 27, "ivan@email.com", 63000.0, "Miami", "2023-12-05"),
]
columns = ["customer_id", "name", "age", "email", "salary", "city", "signup_date"]
df_customers = spark.createDataFrame(customer_data, columns)
print("Customer Data with Missing Values:")
df_customers.show(truncate=False)
# Check the schema
print("Schema:")
df_customers.printSchema()


Customer Data with Missing Values:
+-----------+------------+----+-----------------+-------+-----------+-----------+
|customer_id|name        |age |email            |salary |city       |signup_date|
+-----------+------------+----+-----------------+-------+-----------+-----------+
|1          |John Smith  |28  |john@email.com   |75000.0|New York   |2023-01-15 |
|2          |Jane Doe    |NULL|jane@email.com   |82000.0|Los Angeles|2023-02-20 |
|3          |Bob Wilson  |35  |NULL             |65000.0|Chicago    |2023-03-10 |
|4          |Alice Brown |42  |alice@email.com  |NULL   |Houston    |NULL       |
|5          |NULL        |31  |unknown@email.com|70000.0|Phoenix    |2023-05-01 |
|6          |Charlie Lee |29  |                 |58000.0|           |2023-06-15 |
|7          |Diana Prince|NULL|diana@email.com  |95000.0|NULL       |2023-07-20 |
|8          |Edward Kim  |38  |N/A              |N/A    |Boston     |2023-08-25 |
|9          |Fiona Garcia|45  |fiona@email.com  |88000.0|Seattl

### Count Missing Value

In [4]:
# Method 1: Count nulls per column
print("Null counts per column:")
null_counts = df_customers.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in df_customers.columns]
)
null_counts.show()

# Method 2: Count nulls AND NaN values (important for numeric columns)
print("Null + NaN counts for numeric columns:")
df_customers.select(
    [
        count(when(col("salary").isNull() | isnan(col("salary")), 1)).alias(
            "salary_missing"
        )
    ]
).show()


# Method 3: Comprehensive missing value report
def missing_value_report(df):
    total_rows = df.count()
    report_data = []
    for col_name in df.columns:
        col_type = str(df.schema[col_name].dataType)
    null_count = df.filter(col(col_name).isNull()).count()
    empty_count = 0
    if "String" in col_type:
        empty_count = df.filter(
            (col(col_name) == "")
            | (col(col_name) == "N/A")
            | (col(col_name) == "Unknown")
        ).count()
    nan_count = 0
    if "Double" in col_type or "Float" in col_type:
        nan_count = df.filter(isnan(col(col_name))).count()
    total_missing = null_count + empty_count + nan_count
    missing_pct = (total_missing / total_rows) * 100
    report_data.append(
        (
            col_name,
            col_type,
            null_count,
            empty_count,
            nan_count,
            total_missing,
            round(missing_pct, 2),
        )
    )
    report_columns = [
        "Column",
        "Type",
        "Nulls",
        "Empty",
        "NaN",
        "Total_Missing",
        "Missing_%",
    ]
    return spark.createDataFrame(report_data, report_columns)


print("=" * 60)
print("COMPREHENSIVE MISSING VALUE REPORT")
print("=" * 60)
report = missing_value_report(df_customers)
report.show(truncate=False)

Null counts per column:
+-----------+----+---+-----+------+----+-----------+
|customer_id|name|age|email|salary|city|signup_date|
+-----------+----+---+-----+------+----+-----------+
|          0|   1|  3|    2|     2|   2|          2|
+-----------+----+---+-----+------+----+-----------+

Null + NaN counts for numeric columns:
+--------------+
|salary_missing|
+--------------+
|             2|
+--------------+

COMPREHENSIVE MISSING VALUE REPORT
+-----------+------------+-----+-----+---+-------------+---------+
|Column     |Type        |Nulls|Empty|NaN|Total_Missing|Missing_%|
+-----------+------------+-----+-----+---+-------------+---------+
|signup_date|StringType()|2    |0    |0  |2            |16.67    |
+-----------+------------+-----+-----+---+-------------+---------+



### Drop Null Values

In [5]:
# Original count
print(f"Original row count: {df_customers.count()}")

# Drop rows with ANY null value (most aggressive)
df_drop_any = df_customers.na.drop(how="any")
print(f"After dropping rows with ANY null: {df_drop_any.count()}")
df_drop_any.show()

# Drop rows where ALL values are null (least aggressive)
df_drop_all = df_customers.na.drop(how="all")
print(f"After dropping rows where ALL are null:{df_drop_all.count()}")

# Drop rows where specific columns are null (most practical)
df_drop_subset = df_customers.na.drop(subset=["name", "email"])
print(f"After dropping rows where name OR email is null:{df_drop_subset.count()}")
df_drop_subset.show()

# Drop rows with fewer than a threshold of non-null values
df_drop_thresh = df_customers.na.drop(thresh=5)
print(f"After dropping rows with < 5 non-null values:{df_drop_thresh.count()}")
df_drop_thresh.show()

Original row count: 12
After dropping rows with ANY null: 6
+-----------+------------+---+----------------+-------+--------+-----------+
|customer_id|        name|age|           email| salary|    city|signup_date|
+-----------+------------+---+----------------+-------+--------+-----------+
|          1|  John Smith| 28|  john@email.com|75000.0|New York| 2023-01-15|
|          6| Charlie Lee| 29|                |58000.0|        | 2023-06-15|
|          8|  Edward Kim| 38|             N/A|    N/A|  Boston| 2023-08-25|
|          9|Fiona Garcia| 45| fiona@email.com|88000.0| Seattle| 2023-09-30|
|         11|Hannah White| 33|hannah@email.com|72000.0|  Denver| 2023-11-10|
|         12| Ivan Torres| 27|  ivan@email.com|63000.0|   Miami| 2023-12-05|
+-----------+------------+---+----------------+-------+--------+-----------+

After dropping rows where ALL are null:12
After dropping rows where name OR email is null:9
+-----------+------------+----+----------------+-------+-----------+---------

### Filling Missing Values (Imputation)

In [6]:
# Fill all numeric columns with a single value
print("Fill all numeric nulls with 0:")
df_customers.na.fill(0).show()

# Fill specific columns with specific values (dictionary approach)
fill_values = {
    "name": "Unknown",
    "age": 30,
    "email": "no_email@placeholder.com",
    "salary": 0.0,
    "city": "Unknown",
    "signup_date": "1970-01-01",
}
print("Fill with specific values per column:")
df_filled = df_customers.na.fill(fill_values)
df_filled.show(truncate=False)

Fill all numeric nulls with 0:
+-----------+------------+---+-----------------+-------+-----------+-----------+
|customer_id|        name|age|            email| salary|       city|signup_date|
+-----------+------------+---+-----------------+-------+-----------+-----------+
|          1|  John Smith| 28|   john@email.com|75000.0|   New York| 2023-01-15|
|          2|    Jane Doe|  0|   jane@email.com|82000.0|Los Angeles| 2023-02-20|
|          3|  Bob Wilson| 35|             NULL|65000.0|    Chicago| 2023-03-10|
|          4| Alice Brown| 42|  alice@email.com|   NULL|    Houston|       NULL|
|          5|        NULL| 31|unknown@email.com|70000.0|    Phoenix| 2023-05-01|
|          6| Charlie Lee| 29|                 |58000.0|           | 2023-06-15|
|          7|Diana Prince|  0|  diana@email.com|95000.0|       NULL| 2023-07-20|
|          8|  Edward Kim| 38|              N/A|    N/A|     Boston| 2023-08-25|
|          9|Fiona Garcia| 45|  fiona@email.com|88000.0|    Seattle| 2023-09-3

#### Statistical Imputation

In [8]:
# Calculate mean for age column (excluding nulls)
age_stats = df_customers.select(
    mean(col("age")).alias("mean_age"),
    expr("percentile_approx(age, 0.5)").alias("median_age"),
).collect()[0]
mean_age = age_stats["mean_age"]
median_age = age_stats["median_age"]
print(f"Mean age: {mean_age:.2f}")
print(f"Median age: {median_age}")

# Fill with mean
df_mean_imputed = df_customers.na.fill({"age": int(mean_age)})
print("\nAge filled with mean:")
df_mean_imputed.select("customer_id", "name", "age").show()

# Calculate and impute with median for salary
salary_stats = (
    df_customers.filter(col("salary").isNotNull() & ~isnan(col("salary")))
    .select(
        mean(col("salary")).alias("mean_salary"),
        expr("percentile_approx(salary, 0.5)").alias("median_salary"),
    )
    .collect()[0]
)
print(f"Mean salary: ${salary_stats['mean_salary']:,.2f}")
print(f"Median salary: ${salary_stats['median_salary']:,.2f}")

Mean age: 34.22
Median age: 33

Age filled with mean:
+-----------+------------+---+
|customer_id|        name|age|
+-----------+------------+---+
|          1|  John Smith| 28|
|          2|    Jane Doe| 34|
|          3|  Bob Wilson| 35|
|          4| Alice Brown| 42|
|          5|        NULL| 31|
|          6| Charlie Lee| 29|
|          7|Diana Prince| 34|
|          8|  Edward Kim| 38|
|          9|Fiona Garcia| 45|
|         10| George Hall| 34|
|         11|Hannah White| 33|
|         12| Ivan Torres| 27|
+-----------+------------+---+

Mean salary: $74,222.22
Median salary: $72,000.00


### Advanced Imputation Techniques

In [9]:
# Group-wise imputation: Fill missing values based on group statistics
from pyspark.sql.window import Window

# Calculate average salary per city
city_avg_salary = (
    df_customers.filter(
        col("salary").isNotNull() & ~isnan(col("salary")) & col("city").isNotNull()
    )
    .groupBy("city")
    .agg(spark_round(mean("salary"), 2).alias("city_avg_salary"))
)
print("Average salary per city:")
city_avg_salary.show()

# Using coalesce for conditional filling
overall_avg_salary = salary_stats["mean_salary"]
df_smart_imputed = (
    df_customers.join(city_avg_salary, on="city", how="left")
    .withColumn(
        "salary_imputed",
        coalesce(
            when(~isnan(col("salary")), col("salary")),
            col("city_avg_salary"),
            lit(overall_avg_salary),
        ),
    )
    .select(
        "customer_id", "name", "city", "salary", "city_avg_salary", "salary_imputed"
    )
)
print("Smart imputation using city averages:")
df_smart_imputed.show()

# Forward Fill using Window functions (useful for time-series)
from pyspark.sql.functions import last, first

ts_data = [
    ("2024-01-01", 100.0),
    ("2024-01-02", None),
    ("2024-01-03", None),
    ("2024-01-04", 105.0),
    ("2024-01-05", None),
    ("2024-01-06", 108.0),
    ("2024-01-07", None),
]
df_ts = spark.createDataFrame(ts_data, ["date", "value"])
print("Time series with missing values:")
df_ts.show()

# Forward fill
window_forward = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, 0)
df_forward_fill = df_ts.withColumn(
    "value_ffill", last(col("value"), ignorenulls=True).over(window_forward)
)
print("After forward fill:")
df_forward_fill.show()

# Creating missing value indicator columns
df_with_indicators = df_customers.withColumn(
    "age_was_missing", when(col("age").isNull(), 1).otherwise(0)
).withColumn(
    "salary_was_missing",
    when(col("salary").isNull() | isnan(col("salary")), 1).otherwise(0),
)
print("Data with missing value indicators:")
df_with_indicators.select(
    "customer_id", "name", "age", "age_was_missing", "salary", "salary_was_missing"
).show()

Average salary per city:
+-----------+---------------+
|       city|city_avg_salary|
+-----------+---------------+
|   New York|        75000.0|
|Los Angeles|        82000.0|
|    Chicago|        65000.0|
|    Phoenix|        70000.0|
|           |        58000.0|
|     Boston|           NULL|
|    Seattle|        88000.0|
|     Denver|        72000.0|
|      Miami|        63000.0|
+-----------+---------------+

Smart imputation using city averages:
+-----------+------------+-----------+-------+---------------+-----------------+
|customer_id|        name|       city| salary|city_avg_salary|   salary_imputed|
+-----------+------------+-----------+-------+---------------+-----------------+
|          1|  John Smith|   New York|75000.0|        75000.0|          75000.0|
|          2|    Jane Doe|Los Angeles|82000.0|        82000.0|          82000.0|
|          3|  Bob Wilson|    Chicago|65000.0|        65000.0|          65000.0|
|          4| Alice Brown|    Houston|   NULL|           NUL

## Understanding Spark Data Types

In [10]:
# Create a DataFrame with mixed type issues
messy_data = [
    ("1", "25", "50000.50", "2024-01-15", "true"),
    ("2", "thirty", "60000", "15/02/2024", "yes"),
    ("3", "35", "$75,000", "2024-03-20", "1"),
    ("4", "40", "80000.00", "March 25, 2024", "false"),
    ("5", "N/A", "invalid", "2024-04-30", "no"),
]
df_messy = spark.createDataFrame(messy_data, ["id", "age", "salary", "date", "active"])
print("Messy data with type issues:")
df_messy.show()
df_messy.printSchema()

Messy data with type issues:
+---+------+--------+--------------+------+
| id|   age|  salary|          date|active|
+---+------+--------+--------------+------+
|  1|    25|50000.50|    2024-01-15|  true|
|  2|thirty|   60000|    15/02/2024|   yes|
|  3|    35| $75,000|    2024-03-20|     1|
|  4|    40|80000.00|March 25, 2024| false|
|  5|   N/A| invalid|    2024-04-30|    no|
+---+------+--------+--------------+------+

root
 |-- id: string (nullable = true)
 |-- age: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- date: string (nullable = true)
 |-- active: string (nullable = true)



### Type Casting and Conversion

In [None]:
# Simple casting
df_cast = df_messy.withColumn("id_int", col("id").cast(IntegerType()))
print("ID cast to integer:")
df_cast.select("id", "id_int").show()

# Casting with potential failures - non-numeric strings become null
df_age_cast = df_messy.withColumn("age_int", col("age").cast(IntegerType()))
print("Age cast to integer (note nulls for invalid values):")
df_age_cast.select("age", "age_int").show()

# Clean salary string before casting
df_salary_clean = df_messy.withColumn(
    "salary_cleaned", regexp_replace(col("salary"), "[$,]", "")
).withColumn("salary_double", col("salary_cleaned").cast(DoubleType()))
print("Salary cleaning and casting:")
df_salary_clean.select("salary", "salary_cleaned", "salary_double").show()

# Boolean conversion with multiple representations
df_bool = df_messy.withColumn(
    "active_bool",
    when(lower(col("active")).isin("true", "yes", "1", "t", "y"), True)
    .when(lower(col("active")).isin("false", "no", "0", "f", "n"), False)
    .otherwise(None),
)
print("Boolean conversion:")
df_bool.select("active", "active_bool").show()

ID cast to integer:
+---+------+
| id|id_int|
+---+------+
|  1|     1|
|  2|     2|
|  3|     3|
|  4|     4|
|  5|     5|
+---+------+

Age cast to integer (note nulls for invalid values):
+------+-------+
|   age|age_int|
+------+-------+
|    25|     25|
|thirty|   NULL|
|    35|     35|
|    40|     40|
|   N/A|   NULL|
+------+-------+

Salary cleaning and casting:
+--------+--------------+-------------+
|  salary|salary_cleaned|salary_double|
+--------+--------------+-------------+
|50000.50|      50000.50|      50000.5|
|   60000|         60000|      60000.0|
| $75,000|         75000|      75000.0|
|80000.00|      80000.00|      80000.0|
| invalid|       invalid|         NULL|
+--------+--------------+-------------+

Boolean conversion:
+------+-----------+
|active|active_bool|
+------+-----------+
|  true|       true|
|   yes|       true|
|     1|       true|
| false|      false|
|    no|      false|
+------+-----------+



### Handling Date and Timestamp Fields

In [None]:
# Common date formats
date_examples = [
    ("2024-01-15", "ISO format"),
    ("15/02/2024", "European DD/MM/YYYY"),
    ("03/20/2024", "American MM/DD/YYYY"),
    ("March 25, 2024", "Written format"),
    ("2024.04.30", "Dot separator"),
]
df_dates = spark.createDataFrame(date_examples, ["date_string", "format_name"])
df_dates.show(truncate=False)

# Parse dates with multiple format patterns using coalesce
df_parsed = df_dates.withColumn(
    "parsed_date",
    coalesce(
        to_date(col("date_string"), "yyyy-MM-dd"),
        to_date(col("date_string"), "dd/MM/yyyy"),
        to_date(col("date_string"), "MM/dd/yyyy"),
        to_date(col("date_string"), "MMMM dd, yyyy"),
        to_date(col("date_string"), "yyyy.MM.dd"),
    ),
)
print("Dates parsed from multiple formats:")
df_parsed.show(truncate=False)

# Extract components from dates
from pyspark.sql.functions import (
    year,
    month,
    dayofmonth,
    dayofweek,
    quarter,
    date_format,
)

df_date_parts = df_parsed.filter(col("parsed_date").isNotNull()).select(
    col("date_string"),
    col("parsed_date"),
    year(col("parsed_date")).alias("year"),
    month(col("parsed_date")).alias("month"),
    dayofmonth(col("parsed_date")).alias("day"),
    quarter(col("parsed_date")).alias("quarter"),
    date_format(col("parsed_date"), "EEEE").alias("day_name"),
)
print("Date components extracted:")
df_date_parts.show(truncate=False)

+--------------+-------------------+
|date_string   |format_name        |
+--------------+-------------------+
|2024-01-15    |ISO format         |
|15/02/2024    |European DD/MM/YYYY|
|03/20/2024    |American MM/DD/YYYY|
|March 25, 2024|Written format     |
|2024.04.30    |Dot separator      |
+--------------+-------------------+

Dates parsed from multiple formats:
+--------------+-------------------+-----------+
|date_string   |format_name        |parsed_date|
+--------------+-------------------+-----------+
|2024-01-15    |ISO format         |2024-01-15 |
|15/02/2024    |European DD/MM/YYYY|2024-02-15 |
|03/20/2024    |American MM/DD/YYYY|2024-03-20 |
|March 25, 2024|Written format     |2024-03-25 |
|2024.04.30    |Dot separator      |2024-04-30 |
+--------------+-------------------+-----------+

Date components extracted:
+--------------+-----------+----+-----+---+-------+---------+
|date_string   |parsed_date|year|month|day|quarter|day_name |
+--------------+-----------+----+----

### Schema Validation and Enforcement

In [None]:
# Define an expected schema
expected_schema = StructType(
    [
        StructField("customer_id", IntegerType(), nullable=False),
        StructField("name", StringType(), nullable=True),
        StructField("age", IntegerType(), nullable=True),
        StructField("email", StringType(), nullable=True),
        StructField("salary", DoubleType(), nullable=True),
        StructField("city", StringType(), nullable=True),
        StructField("signup_date", DateType(), nullable=True),
    ]
)
print("Expected Schema:")
for field in expected_schema.fields:
    print(f" {field.name}: {field.dataType}(nullable={field.nullable})")


# Function to validate and transform data to match expected schema
def enforce_schema(df, target_schema):
    result_df = df
    for field in target_schema.fields:
        col_name = field.name
    col_type = field.dataType
    if col_name in df.columns:
        result_df = result_df.withColumn(col_name, col(col_name).cast(col_type))
    else:
        result_df = result_df.withColumn(col_name, lit(None).cast(col_type))
    return result_df.select([field.name for field in target_schema.fields])


df_enforced = enforce_schema(df_customers, expected_schema)
print("Schema-enforced DataFrame:")
df_enforced.printSchema()
df_enforced.show(5)

Expected Schema:
 customer_id: IntegerType()(nullable=False)
 name: StringType()(nullable=True)
 age: IntegerType()(nullable=True)
 email: StringType()(nullable=True)
 salary: DoubleType()(nullable=True)
 city: StringType()(nullable=True)
 signup_date: DateType()(nullable=True)
Schema-enforced DataFrame:
root
 |-- customer_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- email: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- city: string (nullable = true)
 |-- signup_date: date (nullable = true)

+-----------+-----------+----+-----------------+-------+-----------+-----------+
|customer_id|       name| age|            email| salary|       city|signup_date|
+-----------+-----------+----+-----------------+-------+-----------+-----------+
|          1| John Smith|  28|   john@email.com|75000.0|   New York| 2023-01-15|
|          2|   Jane Doe|NULL|   jane@email.com|82000.0|Los Angeles| 2023-02-20|
|          3| Bob Wil

## String Functions in Spark

In [None]:
# Create sample text data
text_data = [
    (1, " Hello World ", "john.doe@email.com"),
    (2, "APACHE SPARK", "JANE.DOE@EMAIL.COM"),
    (3, "data science", "bob_wilson@company.co.uk"),
    (4, " Machine Learning ", "alice-brown@domain.org"),
    (5, "NLP is Great!!!", "charlie.lee123@test.com"),
]
df_text = spark.createDataFrame(text_data, ["id", "text", "email"])
print("Sample text data:")
df_text.show(truncate=False)

Sample text data:
+---+------------------+------------------------+
|id |text              |email                   |
+---+------------------+------------------------+
|1  | Hello World      |john.doe@email.com      |
|2  |APACHE SPARK      |JANE.DOE@EMAIL.COM      |
|3  |data science      |bob_wilson@company.co.uk|
|4  | Machine Learning |alice-brown@domain.org  |
|5  |NLP is Great!!!   |charlie.lee123@test.com |
+---+------------------+------------------------+



### Case Normalization and Trimming

In [None]:
# Case conversion
df_case = df_text.select(
    col("text"),
    lower(col("text")).alias("lowercase"),
    upper(col("text")).alias("uppercase"),
    expr("initcap(text)").alias("titlecase"),
)
print("Case conversion:")
df_case.show(truncate=False)

# Trimming whitespace
df_trim = df_text.select(
    col("text"),
    trim(col("text")).alias("trimmed"),
    ltrim(col("text")).alias("left_trimmed"),
    rtrim(col("text")).alias("right_trimmed"),
    length(col("text")).alias("original_len"),
    length(trim(col("text"))).alias("trimmed_len"),
)
print("Whitespace trimming:")
df_trim.show(truncate=False)


Case conversion:
+------------------+------------------+------------------+------------------+
|text              |lowercase         |uppercase         |titlecase         |
+------------------+------------------+------------------+------------------+
| Hello World      | hello world      | HELLO WORLD      | Hello World      |
|APACHE SPARK      |apache spark      |APACHE SPARK      |Apache Spark      |
|data science      |data science      |DATA SCIENCE      |Data Science      |
| Machine Learning | machine learning | MACHINE LEARNING | Machine Learning |
|NLP is Great!!!   |nlp is great!!!   |NLP IS GREAT!!!   |Nlp Is Great!!!   |
+------------------+------------------+------------------+------------------+

Whitespace trimming:
+------------------+----------------+-----------------+-----------------+------------+-----------+
|text              |trimmed         |left_trimmed     |right_trimmed    |original_len|trimmed_len|
+------------------+----------------+-----------------+------

### Tokenization and Text Splitting

In [None]:
# Create sample sentences
sentences_data = [
    (1, "Apache Spark is a unified analytics engine for big data processing"),
    (2, "Natural language processing enables computers to understand text"),
    (3, "Machine learning models require clean preprocessed data"),
    (4, "Text mining extracts valuable insights from unstructured data"),
]
df_sentences = spark.createDataFrame(sentences_data, ["id", "sentence"])
df_sentences.show(truncate=False)

# Basic tokenization using split
from pyspark.sql.functions import split, size, explode

df_tokens = df_sentences \
    .withColumn("words", split(lower(col("sentence")), " ")) \
    .withColumn("word_count", size(col("words")))
print("Tokenized sentences:")
df_tokens.show(truncate=False)

# Explode tokens and calculate word frequency
df_exploded = df_tokens.select(col("id"), explode(col("words")).alias("word"))
word_freq = (
    df_exploded.groupBy("word")
    .agg(count("*").alias("frequency"))
    .orderBy(col("frequency").desc())
)
print("Word frequencies:")
word_freq.show(15)

+---+------------------------------------------------------------------+
|id |sentence                                                          |
+---+------------------------------------------------------------------+
|1  |Apache Spark is a unified analytics engine for big data processing|
|2  |Natural language processing enables computers to understand text  |
|3  |Machine learning models require clean preprocessed data           |
|4  |Text mining extracts valuable insights from unstructured data     |
+---+------------------------------------------------------------------+

Tokenized sentences:
+---+------------------------------------------------------------------+------------------------------------------------------------------------------+----------+
|id |sentence                                                          |words                                                                         |word_count|
+---+---------------------------------------------------------------

### Stop Word Removal

In [None]:
# Define common English stop words
stop_words = [
    "a",
    "an",
    "the",
    "and",
    "or",
    "but",
    "is",
    "are",
    "was",
    "were",
    "to",
    "of",
    "in",
    "for",
    "on",
    "with",
    "at",
    "by",
    "from",
    "it",
    "its",
    "this",
    "that",
    "these",
    "those",
    "be",
    "been",
    "being",
    "have",
    "has",
    "had",
]
from pyspark.sql.functions import array_except, array

stop_words_array = array([lit(w) for w in stop_words])
df_no_stopwords = df_tokens.withColumn(
    "words_filtered", array_except(col("words"), stop_words_array)
)
print("Words with stop words removed:")
df_no_stopwords.select("sentence", "words", "words_filtered").show(truncate=False)

# Using Spark ML's StopWordsRemover
from pyspark.ml.feature import StopWordsRemover, Tokenizer

tokenizer = Tokenizer(inputCol="sentence", outputCol="words_ml")
df_tokenized_ml = tokenizer.transform(df_sentences)
remover = StopWordsRemover(inputCol="words_ml", outputCol="words_cleaned")
df_cleaned_ml = remover.transform(df_tokenized_ml)
print("Using Spark ML StopWordsRemover:")
df_cleaned_ml.select("sentence", "words_ml", "words_cleaned").show(truncate=False)

Words with stop words removed:
+------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------+
|sentence                                                          |words                                                                         |words_filtered                                                       |
+------------------------------------------------------------------+------------------------------------------------------------------------------+---------------------------------------------------------------------+
|Apache Spark is a unified analytics engine for big data processing|[apache, spark, is, a, unified, analytics, engine, for, big, data, processing]|[apache, spark, unified, analytics, engine, big, data, processing]   |
|Natural language processing enables computers to understand text  |[natural, language, processin

### Text Cleaning Pipeline

In [None]:
# Sample product reviews (messy text data)
reviews_data = [
    (1, " GREAT product!!! Would buy again. ", 5),
    (2, "Terrible... don't waste your money ", 1),
    (3, "Good quality, fast shipping!!!", 4),
    (4, " not bad, could be better... ", 3),
    (5, "AMAZING!!!!! Best purchase EVER!!!!!", 5),
    (6, "meh, it's OK I guess", 3),
    (7, "Product arrived damaged", 1),
    (8, "10/10 would recommend to friends!", 5),
]
df_reviews = spark.createDataFrame(reviews_data, ["id", "review", "rating"])
print("Raw reviews:")
df_reviews.show(truncate=False)


def clean_text_pipeline(df, text_col, output_col="text_cleaned"):
    result = (
        df.withColumn("_step1_trim", trim(col(text_col)))
        .withColumn("_step2_lower", lower(col("_step1_trim")))
        .withColumn(
            "_step3_alphanum", regexp_replace(col("_step2_lower"), "[^a-z0-9\\s]", "")
        )
        .withColumn("_step4_spaces", regexp_replace(col("_step3_alphanum"), "\\s+", ""))
        .withColumn(output_col, trim(col("_step4_spaces")))
        .drop("_step1_trim", "_step2_lower", "_step3_alphanum", "_step4_spaces")
    )
    return result


df_reviews_cleaned = clean_text_pipeline(df_reviews, "review", "review_cleaned")
print("Cleaned reviews:")
df_reviews_cleaned.select("review", "review_cleaned").show(truncate=False)

Raw reviews:
+---+------------------------------------+------+
|id |review                              |rating|
+---+------------------------------------+------+
|1  | GREAT product!!! Would buy again.  |5     |
|2  |Terrible... don't waste your money  |1     |
|3  |Good quality, fast shipping!!!      |4     |
|4  | not bad, could be better...        |3     |
|5  |AMAZING!!!!! Best purchase EVER!!!!!|5     |
|6  |meh, it's OK I guess                |3     |
|7  |Product arrived damaged             |1     |
|8  |10/10 would recommend to friends!   |5     |
+---+------------------------------------+------+

Cleaned reviews:
+------------------------------------+---------------------------+
|review                              |review_cleaned             |
+------------------------------------+---------------------------+
| GREAT product!!! Would buy again.  |greatproductwouldbuyagain  |
|Terrible... don't waste your money  |terribledontwasteyourmoney |
|Good quality, fast shipping!!!   

### Practical Exercise: Product Review Cleaning
Tasks:
- Clean the review text (remove special chars, normalize case, trim)
- Extract the word count from each cleaned review
- Identify reviews that mention negative keywords (return, refund, broken)
- Calculate the average rating for reviews with negative keywords vs others

In [13]:
# Extended reviews dataset
extended_reviews = [
    (1, "LOVE IT! Fast shipping, great quality. Will order more!", 5),
    (2, "Product broke after 1 week. Requesting refund immediately.", 1),
    (3, "Decent product for the price. Nothing special but works.", 3),
    (4, "DO NOT BUY! Cheap quality, had to return it!", 1),
    (5, "Excellent! Better than expected. 100% recommend.", 5),
    (6, "Item was damaged during shipping. Broken on arrival.", 1),
    (7, "Good value for money. Satisfied with purchase.", 4),
    (8, "WORST PURCHASE EVER! Want my money back!!!", 1),
    (9, "Nice product, works as described. Happy customer.", 4),
    (10, "Meh, it's okay. Nothing to complain about.", 3),
]

df_extended = spark.createDataFrame(extended_reviews, ["id", "review", "rating"])
df_extended.show(truncate=False)

from pyspark.sql.functions import (
    col,
    lower,
    trim,
    regexp_replace,
    split,
    size,
    when,
    avg,
)

# Clean the extended reviews
df_processed = df_extended.withColumn(
    "clean_review",
    trim(
        regexp_replace(
            regexp_replace(
                lower(col("review")),  
                "[^a-z0-9\\s]",        
                ""
            ),
            "\\s+",                    
            " "
        )
    )
)



# Word Count
df_processed = df_processed.withColumn(
    "word_count", size(split(col("clean_review"), " "))
)

# Negative keywords flag
df_processed = df_processed.withColumn(
    "has_negative_keyword",
    when(col("clean_review").rlike("return|refund|broken"), 1).otherwise(0),
)

# Processed DataFrame
df_processed.show(truncate=False)

# Average Rating Comparision
df_processed.groupBy("has_negative_keyword").agg(
    avg("rating").alias("average_rating")
).show()


+---+----------------------------------------------------------+------+
|id |review                                                    |rating|
+---+----------------------------------------------------------+------+
|1  |LOVE IT! Fast shipping, great quality. Will order more!   |5     |
|2  |Product broke after 1 week. Requesting refund immediately.|1     |
|3  |Decent product for the price. Nothing special but works.  |3     |
|4  |DO NOT BUY! Cheap quality, had to return it!              |1     |
|5  |Excellent! Better than expected. 100% recommend.          |5     |
|6  |Item was damaged during shipping. Broken on arrival.      |1     |
|7  |Good value for money. Satisfied with purchase.            |4     |
|8  |WORST PURCHASE EVER! Want my money back!!!                |1     |
|9  |Nice product, works as described. Happy customer.         |4     |
|10 |Meh, it's okay. Nothing to complain about.                |3     |
+---+----------------------------------------------------------+

## Regular Expressions in Spark

### Regex in PySpark

In [None]:
# Sample Data
from nt import truncate


contact_data = [
    (
        1,
        "John Smith",
        "john.smith@email.com",
        "(555) 123-4567",
        "123 Main St, New York, NY 10001",
    ),
    (
        2,
        "Jane Doe",
        "jane_doe@company.co.uk",
        "555-987-6543",
        "456 Oak Ave, Los Angeles, CA 90001",
    ),
    (
        3,
        "Bob Wilson",
        "bob123@test.org",
        "5551234567",
        "789 Pine Rd, Chicago, IL 60601",
    ),
    (
        4,
        "Alice Brown",
        "alice.brown@domain.net",
        "(555)456-7890",
        "321 Elm Blvd, Houston, TX 77001",
    ),
    (
        5,
        "Charlie Lee",
        "invalid-email",
        "phone: 555-111-2222",
        "PO Box 100, Phoenix, AZ 85001",
    ),
]

df_contacts = spark.createDataFrame(
    contact_data, ["id", "name", "email", "phone", "address"]
)

print("Contact data:")
df_contacts.show(truncate=False)

Contact data:
+---+-----------+----------------------+-------------------+----------------------------------+
|id |name       |email                 |phone              |address                           |
+---+-----------+----------------------+-------------------+----------------------------------+
|1  |John Smith |john.smith@email.com  |(555) 123-4567     |123 Main St, New York, NY 10001   |
|2  |Jane Doe   |jane_doe@company.co.uk|555-987-6543       |456 Oak Ave, Los Angeles, CA 90001|
|3  |Bob Wilson |bob123@test.org       |5551234567         |789 Pine Rd, Chicago, IL 60601    |
|4  |Alice Brown|alice.brown@domain.net|(555)456-7890      |321 Elm Blvd, Houston, TX 77001   |
|5  |Charlie Lee|invalid-email         |phone: 555-111-2222|PO Box 100, Phoenix, AZ 85001     |
+---+-----------+----------------------+-------------------+----------------------------------+



### Pattern Matching with rlike

In [None]:
# Check valid email format
email_pattern = r"^[\w.+-]+@[\w-]+\.[a-zA-Z]{2,}$"

df_email_check = df_contacts.withColumn(
    "is_valid_email", col("email").rlike(email_pattern)
)

print("Email validation:")
df_email_check.select("name", "email", "is_valid_email").show(truncate=False)

# Filter for specific domain emails
df_company_emails = df_contacts.filter(col("email").rlike(r"@company\."))

print("Company domain emails:")
df_company_emails.select("name", "email").show()

Email validation:
+-----------+----------------------+--------------+
|name       |email                 |is_valid_email|
+-----------+----------------------+--------------+
|John Smith |john.smith@email.com  |true          |
|Jane Doe   |jane_doe@company.co.uk|false         |
|Bob Wilson |bob123@test.org       |true          |
|Alice Brown|alice.brown@domain.net|true          |
|Charlie Lee|invalid-email         |false         |
+-----------+----------------------+--------------+

Company domain emails:
+--------+--------------------+
|    name|               email|
+--------+--------------------+
|Jane Doe|jane_doe@company....|
+--------+--------------------+



### Extracting Data with regexp_extract

In [None]:
# Extract email components
df_email_parts = (
    df_contacts.withColumn(
        "email_username", regexp_extract(col("email"), r"^([\w.+-]+)@", 1)
    )
    .withColumn("email_domain", regexp_extract(col("email"), r"@([\w-]+)\.", 1))
    .withColumn("email_tld", regexp_extract(col("email"), r"\.([a-zA-Z.]+)$", 1))
)

print("Extracted email components:")
df_email_parts.select("email", "email_username", "email_domain", "email_tld").show(
    truncate=False
)

# Extract and normalise phone numbers
df_phone_extract = (
    df_contacts.withColumn("phone_digits", regexp_replace(col("phone"), r"[^\d]", ""))
    .withColumn(
        "area_code",
        regexp_extract(regexp_replace(col("phone"), r"[^\d]", ""), r"^(\d{3})", 1),
    )
    .withColumn(
        "phone_normalised",
        concat(
            lit("("),
            substring(regexp_replace(col("phone"), r"[^\d]", ""), 1, 3),
            lit(")"),
            substring(regexp_replace(col("phone"), r"[^\d]", ""), 4, 3),
            lit("-"),
            substring(regexp_replace(col("phone"), r"[^\d]", ""), 7, 4),
        ),
    )
)

print("Phone number extraction and normalisation:")
df_phone_extract.select("phone", "phone_digits", "area_code", "phone_normalised").show(
    truncate=False
)

# Extract address components
df_address_parts = (
    df_contacts.withColumn("street", regexp_extract(col("address"), r"^(.+),", 1))
    .withColumn("city", regexp_extract(col("address"), r", ([^,]+), [A-Z]{2}", 1))
    .withColumn("state", regexp_extract(col("address"), r", ([A-Z]{2}) \d", 1))
    .withColumn("zip_code", regexp_extract(col("address"), r"(\d{5})$", 1))
)

print("Extracted address components:")
df_address_parts.select("address", "street", "city", "state", "zip_code").show(
    truncate=False
)

Extracted email components:
+----------------------+--------------+------------+---------+
|email                 |email_username|email_domain|email_tld|
+----------------------+--------------+------------+---------+
|john.smith@email.com  |john.smith    |email       |com      |
|jane_doe@company.co.uk|jane_doe      |company     |co.uk    |
|bob123@test.org       |bob123        |test        |org      |
|alice.brown@domain.net|alice.brown   |domain      |net      |
|invalid-email         |              |            |         |
+----------------------+--------------+------------+---------+

Phone number extraction adn normalisation:
+-------------------+------------+---------+----------------+
|phone              |phone_digits|area_code|phone_normalised|
+-------------------+------------+---------+----------------+
|(555) 123-4567     |5551234567  |555      |(555)123-4567   |
|555-987-6543       |5559876543  |555      |(555)987-6543   |
|5551234567         |5551234567  |555      |(555)12

### Replacing Patterns with regexp_replace

In [None]:
# Mask sensitive data
df_masked = df_contacts.withColumn(
    "email_masked", regexp_replace(col("email"), r"^(.{2}).*(@.*)$", r"$1***$2")
).withColumn(
    "phone_masked",
    regexp_replace(
        regexp_replace(col("phone"), r"[^\d]", ""),
        r"^(\d{3})(\d{3})(\d{4})$",
        r"(***) ***-$3",
    ),
)

print("Masked sensitive data:")
df_masked.select("name", "email", "email_masked", "phone", "phone_masked").show(
    truncate=False
)

# Clean and standarise data
messy_text_data = [
    (1, "Hello World!!!"),
    (2, "Multiple spaces here"),
    (3, "Lots of!!!!! punctuation???"),
    (4, "   Leading and trailing   "),
    (5, "MixED CaSe TeXt"),
]

df_messy_text = spark.createDataFrame(messy_text_data, ["id", "text"])

df_standardized = df_messy_text.withColumn(
    "text_clean",
    trim(
        regexp_replace(
            regexp_replace(lower(col("text")), rf"([!?.]){(2,)}", r"$1"), r"\s+", " "
        )
    ),
)

print("Text standardization:")
df_standardized.show(truncate=False)

Masked sensitive data:
+-----------+----------------------+-------------------+-------------------+--------------+
|name       |email                 |email_masked       |phone              |phone_masked  |
+-----------+----------------------+-------------------+-------------------+--------------+
|John Smith |john.smith@email.com  |jo***@email.com    |(555) 123-4567     |(***) ***-4567|
|Jane Doe   |jane_doe@company.co.uk|ja***@company.co.uk|555-987-6543       |(***) ***-6543|
|Bob Wilson |bob123@test.org       |bo***@test.org     |5551234567         |(***) ***-4567|
|Alice Brown|alice.brown@domain.net|al***@domain.net   |(555)456-7890      |(***) ***-7890|
|Charlie Lee|invalid-email         |invalid-email      |phone: 555-111-2222|(***) ***-2222|
+-----------+----------------------+-------------------+-------------------+--------------+

Text standardization:
+---+---------------------------+---------------------------+
|id |text                       |text_clean                 |
+-

### Practical Exercise: Log File Parsing

Parse web server access logs using regex to extract structured information.
- Extract IP address, timestamp, HTTP method, URL path, status code, and response size
- Filter for error status codes (4xx and 5xx)
- Count requests by HTTP method
- Identify the most accessed URLs

In [None]:
# Sample Apache access logs
log_data = [
    (
        1,
        '192.168.1.100 - - [15/Jan/2024:10:30:45 +0000] "GET /index.html HTTP/1.1" 200 1234',
    ),
    (
        2,
        '10.0.0.50 - - [15/Jan/2024:10:31:00 +0000] "POST /api/users HTTP/1.1" 201 567',
    ),
    (
        3,
        '192.168.1.101 - - [15/Jan/2024:10:31:15 +0000] "GET /about.html HTTP/1.1" 200 890',
    ),
    (
        4,
        '172.16.0.25 - - [15/Jan/2024:10:31:30 +0000] "GET /missing.html HTTP/1.1" 404 234',
    ),
    (
        5,
        '192.168.1.100 - - [15/Jan/2024:10:31:45 +0000] "GET /api/data HTTP/1.1" 500 100',
    ),
    (
        6,
        '10.0.0.75 - - [15/Jan/2024:10:32:00 +0000] "PUT /api/users/1 HTTP/1.1" 200 456',
    ),
    (
        7,
        '192.168.1.102 - - [15/Jan/2024:10:32:15 +0000] "DELETE /api/users/2 HTTP/1.1" 204 0',
    ),
    (
        8,
        '172.16.0.30 - - [15/Jan/2024:10:32:30 +0000] "GET /index.html HTTP/1.1" 200 1234',
    ),
    (9, '10.0.0.50 - - [15/Jan/2024:10:32:45 +0000] "POST /api/login HTTP/1.1" 401 89'),
    (
        10,
        '192.168.1.100 - - [15/Jan/2024:10:33:00 +0000] "GET /api/products HTTP/1.1" 200 5678',
    ),
]

df_logs = spark.createDataFrame(log_data, ["id", "log_line"])
print("sample access logs:")
df_logs.show(truncate=False)

from pyspark.sql.functions import col, regexp_extract, count

# Extract components from Apache log format
df_parsed = (
    df_logs.withColumn("ip_address", regexp_extract(col("log_line"), r"^([\d\.]+)", 1))
    .withColumn("timestamp", regexp_extract(col("log_line"), r"\[([^\]]+)\]", 1))
    .withColumn(
        "http_method", regexp_extract(col("log_line"), r"\"(GET|POST|PUT|DELETE)", 1)
    )
    .withColumn(
        "url_path",
        regexp_extract(col("log_line"), r"\"(?:GET|POST|PUT|DELETE) ([^ ]+)", 1),
    )
    .withColumn(
        "status_code", regexp_extract(col("log_line"), r"\" (\d{3})", 1).cast("int")
    )
    .withColumn(
        "response_size", regexp_extract(col("log_line"), r" (\d+)$", 1).cast("int")
    )
)

print("Parsed strctured logs:")

df_parsed.select(
    "ip_address", "timestamp", "http_method", "url_path", "status_code", "response_size"
).show(truncate=False)


# Filter Error Codes
df_errors = df_parsed.filter(col("status_code") >= 400)

print("Error requests (4xx and 5xx):")
df_errors.select("ip_address", "url_path", "status_code").show()


# Count Requests by HTTP Method
print("Request count by HTTP method:")
df_parsed.groupBy("http_method").agg(count("*").alias("request_count")).show()


# Most accessed URLs
print("Most accessed URLs:")
df_parsed.groupBy("url_path").agg(count("*").alias("access_count")).orderBy(
    col("access_count").desc()
).show()

sample access logs:
+---+------------------------------------------------------------------------------------+
|id |log_line                                                                            |
+---+------------------------------------------------------------------------------------+
|1  |192.168.1.100 - - [15/Jan/2024:10:30:45 +0000] "GET /index.html HTTP/1.1" 200 1234  |
|2  |10.0.0.50 - - [15/Jan/2024:10:31:00 +0000] "POST /api/users HTTP/1.1" 201 567       |
|3  |192.168.1.101 - - [15/Jan/2024:10:31:15 +0000] "GET /about.html HTTP/1.1" 200 890   |
|4  |172.16.0.25 - - [15/Jan/2024:10:31:30 +0000] "GET /missing.html HTTP/1.1" 404 234   |
|5  |192.168.1.100 - - [15/Jan/2024:10:31:45 +0000] "GET /api/data HTTP/1.1" 500 100     |
|6  |10.0.0.75 - - [15/Jan/2024:10:32:00 +0000] "PUT /api/users/1 HTTP/1.1" 200 456      |
|7  |192.168.1.102 - - [15/Jan/2024:10:32:15 +0000] "DELETE /api/users/2 HTTP/1.1" 204 0 |
|8  |172.16.0.30 - - [15/Jan/2024:10:32:30 +0000] "GET /index.html HTT

## Building Complete Preprocessing Pipelines

### Combining Multiple Preprocessing Steps

In [None]:
# Real world messy dataset
raw_data = [
    (
        "1",
        " JOHN SMITH ",
        "john.smith@EMAIL.com",
        "(555) 123-4567",
        "25",
        "$50,000.00",
        "2024-01-15",
    ),
    (
        "2",
        "jane doe",
        "JANE_DOE@COMPANY.CO.UK",
        "555-987-6543",
        "N/A",
        "60000",
        "15/02/2024",
    ),
    (
        "3",
        "Bob Wilson",
        "invalid-email",
        "5551234567",
        "35",
        "75,000",
        "March 20, 2024",
    ),
    ("4", None, "alice@domain.net", "(555)456-7890", "28", None, "2024-04-10"),
    ("5", "Charlie Lee", "", "phone: 555-111-2222", "forty", "$80,000", None),
]

df_raw = spark.createDataFrame(
    raw_data, ["id", "name", "email", "phone", "age", "salary", "hire_date"]
)

print("Raw messy data:")
df_raw.show(truncate=False)


def preprocess_employee_data(df):
    # Clean and standarise data
    df = df.withColumn(
        "name_clean",
        when(
            col("name").isNull() | (trim(col("name")) == ""),
            concat(lit("Employee_"), col("id")),
        ).otherwise(expr("initcap(trim(name))")),
    )

    # Clean and Validate Email
    email_pattern = r"^[\w.+-]+@[\w-]+\.[a-zA-z]{2,}$"
    df = df.withColumn(
        "email_clean",
        when(
            lower(trim(col("email"))).rlike(email_pattern), lower(trim(col("email")))
        ).otherwise(None),
    )

    # Normalise Phone Number
    df = df.withColumn(
        "phone_digits", regexp_replace(col("phone"), r"[^\d]", "")
    ).withColumn(
        "phone_clean",
        when(
            length(col("phone_digits")) == 10,
            concat(
                lit("("),
                substring(col("phone_digits"), 1, 3),
                lit(")"),
                substring(col("phone_digits"), 4, 3),
                lit("-"),
                substring(col("phone_digits"), 7, 4),
            ),
        ).otherwise(None),
    )

    # Convert Age to Integer
    df = df.withColumn("age_clean", col("age").cast(IntegerType()))

    # Clean and Convert Salary
    df = df.withColumn(
        "salary_clean", regexp_replace(col("salary"), r"[$,]", "").cast(DoubleType())
    )

    # Parse hire date
    df = df.withColumn(
        "hire_date_clean",
        coalesce(
            to_date(col("hire_date"), "yyyy-MM-dd"),
            to_date(col("hire_date"), "dd/MM/yyyy"),
            to_date(col("hire_date"), "MMMM dd, yyyy"),
        ),
    )

    return df.select(
        col("id").cast(IntegerType()).alias("id"),
        col("name_clean").alias("name"),
        col("email_clean").alias("email"),
        col("phone_clean").alias("phone"),
        col("age_clean").alias("age"),
        col("salary_clean").alias("salary"),
        col("hire_date_clean").alias("hire_date"),
    )


df_clean = preprocess_employee_data(df_raw)

print("Cleaned data:")
df_clean.show(truncate=False)
df_clean.printSchema()

Raw messy data:
+---+------------+----------------------+-------------------+-----+----------+--------------+
|id |name        |email                 |phone              |age  |salary    |hire_date     |
+---+------------+----------------------+-------------------+-----+----------+--------------+
|1  | JOHN SMITH |john.smith@EMAIL.com  |(555) 123-4567     |25   |$50,000.00|2024-01-15    |
|2  |jane doe    |JANE_DOE@COMPANY.CO.UK|555-987-6543       |N/A  |60000     |15/02/2024    |
|3  |Bob Wilson  |invalid-email         |5551234567         |35   |75,000    |March 20, 2024|
|4  |NULL        |alice@domain.net      |(555)456-7890      |28   |NULL      |2024-04-10    |
|5  |Charlie Lee |                      |phone: 555-111-2222|forty|$80,000   |NULL          |
+---+------------+----------------------+-------------------+-----+----------+--------------+

Cleaned data:
+---+-----------+--------------------+-------------+----+-------+----------+
|id |name       |email               |phone   

### Data Quality Reporting

In [None]:
def generate_quality_report(df_original, df_cleaned):
    total_rows = df_original.count()
    print("=" * 60)
    print("DATA QUALITY REPORT")
    print("=" * 60)
    print(f"\nTotal rows: {total_rows}")
    print("\n--- Missing Value Summary (Cleaned Data) ---")

    for col_name in df_cleaned.columns:
        null_count = df_cleaned.filter(col(col_name).isNull()).count()
        null_pct = (null_count / total_rows) * 100
        status = "OK" if null_pct < 10 else "WARNING" if null_pct < 30 else "CRITICAL"
        print(f" {col_name}: {null_count} nulls ({null_pct:.1f}%) [{status}]")
        total_cells = total_rows * len(df_cleaned.columns)
        null_cells = sum(
            df_cleaned.filter(col(c).isNull()).count() for c in df_cleaned.columns
        )
        completeness = ((total_cells - null_cells) / total_cells) * 100
        print(f"\n--- Overall Data Quality Score ---")
        print(f" Completeness: {completeness:.1f}%")
        print("=" * 60)


generate_quality_report(df_raw, df_clean)

DATA QUALITY REPORT

Total rows: 5

--- Missing Value Summary (Cleaned Data) ---
 id: 0 nulls (0.0%) [OK]

--- Overall Data Quality Score ---
 Completeness: 80.0%
 name: 0 nulls (0.0%) [OK]

--- Overall Data Quality Score ---
 Completeness: 80.0%
 email: 3 nulls (60.0%) [CRITICAL]

--- Overall Data Quality Score ---
 Completeness: 80.0%
 phone: 0 nulls (0.0%) [OK]

--- Overall Data Quality Score ---
 Completeness: 80.0%
 age: 2 nulls (40.0%) [CRITICAL]

--- Overall Data Quality Score ---
 Completeness: 80.0%

--- Overall Data Quality Score ---
 Completeness: 80.0%

--- Overall Data Quality Score ---
 Completeness: 80.0%


## Final Challenge: End-to-End Data Cleaning
- Identify and report all data quality issues
- Handle missing values appropriately for each column
- Standardize text fields (product names, categories)
- Parse and validate dates
- Clean and validate email addresses
- Extract useful information using regex
- Create a comprehensive data quality report
- Output the cleaned dataset

In [None]:
# E-commerce Dataset
ecommerce_data = [
    (
        "ORD001",
        " LAPTOP Computer ",
        "ELECTRONICS",
        "john@email.com",
        "$999.99",
        "2",
        "2024-01-15 10:30:00",
        "Completed",
    ),
    (
        "ORD002",
        "wireless mouse",
        "electronics",
        "JANE@COMPANY.CO.UK",
        "29.99",
        None,
        "15/01/2024",
        "completed",
    ),
    (
        "ORD003",
        "USB-C Cable (6ft)",
        "Accessories",
        "invalid-email",
        "$12.50",
        "5",
        "January 16, 2024",
        "PENDING",
    ),
    ("ORD004", None, "CLOTHING", "bob@test.org", "N/A", "1", "2024-01-16", "Cancelled"),
    (
        "ORD005",
        "Running Shoes - Size 10",
        " clothing ",
        "",
        "89.00",
        "2",
        None,
        "Shipped",
    ),
    (
        "ORD006",
        "HDMI Cable!!!",
        "accessories",
        "alice@domain.net",
        "$15.00",
        "10",
        "2024-01-17 14:20:00",
        "completed",
    ),
    (
        "ORD007",
        "smartphone case",
        "Electronics",
        "charlie@email.com",
        "$25",
        "3",
        "17-Jan-2024",
        "Processing",
    ),
    (
        "ORD008",
        " Winter Jacket ",
        "Clothing",
        "N/A",
        "$149.99",
        None,
        "2024-01-18",
        "SHIPPED",
    ),
]

df_ecommerce = spark.createDataFrame(
    ecommerce_data,
    [
        "order_id",
        "product",
        "category",
        "email",
        "price",
        "quantity",
        "order_date",
        "status",
    ],
)

print("Raw e-commerce data:")
df_ecommerce.show(truncate=False)

# Imports
from pyspark.sql.functions import (
    col,
    trim,
    lower,
    upper,
    initcap,
    regexp_extract,
    regexp_replace,
    when,
    coalesce,
    to_timestamp,
    count,
    sum,
)
from pyspark.sql.types import IntegerType, DoubleType

# Text Standardisation
df_clean = (
    df_ecommerce.withColumn(
        "product_clean",
        initcap(trim(regexp_replace(col("product"), r"[^a-zA-z0-9\s]", ""))),
    )
    .withColumn("category_clean", initcap(trim(col("category"))))
    .withColumn("status_clean", initcap(trim(col("status"))))
)

# Email Validation
email_pattern = r"^[\w.+-]+@[\w-]+(\.[\w-]+)+$"
df_clean = df_clean.withColumn(
    "email_clean",
    when(
        lower(trim(col("email"))).rlike(email_pattern), lower(trim(col("email")))
    ).otherwise(None),
)

# Price Cleaning
df_clean = df_clean.withColumn(
    "price_clean", regexp_replace(col("price"), r"[$,]", "").cast(DoubleType())
)

# Quantity Cleaning
df_clean = df_clean.withColumn(
    "quantity_clean",
    when(col("quantity").rlike("^\d+$"), col("quantity").cast(IntegerType())).otherwise(
        None
    ),
)

# Date Parsing
df_clean = df_clean.withColumn(
    "order_date_clean",
    coalesce(
        to_timestamp(col("order_date"), "yyyy-MM-dd HH:mm:ss"),
        to_timestamp(col("order_date"), "dd/MM/yyyy"),
        to_timestamp(col("order_date"), "MMMM dd, yyyy"),
        to_timestamp(col("order_date"), "dd-MM-yyyy"),
    ),
)


# Data Quality Report
print("Data Quality Report")

df_quality = df_clean.select(
    count("*").alias("total_records"),
    sum(col("product_clean").isNull().cast("int")).alias("missing_product"),
    sum(col("email_clean").isNull().cast("int")).alias("invalid_email"),
    sum(col("price_clean").isNull().cast("int")).alias("invalid_price"),
    sum(col("quantity_clean").isNull().cast("int")).alias("invalid_quantity"),
    sum(col("order_date_clean").isNull().cast("int")).alias("invalid_date"),
)

df_quality.show()


# Extract Useful Regex Info (Product Size)
df_clean = df_clean.withColumn(
    "size_extracted", regexp_extract(col("product"), r"Size (\d+)", 1)
)


# Clean Output
df_final = df_clean.select(
    col("order_id"),
    col("product_clean").alias("product"),
    col("category_clean").alias("category"),
    col("email_clean").alias("email"),
    col("price_clean").alias("price"),
    col("quantity_clean").alias("quantity"),
    col("order_date_clean").alias("order_date"),
    col("status_clean").alias("status"),
    col("size_extracted"),
)

print("Final Cleaned Dataset:")
df_final.show(truncate=False)
df_final.printSchema()

Raw e-commerce data:


+--------+-----------------------+-----------+------------------+-------+--------+-------------------+----------+
|order_id|product                |category   |email             |price  |quantity|order_date         |status    |
+--------+-----------------------+-----------+------------------+-------+--------+-------------------+----------+
|ORD001  | LAPTOP Computer       |ELECTRONICS|john@email.com    |$999.99|2       |2024-01-15 10:30:00|Completed |
|ORD002  |wireless mouse         |electronics|JANE@COMPANY.CO.UK|29.99  |NULL    |15/01/2024         |completed |
|ORD003  |USB-C Cable (6ft)      |Accessories|invalid-email     |$12.50 |5       |January 16, 2024   |PENDING   |
|ORD004  |NULL                   |CLOTHING   |bob@test.org      |N/A    |1       |2024-01-16         |Cancelled |
|ORD005  |Running Shoes - Size 10| clothing  |                  |89.00  |2       |NULL               |Shipped   |
|ORD006  |HDMI Cable!!!          |accessories|alice@domain.net  |$15.00 |10      |2024-0

In [49]:
# Stop the Spark session
spark.stop()
print("Spark session stopped.")

Spark session stopped.
