# Sales Data Processing Pipeline – PySpark Implementation
- company: Metyis
- maintainer: Hamed Mahdavi Far
- email: hamedmahdavifar31@gmail.com
- date: 2025-08-08
---
## Overview
This notebook outlines a PySpark-based data processing pipeline designed to efficiently handle and transform large volumes of monthly retail sales data. The pipeline reads raw CSV files from a source folder, performs data cleansing, deduplication and partitioning, and writes the optimized output to a designated `cleansed` folder.


## Key Capabilities
- **Ingestion** of multiple raw but structured CSV files representing monthly sales data.
- **Inspection** of the data to identify issues such as null values and fully duplicated rows.
- **Null value handling** to ensure data integrity and quality.
- **Deduplication** using PySpark’s Window functions to ensure data quality and support accurate analytics.
- **Data partitioning** strategy to support efficient querying and downstream analytics, with output written as compressed Parquet.
---

## Important Notes
### Environment Setup for Running PySpark on Windows

Apache Spark relies on certain Hadoop utilities for internal operations. On Windows systems, Spark expects access to the native `winutils.exe` binary.

#### What I Did:
- I manually downloaded the required Hadoop `winutils.exe` and accompanying `hadoop.dll` files for Windows from [https://github.com/steveloughran/winutils](https://github.com/steveloughran/winutils).
- I placed the files in the directory: `C:\hadoop\bin`
- I added the code in the next cell to tell Spark (and the underlying Hadoop libraries) where to find the Hadoop binaries.

# 1 Libraries

In [100]:
import os
os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["hadoop.home.dir"] = "C:\\hadoop"
os.environ["PATH"] += os.pathsep + "C:\\hadoop\\bin"
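Before starting Spark, it can help to confirm that the binaries are actually where the environment variables point. The check below is a minimal sketch using only the standard library. The function name `missing_hadoop_binaries` is hypothetical and not part of this project; the two file names are the ones mentioned in the setup steps above.

```python
import os

def missing_hadoop_binaries(hadoop_home, required=("winutils.exe", "hadoop.dll")):
    """Return the required files that are absent from <hadoop_home>/bin."""
    bin_dir = os.path.join(hadoop_home, "bin")
    return [name for name in required
            if not os.path.isfile(os.path.join(bin_dir, name))]

# Assumed location, taken from the setup steps above.
missing = missing_hadoop_binaries("C:\\hadoop")
print("Missing files:", missing or "none")
```

An empty result suggests the files are in place; it does not prove that their version matches the Hadoop build bundled with Spark.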

In [101]:
from pyspark.shell import spark
from pyspark.sql.functions import current_timestamp, month, to_timestamp, year
from scripts.util import (
    count_fully_duplicated_groups,
    drop_fully_duplicated_rows,
    find_any_duplication,
    get_fully_duplicated_rows,
    get_rows_with_nulls,
    read_and_preview_parquet,
    rename_columns_to_snake_case,
    show_min_max_dates_per_partition,
)



In [102]:
from scripts.notebook_logger import get_notebook_logger
logger = get_notebook_logger("notebook_pipeline")
logger.info("Starting the data processing pipeline notebook.")

`2025-08-07 02:28:06,165 — INFO — Starting the data processing pipeline notebook.`

# 2 Inspection

### Ingesting Multiple CSV Files into a Single Spark DataFrame

In the following cell, I use PySpark to ingest **all CSV files** located in the `../data/sales_data/` directory. By using a wildcard pattern (`*.csv`), Spark automatically loads and combines all matching files into a **single distributed DataFrame**.




In [103]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

sales_schema = StructType([
    StructField("Order ID", IntegerType(), True),
    StructField("Product", StringType(), True),
    StructField("Quantity Ordered", IntegerType(), True),
    StructField("Price Each", DoubleType(), True),
    StructField("Order Date", StringType(), True),
    StructField("Purchase Address", StringType(), True),
])

In [104]:
sales_df = (spark.read.option("header", True).schema(sales_schema).csv("../data/sales_data/*.csv"))
logger.info(f"DataFrame loaded with {sales_df.count()} rows and {len(sales_df.columns)} columns.")

`2025-08-07 02:28:06,493 — INFO — DataFrame loaded with 186850 rows and 6 columns.`

### Preview

In [105]:
sales_df.show(5, truncate=False)

+--------+--------------------+----------------+----------+--------------+--------------------------------------+
|Order ID|Product             |Quantity Ordered|Price Each|Order Date    |Purchase Address                      |
+--------+--------------------+----------------+----------+--------------+--------------------------------------+
|295665  |Macbook Pro Laptop  |1               |1700.0    |12/30/19 00:01|136 Church St, New York City, NY 10001|
|295666  |LG Washing Machine  |1               |600.0     |12/29/19 07:03|562 2nd St, New York City, NY 10001   |
|295667  |USB-C Charging Cable|1               |11.95     |12/12/19 18:21|277 Main St, New York City, NY 10001  |
|295668  |27in FHD Monitor    |1               |149.99    |12/22/19 15:13|410 6th St, San Francisco, CA 94016   |
|295669  |USB-C Charging Cable|1               |11.95     |12/18/19 12:38|43 Hill St, Atlanta, GA 30301         |
+--------+--------------------+----------------+----------+--------------+--------------------------------------+

In [106]:
sales_df.describe().show()

+-------+-----------------+------------+------------------+-----------------+--------------+--------------------+
|summary|         Order ID|     Product|  Quantity Ordered|       Price Each|    Order Date|    Purchase Address|
+-------+-----------------+------------+------------------+-----------------+--------------+--------------------+
|  count|           185950|      186305|            185950|           185950|        186305|              186305|
|   mean|230417.5693788653|        NULL|1.1243828986286637|184.3997347674861|          NULL|                NULL|
| stddev| 51512.7371099961|        NULL|0.4427926240286694|332.7313298843445|          NULL|                NULL|
|    min|           141234|20in Monitor|                 1|             2.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|           319670|      iPhone|                 9|           1700.0|    Order Date|    Purchase Address|
+-------+-----------------+------------+------------------+-----------------+--------------+--------------------+
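The `describe()` output hints at what the raw files contain: the numeric columns have fewer non-null values than the string columns, and the maximum `Order Date` is the literal text `Order Date`. One plausible reading, consistent with the null rows inspected in the next section, is that header lines are repeated inside the files and fail the integer and double casts required by the schema. The plain-Python sketch below imitates that behaviour on two lines. `cast_or_none` and `parse_line` are hypothetical helpers for illustration and do not reproduce Spark's CSV reader.

```python
def cast_or_none(value, caster):
    """Apply caster to value, returning None when the text cannot be converted."""
    try:
        return caster(value)
    except (TypeError, ValueError):
        return None

# Casters mirroring the column types declared in sales_schema.
CASTERS = [int, str, int, float, str, str]

def parse_line(fields):
    """Cast each field independently, keeping the ones that convert."""
    return [cast_or_none(value, caster) for value, caster in zip(fields, CASTERS)]

# A repeated header line and the first data row from the preview above.
header_line = ["Order ID", "Product", "Quantity Ordered", "Price Each",
               "Order Date", "Purchase Address"]
data_line = ["295665", "Macbook Pro Laptop", "1", "1700.0",
             "12/30/19 00:01", "136 Church St, New York City, NY 10001"]

print(parse_line(header_line))
print(parse_line(data_line))
```

In this imitation the header line keeps its three text fields and loses its three numeric fields.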

# 3 NULL Values 

#### Checking for null values in the DataFrame. `get_rows_with_nulls` returns every row that has a null value in at least one column.

In [107]:
rows_with_null = get_rows_with_nulls(sales_df)
rows_with_null.show(10, truncate=False)

`2025-08-07 02:28:08,446 — INFO — Checking for rows with null values in any column.`

`2025-08-07 02:28:08,739 — INFO — Found 900 rows with at least one null value.`

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
|NULL    |Product|NULL            |NULL      |Order Date|Purchase Address|
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |Product|NULL            |NULL      |Order Date|Purchase Address|
|NULL    |Product|NULL            |NULL      |Order Date|Purchase Address|
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
|NULL    |NULL   |NULL            |NULL      |NULL      |NULL            |
+--------+-------+----------------+----------+----------+----------------+
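The figure of 900 null-containing rows can be reconciled with the counts reported earlier by `describe()` and by the load log. The short calculation below uses only numbers printed in this notebook. Labelling the two groups as blank rows and header-like rows is an interpretation of the preview above, not something the logs state.

```python
total_rows = 186_850        # rows loaded (load log)
non_null_strings = 186_305  # describe() count for Product / Order Date / Purchase Address
non_null_numbers = 185_950  # describe() count for Order ID / Quantity Ordered / Price Each

# Rows where even the string columns are null (the all-NULL rows in the preview).
blank_rows = total_rows - non_null_strings
# Rows where strings are present but the numeric columns are null
# (the "Product" / "Order Date" rows in the preview).
header_like_rows = non_null_strings - non_null_numbers

rows_with_nulls = blank_rows + header_like_rows
print(blank_rows, header_like_rows, rows_with_nulls)  # 545 355 900
```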

### In this scenario we can drop all rows containing null values: they are either completely empty or repeat the header text, so they carry no valuable information.

In [108]:
sales_df = sales_df.na.drop(how='any')
logger.info(f"After dropping null values, DataFrame has {sales_df.count()} rows and {len(sales_df.columns)} columns.")

`2025-08-07 02:28:09,204 — INFO — After dropping null values, DataFrame has 185950 rows and 6 columns.`

### Checking again for rows with null values after dropping them

In [109]:
rows_with_null = get_rows_with_nulls(sales_df)
rows_with_null.show(10, truncate=False)

`2025-08-07 02:28:09,253 — INFO — Checking for rows with null values in any column.`

`2025-08-07 02:28:09,769 — INFO — Found 0 rows with at least one null value.`

+--------+-------+----------------+----------+----------+----------------+
|Order ID|Product|Quantity Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+----------------+----------+----------+----------------+
+--------+-------+----------------+----------+----------+----------------+



# 4 Naming Columns

### Renaming Columns for Consistency and Readability

In [110]:
sales_df = rename_columns_to_snake_case(sales_df)

`2025-08-07 02:28:10,275 — INFO — Renaming columns to snake_case format.`

`2025-08-07 02:28:10,295 — INFO — Successfully renamed all columns to snake_case.`
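`rename_columns_to_snake_case` lives in `scripts.util` and its body is not shown here. As a rough sketch of the name mapping it presumably applies, the hypothetical function `to_snake_case` below lowercases a column name and joins its words with underscores.

```python
import re

def to_snake_case(name):
    """Lowercase a column name and replace runs of non-alphanumerics with '_'."""
    return re.sub(r"[^0-9a-zA-Z]+", "_", name.strip()).strip("_").lower()

# Column names as declared in sales_schema.
original_columns = ["Order ID", "Product", "Quantity Ordered",
                    "Price Each", "Order Date", "Purchase Address"]
new_columns = [to_snake_case(c) for c in original_columns]
print(new_columns)
```

With a Spark DataFrame, a list of new names like this could be applied in one call with `sales_df.toDF(*new_columns)`.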

# 5 Deduplication

## What is defined as a duplicate?

#### To identify duplicates, I grouped the data by `order_id` and filtered for those appearing more than once. The resulting rows represent potential duplicate entries based on the assumption that `order_id` is unique.

In [111]:
find_any_duplication(sales_df, "order_id").show(10, truncate=False)

`2025-08-07 02:28:10,497 — INFO — Finding duplicated values in column: order_id`

`2025-08-07 02:28:12,270 — INFO — Found 14649 duplicated rows.`

+--------+--------------------------+----------------+----------+--------------+-----------------------------------+
|order_id|product                   |quantity_ordered|price_each|order_date    |purchase_address                   |
+--------+--------------------------+----------------+----------+--------------+-----------------------------------+
|295681  |Google Phone              |1               |600.0     |12/25/19 12:37|79 Elm St, Boston, MA 02215        |
|295681  |USB-C Charging Cable      |1               |11.95     |12/25/19 12:37|79 Elm St, Boston, MA 02215        |
|295681  |Bose SoundSport Headphones|1               |99.99     |12/25/19 12:37|79 Elm St, Boston, MA 02215        |
|295681  |Wired Headphones          |1               |11.99     |12/25/19 12:37|79 Elm St, Boston, MA 02215        |
|295698  |Vareebadd Phone           |1               |400.0     |12/13/19 14:32|175 1st St, New York City, NY 10001|
|295698  |USB-C Charging Cable      |2               |11.95     

## Deduplication Strategy

### Insight

While `order_id` was initially assumed to be a unique identifier, further inspection showed that multiple entries with the same `order_id` can differ by product but share the same timestamp and address, indicating legitimate multi-item purchases (the same shopping cart).

### Final Approach

To preserve data integrity and avoid losing valuable information, deduplication is performed only on **fully identical rows** where all fields match exactly. This ensures that:

- Multi-item orders are retained
- Only true duplicates (identical records) are removed
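The difference between the two definitions of a duplicate can be seen on a handful of rows that appear in this notebook's outputs. This is a plain-Python illustration of the rule, not the Spark code used by the notebook. Rows are shortened to `(order_id, product, quantity_ordered, price_each)` for brevity.

```python
from collections import Counter

rows = [
    (295681, "Google Phone", 1, 600.0),
    (295681, "USB-C Charging Cable", 1, 11.95),
    (295681, "Bose SoundSport Headphones", 1, 99.99),
    (295681, "Wired Headphones", 1, 11.99),
    (304531, "Wired Headphones", 1, 11.99),
    (304531, "Wired Headphones", 1, 11.99),
]

# Definition 1: any repeated order_id counts as a duplicate.
id_counts = Counter(row[0] for row in rows)
repeated_ids = sorted(order_id for order_id, n in id_counts.items() if n > 1)

# Definition 2: only rows identical in every field count as duplicates.
row_counts = Counter(rows)
fully_duplicated = [row for row, n in row_counts.items() if n > 1]

# Keep the first occurrence of each distinct row, preserving order.
deduplicated = list(dict.fromkeys(rows))

print(repeated_ids)       # [295681, 304531]
print(fully_duplicated)   # [(304531, 'Wired Headphones', 1, 11.99)]
print(len(deduplicated))  # 5
```

Under the first definition the four-item order 295681 would be flagged; under the second only the repeated 304531 row is, and all four items of 295681 survive.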


### Checking the number of rows that are completely identical (fully duplicated) and inspecting the data

In [112]:
fully_duplicated_rows = get_fully_duplicated_rows(sales_df)
fully_duplicated_rows.show(6, truncate=False)

`2025-08-07 02:28:13,324 — INFO — Identifying fully duplicated rows (all fields match).`

`2025-08-07 02:28:15,689 — INFO — Found 528 fully duplicated rows.`

+--------+------------------------+----------------+----------+--------------+-------------------------------------+
|order_id|product                 |quantity_ordered|price_each|order_date    |purchase_address                     |
+--------+------------------------+----------------+----------+--------------+-------------------------------------+
|304531  |Wired Headphones        |1               |11.99     |12/27/19 00:44|306 Jackson St, Los Angeles, CA 90001|
|304531  |Wired Headphones        |1               |11.99     |12/27/19 00:44|306 Jackson St, Los Angeles, CA 90001|
|266280  |USB-C Charging Cable    |1               |11.95     |10/02/19 11:11|53 Dogwood St, Portland, OR 97035    |
|266280  |USB-C Charging Cable    |1               |11.95     |10/02/19 11:11|53 Dogwood St, Portland, OR 97035    |
|274175  |Lightning Charging Cable|1               |14.95     |10/28/19 07:26|32 River St, Boston, MA 02215        |
|274175  |Lightning Charging Cable|1               |14.95     |1

In [113]:
duplication_stats = count_fully_duplicated_groups(sales_df)
duplication_stats.show()

`2025-08-07 02:28:17,349 — INFO — Counting fully duplicated record groups.`

`2025-08-07 02:28:17,391 — INFO — Counted fully duplicated groups successfully.`

+-----+------+
|count|groups|
+-----+------+
|    2|   264|
+-----+------+



### Removing fully duplicated rows from the DataFrame, keeping one instance of each

In [114]:
sales_df = drop_fully_duplicated_rows(sales_df)

`2025-08-07 02:28:19,184 — INFO — Dropping fully duplicated rows.`

`2025-08-07 02:28:19,208 — INFO — Fully duplicated rows removed successfully.`

In [115]:
fully_duplicated_rows = get_fully_duplicated_rows(sales_df)
print(f"Number of fully duplicated rows: {fully_duplicated_rows.count()}")

`2025-08-07 02:28:19,558 — INFO — Identifying fully duplicated rows (all fields match).`

`2025-08-07 02:28:21,856 — INFO — Found 0 fully duplicated rows.`

Number of fully duplicated rows: 0
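The counts reported in this section fit together, which is a quick sanity check on the deduplication step. The calculation below uses only numbers printed in this notebook; the variable names are illustrative.

```python
rows_before = 185_950   # rows after dropping nulls
duplicate_groups = 264  # groups reported by count_fully_duplicated_groups
rows_per_group = 2      # every reported group contains exactly two identical rows

fully_duplicated_rows = duplicate_groups * rows_per_group
# One row of each group is kept, so (rows_per_group - 1) rows are removed per group.
rows_removed = duplicate_groups * (rows_per_group - 1)
rows_after = rows_before - rows_removed

print(fully_duplicated_rows, rows_removed, rows_after)  # 528 264 185686
```

The resulting 185686 rows match the count that `describe()` reports for the written Parquet data in the validation section.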


# 6 Type Conversion

### Converting `order_date` to a Timestamp

The `order_date` column is originally read as a string. We convert it into a proper `TimestampType` using `to_timestamp()` so that Spark can recognize and work with it as a datetime object. This enables accurate filtering, sorting, and usage of time-based functions.


In [116]:
# Convert 'order_date' to timestamp and overwrite it
sales_df = sales_df.withColumn("order_date", to_timestamp("order_date", "MM/dd/yy HH:mm"))
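The pattern `MM/dd/yy HH:mm` reads a two-digit month, day and year followed by a 24-hour time. As a rough cross-check outside Spark, the same string can be parsed with Python's `datetime.strptime`. The format `%m/%d/%y %H:%M` is the assumed standard-library equivalent of the Spark pattern, and the sample value comes from the preview earlier in the notebook.

```python
from datetime import datetime

raw_value = "12/30/19 00:01"  # Order Date of the first previewed row
parsed = datetime.strptime(raw_value, "%m/%d/%y %H:%M")

print(parsed)                     # 2019-12-30 00:01:00
print(parsed.year, parsed.month)  # 2019 12
```

The year and month extracted here correspond to what the `year()` and `month()` functions derive from the converted column in the feature engineering step.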


In [117]:
sales_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity_ordered: integer (nullable = true)
 |-- price_each: double (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- purchase_address: string (nullable = true)



### Previewing the DataFrame after converting `order_date` to a timestamp.

In [118]:
sales_df.show(5, truncate=False)

+--------+------------------------+----------------+----------+-------------------+----------------------------------------+
|order_id|product                 |quantity_ordered|price_each|order_date         |purchase_address                        |
+--------+------------------------+----------------+----------+-------------------+----------------------------------------+
|141238  |Wired Headphones        |1               |11.99     |2019-01-25 11:59:00|387 10th St, Austin, TX 73301           |
|141240  |27in 4K Gaming Monitor  |1               |389.99    |2019-01-26 12:16:00|979 Park St, Los Angeles, CA 90001      |
|141243  |Apple Airpods Headphones|1               |150.0     |2019-01-22 21:20:00|657 Johnson St, San Francisco, CA 94016 |
|141264  |Apple Airpods Headphones|1               |150.0     |2019-01-03 09:46:00|937 Highland St, New York City, NY 10001|
|141274  |USB-C Charging Cable    |1               |11.95     |2019-01-17 11:30:00|8 Jackson St, Los Angeles, CA 90001     |


# 7 Feature Engineering

### Adding Year and Month Columns for Partitioning

In [119]:
sales_df = sales_df.withColumn("order_year", year("order_date")).withColumn("order_month", month("order_date"))

In [120]:
sales_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity_ordered: integer (nullable = true)
 |-- price_each: double (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- purchase_address: string (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)



In [121]:
sales_df.show(5, truncate=False)

+--------+------------------------+----------------+----------+-------------------+----------------------------------------+----------+-----------+
|order_id|product                 |quantity_ordered|price_each|order_date         |purchase_address                        |order_year|order_month|
+--------+------------------------+----------------+----------+-------------------+----------------------------------------+----------+-----------+
|141238  |Wired Headphones        |1               |11.99     |2019-01-25 11:59:00|387 10th St, Austin, TX 73301           |2019      |1          |
|141240  |27in 4K Gaming Monitor  |1               |389.99    |2019-01-26 12:16:00|979 Park St, Los Angeles, CA 90001      |2019      |1          |
|141243  |Apple Airpods Headphones|1               |150.0     |2019-01-22 21:20:00|657 Johnson St, San Francisco, CA 94016 |2019      |1          |
|141264  |Apple Airpods Headphones|1               |150.0     |2019-01-03 09:46:00|937 Highland St, New York Cit

# 8 Ingestion Time Tracking

### Why adding an `ingestion_date` column is necessary

- Tracks when each record was loaded, useful for data lineage and monitoring freshness.
- Supports rollback or reprocessing by identifying data from specific ingestion windows.
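As a small illustration of the second point, records from one pipeline run can be selected by comparing their ingestion timestamp against a time window. The sketch below uses plain Python dictionaries rather than Spark. The helper `in_window` and the window bounds are hypothetical; the order IDs and the timestamp are taken from the preview output in this section.

```python
from datetime import datetime, timedelta

ingested_at = datetime(2025, 8, 7, 2, 28, 26, 85683)  # value shown in the preview
records = [
    {"order_id": 141238, "ingestion_date": ingested_at},
    {"order_id": 141240, "ingestion_date": ingested_at},
]

def in_window(record, start, end):
    """True when the record was ingested in the half-open interval [start, end)."""
    return start <= record["ingestion_date"] < end

# Hypothetical windows: the day of this run and the day after.
run_day = datetime(2025, 8, 7)
one_day = timedelta(days=1)
this_run = [r["order_id"] for r in records
            if in_window(r, run_day, run_day + one_day)]
next_day = [r["order_id"] for r in records
            if in_window(r, run_day + one_day, run_day + 2 * one_day)]

print(this_run)  # [141238, 141240]
print(next_day)  # []
```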


In [122]:
sales_df = sales_df.withColumn("ingestion_date", current_timestamp())

### Previewing the DataFrame after adding the `ingestion_date` column

In [123]:
sales_df.show(5, truncate=False)

+--------+------------------------+----------------+----------+-------------------+----------------------------------------+----------+-----------+--------------------------+
|order_id|product                 |quantity_ordered|price_each|order_date         |purchase_address                        |order_year|order_month|ingestion_date            |
+--------+------------------------+----------------+----------+-------------------+----------------------------------------+----------+-----------+--------------------------+
|141238  |Wired Headphones        |1               |11.99     |2019-01-25 11:59:00|387 10th St, Austin, TX 73301           |2019      |1          |2025-08-07 02:28:26.085683|
|141240  |27in 4K Gaming Monitor  |1               |389.99    |2019-01-26 12:16:00|979 Park St, Los Angeles, CA 90001      |2019      |1          |2025-08-07 02:28:26.085683|
|141243  |Apple Airpods Headphones|1               |150.0     |2019-01-22 21:20:00|657 Johnson St, San Francisco, CA 94016 |2

# 9 Output

### Saving Data in Parquet Format with Partitioning

I save the cleansed dataset in **Parquet format** because it offers efficient, columnar storage with built-in compression and schema support. This format is ideal for analytics and large-scale processing.

Additionally, I partition the data by `order_year` and `order_month` to enable faster querying and better organization in the data lake. Partitioning improves performance by allowing Spark to read only the partitions that a query's year and month filters select.


In [124]:
output_path = "../data/cleansed"
sales_df.write.partitionBy("order_year", "order_month").mode("overwrite").parquet(output_path)
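`partitionBy` writes one directory level per partition column, named `column=value`. The helper below is a hypothetical illustration of how the directory for a given year and month is formed under `output_path`. The `part-*.parquet` file names inside each directory are generated by Spark and are not reproduced here.

```python
import posixpath

def partition_dir(output_path, order_year, order_month):
    """Build the Hive-style partition directory for one (year, month) pair."""
    return posixpath.join(output_path,
                          f"order_year={order_year}",
                          f"order_month={order_month}")

print(partition_dir("../data/cleansed", 2019, 12))
# ../data/cleansed/order_year=2019/order_month=12
```

Because the partition values live in the directory names rather than in the data files, Spark appends `order_year` and `order_month` as the last columns when the dataset is read back, which is why they follow `ingestion_date` in the validation output.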

# 10 Validation

### Validating the Output

In [125]:
output_path = "../data/cleansed"
verified_df = read_and_preview_parquet(output_path, spark)

`2025-08-07 02:28:29,271 — INFO — Reading parquet files from: ../data/cleansed`

+--------+--------------------+----------------+----------+-------------------+--------------------+--------------------+----------+-----------+
|order_id|             product|quantity_ordered|price_each|         order_date|    purchase_address|      ingestion_date|order_year|order_month|
+--------+--------------------+----------------+----------+-------------------+--------------------+--------------------+----------+-----------+
|  284978|    Wired Headphones|               1|     11.99|2019-12-01 00:09:00|361 Walnut St, Da...|2025-08-07 02:28:...|      2019|         12|
|  288824|27in 4K Gaming Mo...|               1|    389.99|2019-12-01 00:16:00|865 Jackson St, A...|2025-08-07 02:28:...|      2019|         12|
|  292269|    Wired Headphones|               1|     11.99|2019-12-01 01:30:00|629 13th St, New ...|2025-08-07 02:28:...|      2019|         12|
|  294629|    Wired Headphones|               1|     11.99|2019-12-01 01:54:00|668 Ridge St, San...|2025-08-07 02:28:...|      201

`2025-08-07 02:28:29,587 — INFO — Preview of parquet data displayed successfully.`

In [126]:
verified_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity_ordered: integer (nullable = true)
 |-- price_each: double (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- purchase_address: string (nullable = true)
 |-- ingestion_date: timestamp (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)



In [127]:
verified_df.describe().show()
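# NULL mean/stddev values appear for string columns because those statistics are undefined for text; the nulls are not in the data.
# Timestamp columns (order_date, ingestion_date) are omitted because describe() only summarizes numeric and string columns.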

+-------+------------------+------------+------------------+------------------+--------------------+--------------------+-----------------+
|summary|          order_id|     product|  quantity_ordered|        price_each|    purchase_address|          order_year|      order_month|
+-------+------------------+------------+------------------+------------------+--------------------+--------------------+-----------------+
|  count|            185686|      185686|            185686|            185686|              185686|              185686|           185686|
|   mean|230411.37622653297|        NULL|1.1245435843305365|184.51925546358981|                NULL|  2019.0001831048114|7.058652779423327|
| stddev| 51511.71718332067|        NULL|0.4430687383832869| 332.8438383900524|                NULL|0.013530420167169325|3.502932282404282|
|    min|            141234|20in Monitor|                 1|              2.99|1 11th St, Atlant...|                2019|                1|
|    max|           

In [128]:
show_min_max_dates_per_partition(verified_df)

`2025-08-07 02:28:31,329 — INFO — Displaying min/max dates for each (order_year, order_month) partition.`

+----------+-----------+-------------------+-------------------+
|order_year|order_month|min_date           |max_date           |
+----------+-----------+-------------------+-------------------+
|2019      |1          |2019-01-01 03:07:00|2019-01-31 23:35:00|
|2019      |2          |2019-02-01 00:33:00|2019-02-28 23:59:00|
|2019      |3          |2019-03-01 00:29:00|2019-03-31 23:46:00|
|2019      |4          |2019-04-01 00:00:00|2019-04-30 23:49:00|
|2019      |5          |2019-05-01 00:03:00|2019-05-31 23:49:00|
|2019      |6          |2019-06-01 00:18:00|2019-06-30 23:56:00|
|2019      |7          |2019-07-01 00:31:00|2019-07-31 23:43:00|
|2019      |8          |2019-08-01 00:11:00|2019-08-31 23:57:00|
|2019      |9          |2019-09-01 00:25:00|2019-09-30 23:59:00|
|2019      |10         |2019-10-01 00:09:00|2019-10-31 23:51:00|
|2019      |11         |2019-11-01 00:00:00|2019-11-30 23:56:00|
|2019      |12         |2019-12-01 00:02:00|2019-12-31 23:53:00|
|2020      |1          |2

`2025-08-07 02:28:31,810 — INFO — Min and max dates per partition displayed successfully.`
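The table above can also be checked programmatically: within each partition, both the minimum and maximum `order_date` should fall in the year and month named by the partition. The sketch below applies that rule in plain Python to three rows copied from the output. `partition_is_consistent` is a hypothetical helper, not part of `scripts.util`.

```python
from datetime import datetime

# (order_year, order_month, min_date, max_date) copied from the output above.
partition_stats = [
    (2019, 1, "2019-01-01 03:07:00", "2019-01-31 23:35:00"),
    (2019, 2, "2019-02-01 00:33:00", "2019-02-28 23:59:00"),
    (2019, 12, "2019-12-01 00:02:00", "2019-12-31 23:53:00"),
]

def partition_is_consistent(order_year, order_month, min_date, max_date):
    """Check that both boundary dates belong to the partition's year and month."""
    for text in (min_date, max_date):
        ts = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
        if (ts.year, ts.month) != (order_year, order_month):
            return False
    return True

print(all(partition_is_consistent(*row) for row in partition_stats))  # True
```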