# Cleaning an Orders Dataset with PySpark

![ecommerce_analytics-1224x532](ecommerce_analytics-1224x532.jpg)


As a Data Engineer at Voltmart, an electronics e-commerce company, you have been asked by a peer Machine Learning team to clean a dataset of orders placed last year. The team plans to use the cleaned data to build a demand forecasting model, and has shared its requirements for the format of the output table.

An analyst has shared a Parquet file called `orders_data.parquet` for you to clean and preprocess.

You can see the dataset schema below along with the **cleaning requirements**:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5am-12pm, "afternoon" for orders placed 12pm-6pm, and "evening" for orders placed 6pm-12am_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_id` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

# Initialize Spark Session

In [None]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

# Read data source

In [None]:
orders_data = spark.read.parquet('orders_data.parquet')
# Preview only the first rows; limit() avoids converting the whole dataset to pandas
orders_data.limit(5).toPandas()

# Remove unnecessary order data

In [None]:
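# Drop night orders: 12am is hour 0, so the removed window covers hours 0-4
# (5am orders are kept, since "morning" starts at 5am inclusive).
# Also drop TV products, which the company no longer sells.
orders_data = (
    orders_data
    .filter("HOUR(order_date) NOT BETWEEN 0 AND 4")
    .filter("product NOT LIKE '%TV%'")
)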
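
The hour boundary is easy to get wrong because 12am corresponds to hour 0, so the night window in the requirements covers hours 0 through 4. The following is a minimal plain-Python sketch of that boundary, useful as a reference when checking the Spark filter. It assumes that 5:00am orders are kept (since "morning" starts at 5am inclusive); the helper name `is_removed_night_order` and the timestamps are hypothetical and not part of the dataset.

```python
from datetime import datetime


def is_removed_night_order(order_ts: datetime) -> bool:
    """Return True for orders placed from 12:00am up to, but not including, 5:00am."""
    # 12am is hour 0, so the night window covers hours 0, 1, 2, 3 and 4.
    return 0 <= order_ts.hour <= 4


# Made-up timestamps, for illustration only
samples = [
    datetime(2023, 1, 1, 0, 30),   # 12:30am, removed
    datetime(2023, 1, 1, 4, 59),   # 4:59am, removed
    datetime(2023, 1, 1, 5, 0),    # 5:00am, kept (start of "morning")
    datetime(2023, 1, 1, 23, 59),  # 11:59pm, kept
]

kept = [ts for ts in samples if not is_removed_night_order(ts)]
print([ts.strftime("%H:%M") for ts in kept])
```

Only the 5:00am and 11:59pm samples survive the filter in this sketch.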

# Transform order_date and time_of_day

In [None]:
# Add time_of_day from the order hour, then convert order_date from timestamp to date.
# time_of_day must be derived first, while order_date still carries the time component.
order_hour = F.hour(F.col("order_date"))

orders_data = (
    orders_data
    .withColumn(
        "time_of_day",
        F.when((order_hour >= 5) & (order_hour < 12), "morning")
        .when((order_hour >= 12) & (order_hour < 18), "afternoon")
        .otherwise("evening"),
    )
    .withColumn("order_date", F.col("order_date").cast("date"))
)

# Preview only the first rows; limit() avoids converting the whole dataset to pandas
orders_data.limit(5).toPandas()
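
The bucketing rules from the requirements table (lower bound inclusive, upper bound exclusive) can be written out as a small plain-Python reference function. This is a sketch for cross-checking the Spark expression, not part of the pipeline; the function name `time_of_day_label` is hypothetical. Unlike a catch-all `otherwise("evening")`, it returns `None` for hours 0-4, so any night orders that slipped through the earlier filter would stand out instead of being labelled "evening".

```python
from typing import Optional


def time_of_day_label(hour: int) -> Optional[str]:
    """Map an hour of the day (0-23) to the time_of_day label from the requirements."""
    if 5 <= hour < 12:
        return "morning"
    if 12 <= hour < 18:
        return "afternoon"
    if 18 <= hour < 24:
        return "evening"
    # Hours 0-4 belong to orders that should already have been removed.
    return None


# Check the labels at each bucket boundary
for hour in (5, 11, 12, 17, 18, 23):
    print(hour, time_of_day_label(hour))
```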

In [None]:
# Inspect a few addresses without collecting the whole column to the driver
orders_data.select("purchase_address").show(5, truncate=False)

# Transform string columns

In [None]:
orders_data = (
    orders_data
    .withColumn("product", F.lower(F.col("product")))
    .withColumn("category", F.lower(F.col("category")))
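    # Assumes the documented "House Street, City, State Zipcode" layout with a 5-digit zipcode:
    # capture the two capital letters between a comma and the zipcode, so that
    # two-letter tokens earlier in the address (e.g. "NW") are not mistaken for the state
    .withColumn("purchase_state", F.regexp_extract(F.col("purchase_address"), r",\s*([A-Z]{2})\s+\d{5}", 1))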
)
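
Extracting the state depends on the address following the "House Street, City, State Zipcode" layout. The sketch below uses Python's `re` module with made-up addresses to show why the pattern matters: a bare "first two capital letters" pattern can pick up a street direction such as "NW", while a pattern anchored on the comma and the zipcode picks the state. The 5-digit zipcode, the helper name `extract_state`, and the sample addresses are assumptions for illustration; the empty-string fallback mirrors what `regexp_extract` returns when nothing matches.

```python
import re

# Naive pattern: the first standalone pair of capital letters anywhere in the address
NAIVE_PATTERN = re.compile(r"\b[A-Z]{2}\b")
# Anchored pattern: two capital letters after a comma, followed by a 5-digit zipcode
STATE_PATTERN = re.compile(r",\s*([A-Z]{2})\s+\d{5}")


def extract_state(address: str) -> str:
    """Return the two-letter state code, or an empty string if the layout does not match."""
    match = STATE_PATTERN.search(address)
    return match.group(1) if match else ""


# Hypothetical addresses, for illustration only
plain_address = "917 1st St, Dallas, TX 75001"
tricky_address = "12 NW Elm St, Portland, OR 97035"

print(extract_state(plain_address))                 # TX
print(NAIVE_PATTERN.search(tricky_address).group())  # NW (not the state)
print(extract_state(tricky_address))                # OR
```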

# Reorder columns

In [None]:
order_columns = [
    "order_date",
    "time_of_day",
    "order_id",
    "product",
    "product_id",
    "category",
    "purchase_address",
    "purchase_state",
    "quantity_ordered",
    "price_each",
    "cost_price",
    "turnover",
    "margin"
]
orders_data = orders_data.select(order_columns)

# Write cleaned data

In [None]:
orders_data.write.parquet("orders_data_clean", mode="overwrite")

# Stop Spark session

In [None]:
spark.stop()