In [12]:
# Initialize Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnull, count, regexp_extract, split, coalesce, lit, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, ArrayType
import pyspark.sql.functions as F

# Create Spark Session
spark = SparkSession.builder \
    .appName("HighPerformancePySpark") \
    .getOrCreate()

# Set log level to WARN to reduce verbosity
spark.sparkContext.setLogLevel("WARN")

In [13]:
# Define schema for the dataset
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_details", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("product_category", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("price_per_unit", StringType(), True),
    StructField("tags", StringType(), True),
    StructField("items", StringType(), True)
])

# Load the dataset
df = spark.read.csv("/workspaces/high-performance-pyspark-advanced-strategies-for-optimal-data-processing-3919191/data/online_sales_data.csv", schema=schema, header=True)

# Display the dataset
print("Raw Dataset:")
df.show(10, truncate=False)

Raw Dataset:
+--------+-----------------------------------------+----------+----------------+--------+------------------+------------------+----------------------------+
|order_id|customer_details                         |order_date|product_category|quantity|price_per_unit    |tags              |items                       |
+--------+-----------------------------------------+----------+----------------+--------+------------------+------------------+----------------------------+
|ORD001  |Alice Johnson                            |NULL      |Electronics     |4       |15.769160684603047|['urgent', 'gift']|['Phone', 'Charger', 'Case']|
|ORD002  |Bob Smith | 584 Street Name, City 16     |2022-12-30|NULL            |-3      |fifty             |['bulk_order']    |['Book1', 'Book2']          |
|ORD003  |Charlie Brown | 598 Street Name, City 17 |2023-05-22|Books           |ten     |79.63563178465238 |NULL              |['Book1', 'Book2']          |
|ORD004  |David Wilson | 290 Street Name, Cit

In [14]:
df.describe().show()

+-------+--------+--------------------+----------+----------------+-----------------+------------------+--------------+--------------------+
|summary|order_id|    customer_details|order_date|product_category|         quantity|    price_per_unit|          tags|               items|
+-------+--------+--------------------+----------+----------------+-----------------+------------------+--------------+--------------------+
|  count|     100|                 100|        63|              70|               78|                62|            82|                  80|
|   mean|    NULL|                NULL|      NULL|            NULL|             1.68| 55.49113068483046|          NULL|                NULL|
| stddev|    NULL|                NULL|      NULL|            NULL|4.639845176868095|27.456621254927434|          NULL|                NULL|
|    min|  ORD001|       Alice Johnson|2022-12-30|           Books|               -1|15.769160684603047|['bulk_order']|  ['Book1', 'Book2']|
|    max|  OR

In [15]:
# Find all the NULLs in the dataframe

null_values_count = df.select([count(when(isnull(c) , c)).alias(c) for c in df.columns])

null_values_count.show()

+--------+----------------+----------+----------------+--------+--------------+----+-----+
|order_id|customer_details|order_date|product_category|quantity|price_per_unit|tags|items|
+--------+----------------+----------+----------------+--------+--------------+----+-----+
|       0|               0|        37|              30|      22|            38|  18|   20|
+--------+----------------+----------+----------------+--------+--------------+----+-----+



In [5]:
df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_details: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- price_per_unit: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- items: string (nullable = true)



In [16]:
# Detect rows with negative quantity or invalid price
df_invalid = df.filter((col("quantity") < 0) | (col("quantity").rlike("^[^0-9]")) | (col("price_per_unit").rlike("^[^0-9]")))
df_invalid.show()

+--------+--------------------+----------+----------------+--------+------------------+------------------+--------------------+
|order_id|    customer_details|order_date|product_category|quantity|    price_per_unit|              tags|               items|
+--------+--------------------+----------+----------------+--------+------------------+------------------+--------------------+
|  ORD002|Bob Smith | 584 S...|2022-12-30|            NULL|      -3|             fifty|    ['bulk_order']|  ['Book1', 'Book2']|
|  ORD003|Charlie Brown | 5...|2023-05-22|           Books|     ten| 79.63563178465238|              NULL|  ['Book1', 'Book2']|
|  ORD005|Eva Davis | 387 S...|      NULL|            NULL|      -4|             fifty|    ['bulk_order']|['Phone', 'Charge...|
|  ORD006|        Frank Miller|2023-05-22|            NULL|      10|             fifty|       urgent,gift|['Table', 'Chair'...|
|  ORD007|Grace Lee | 869 S...|2023-05-22|           Books|    NULL|             fifty|    ['bulk_order'

In [19]:
df = df.withColumn("Customer Name", split(col("customer_details") , "\\|")[0]).\
         withColumn("Customer Address" , split(col("customer_details"), "\\|")[1])

df = df.withColumn("Customer Address" , when(col("Customer Address").isNull() , lit("Unknown")).otherwise(col("Customer Address")))
df.show()

+--------+--------------------+----------+----------------+--------+------------------+------------------+--------------------+---------------+--------------------+
|order_id|    customer_details|order_date|product_category|quantity|    price_per_unit|              tags|               items|  Customer Name|    Customer Address|
+--------+--------------------+----------+----------------+--------+------------------+------------------+--------------------+---------------+--------------------+
|  ORD001|       Alice Johnson|      NULL|     Electronics|     4.0|15.769160684603047|['urgent', 'gift']|['Phone', 'Charge...|  Alice Johnson|             Unknown|
|  ORD002|Bob Smith | 584 S...|2022-12-30|            NULL|    -3.0|             fifty|    ['bulk_order']|  ['Book1', 'Book2']|     Bob Smith | 584 Street Name,...|
|  ORD003|Charlie Brown | 5...|2023-05-22|           Books|     1.0| 79.63563178465238|              NULL|  ['Book1', 'Book2']| Charlie Brown | 598 Street Name,...|
|  ORD004|

In [20]:
from pyspark.sql.functions import regexp_extract

# Example of extracting components from customer_details (e.g., street name, city)
df_address_split = df.withColumn('street', regexp_extract(col('customer_details'), r'(\d+ Street Name)', 1)) \
                     .withColumn('city', regexp_extract(col('customer_details'), r'City (\d+)', 1))
df_address_split.show()


+--------+--------------------+----------+----------------+--------+------------------+------------------+--------------------+---------------+--------------------+---------------+----+
|order_id|    customer_details|order_date|product_category|quantity|    price_per_unit|              tags|               items|  Customer Name|    Customer Address|         street|city|
+--------+--------------------+----------+----------------+--------+------------------+------------------+--------------------+---------------+--------------------+---------------+----+
|  ORD001|       Alice Johnson|      NULL|     Electronics|     4.0|15.769160684603047|['urgent', 'gift']|['Phone', 'Charge...|  Alice Johnson|             Unknown|               |    |
|  ORD002|Bob Smith | 584 S...|2022-12-30|            NULL|    -3.0|             fifty|    ['bulk_order']|  ['Book1', 'Book2']|     Bob Smith | 584 Street Name,...|584 Street Name|  16|
|  ORD003|Charlie Brown | 5...|2023-05-22|           Books|     1.0| 7

In [18]:
# Data is skewed in quantity column. 
# So we will impute median for NULLs in this column
# "quantity" is String type, first we will cast it to Numeric 
# Then impute the Median
df = df.withColumn("quantity" , col("quantity").cast("double"))


# Step 1: Calculate the median of the 'quantity' column

median_quantity = df.approxQuantile("quantity", [0.5], 0.0)[0]  # 0.5 for median (50th percentile)
df = df.withColumn("quantity" , when(col("quantity").isNull() , median_quantity).otherwise(col("quantity")))
df.show(5)

+--------+--------------------+----------+----------------+--------+------------------+------------------+--------------------+
|order_id|    customer_details|order_date|product_category|quantity|    price_per_unit|              tags|               items|
+--------+--------------------+----------+----------------+--------+------------------+------------------+--------------------+
|  ORD001|       Alice Johnson|      NULL|     Electronics|     4.0|15.769160684603047|['urgent', 'gift']|['Phone', 'Charge...|
|  ORD002|Bob Smith | 584 S...|2022-12-30|            NULL|    -3.0|             fifty|    ['bulk_order']|  ['Book1', 'Book2']|
|  ORD003|Charlie Brown | 5...|2023-05-22|           Books|     1.0| 79.63563178465238|              NULL|  ['Book1', 'Book2']|
|  ORD004|David Wilson | 29...|2022-12-30|           Books|     1.0|27.556430196566655|              NULL|['Laptop', 'Mouse...|
|  ORD005|Eva Davis | 387 S...|      NULL|            NULL|    -4.0|             fifty|    ['bulk_order'

In [7]:
def clean_and_transform(df):
    # Fix order_date
    df = df.withColumn("order_date", 
                       when(to_date(col("order_date"), "yyyy-MM-dd").isNotNull(), 
                            to_date(col("order_date"), "yyyy-MM-dd"))
                       .otherwise(None))
    
    # Fix quantity: Replace invalid quantities with 0
    df = df.withColumn("quantity", 
                       when((col("quantity").isNotNull()) & (col("quantity") >= 0), col("quantity").cast(IntegerType()))
                       .otherwise(0))
    
    # Fix price_per_unit
    df = df.withColumn("price_per_unit", 
                       when(col("price_per_unit").rlike("^-?\\d+(\\.\\d+)?$"), col("price_per_unit").cast(DoubleType()))
                       .otherwise(None))
    
    # Fix product_category
    df = df.withColumn("product_category", 
                       coalesce(col("product_category"), lit("Unknown")))
    
    # Fix tags
    df = df.withColumn("tags", 
                       coalesce(col("tags"), lit("['no tags']")))
    
    # Fix items
    df = df.withColumn("items", 
                       coalesce(col("items"), lit("['no items']")))
    
    # Extract customer name and address
    df = df.withColumn("customer_name", 
                       regexp_extract(col("customer_details"), r"^([^|]+)", 1)) \
           .withColumn("customer_address", 
                       regexp_extract(col("customer_details"), r"\|\s*(.+)$", 1))
    
    # Drop the original customer_details column
    df = df.drop("customer_details")
    
    return df

In [8]:
# Data Validation and Unit Testing
def test_data_quality(df):
    # Test 1: Check for NULL order_ids
    null_order_ids = df.filter(col("order_id").isNull()).count()
    assert null_order_ids == 0, f"Found {null_order_ids} NULL order_ids"
    
    # Test 2: Check for invalid quantities (negative or non-integer)
    invalid_quantities = df.filter((col("quantity").isNull()) | (col("quantity") < 0)).count()
    assert invalid_quantities == 0, f"Found {invalid_quantities} invalid quantities"
    
    # Test 3: Check for invalid prices (negative or non-numeric)
    invalid_prices = df.filter((col("price_per_unit").isNull()) | (col("price_per_unit") < 0)).count()
    assert invalid_prices == 0, f"Found {invalid_prices} invalid prices"
    
    # Test 4: Check for NULL product categories
    null_categories = df.filter(col("product_category").isNull()).count()
    assert null_categories == 0, f"Found {null_categories} NULL product categories"
    
    # Test 5: Check for NULL tags
    null_tags = df.filter(col("tags").isNull()).count()
    assert null_tags == 0, f"Found {null_tags} NULL tags"
    
    # Test 6: Check for NULL items
    null_items = df.filter(col("items").isNull()).count()
    assert null_items == 0, f"Found {null_items} NULL items"
    
    print("All tests passed!")

# Run unit tests
test_data_quality(cleaned_df)

AssertionError: Found 72 invalid quantities

In [9]:
# Aggregation Example: Total revenue by product category
revenue_by_category = cleaned_df.groupBy("product_category") \
    .agg(F.sum(col("quantity") * col("price_per_unit")).alias("total_revenue")) \
    .orderBy("total_revenue", ascending=False)

print("Revenue by Product Category:")
revenue_by_category.show(truncate=False)

Revenue by Product Category:
+----------------+------------------+
|product_category|total_revenue     |
+----------------+------------------+
|Unknown         |1355.2008182536647|
|Books           |412.28163560835685|
|Electronics     |122.00022348307672|
|Home & Kitchen  |-596.1881632731537|
+----------------+------------------+



In [10]:
# Stop Spark Session
spark.stop()