![ecommerce_analytics-1224x532](ecommerce_analytics-1224x532.jpg)


As a Data Engineer at Voltmart, an electronics e-commerce company, you have been asked by a peer Machine Learning team to clean a dataset of orders placed last year. They plan to use the cleaned data to build a demand forecasting model, and they have shared their requirements for the format of the output table.

An analyst shared a parquet file called `"orders_data.parquet"` for you to clean and preprocess. 

You can see the dataset schema below along with the **cleaning requirements**:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5am-12pm, "afternoon" for orders placed 12pm-6pm, and "evening" for orders placed 6pm-12am_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [283]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

In [284]:
orders_data = spark.read.parquet('orders_data.parquet')
orders_data.toPandas().head()

| | order_date | order_id | product | product_id | category | purchase_address | quantity_ordered | price_each | cost_price | turnover | margin |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2023-01-22 21:25:00 | 141234 | iPhone | 5638009000000.0 | Vêtements | 944 Walnut St, Boston, MA 02215 | 1 | 700.0 | 231.0 | 700.0 | 469.0 |
| 1 | 2023-01-28 14:15:00 | 141235 | Lightning Charging Cable | 5563320000000.0 | Alimentation | 185 Maple St, Portland, OR 97035 | 1 | 14.95 | 7.475 | 14.95 | 7.475 |
| 2 | 2023-01-17 13:33:00 | 141236 | Wired Headphones | 2113973000000.0 | Vêtements | 538 Adams St, San Francisco, CA 94016 | 2 | 11.99 | 5.995 | 23.98 | 11.99 |
| 3 | 2023-01-05 20:33:00 | 141237 | 27in FHD Monitor | 3069157000000.0 | Sports | 738 10th St, Los Angeles, CA 90001 | 1 | 149.99 | 97.4935 | 149.99 | 52.4965 |
| 4 | 2023-01-25 11:59:00 | 141238 | Wired Headphones | 9692681000000.0 | Électronique | 387 10th St, Austin, TX 73301 | 1 | 11.99 | 5.995 | 11.99 | 5.995 |


In [285]:
# Filter out orders placed between 12am and 5am (inclusive):
# keeping hour > 5 drops every order whose hour is 0 through 5
orders = orders_data.filter(
    F.hour("order_date") > 5
)
# Creating a conditional column (string) time_of_day depending on the order_date (timestamp) column
orders = orders.withColumn(
    "time_of_day",
    F.when((F.hour("order_date") >= 5) & (F.hour("order_date") < 12), "morning")
     .when((F.hour("order_date") >= 12) & (F.hour("order_date") < 18), "afternoon")
     .otherwise("evening")
)
orders.select(F.hour("order_date"), "time_of_day").show(5)


+----------------+-----------+
|hour(order_date)|time_of_day|
+----------------+-----------+
|              21|    evening|
|              14|  afternoon|
|              13|  afternoon|
|              20|    evening|
|              11|    morning|
+----------------+-----------+
only showing top 5 rows
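
The boundary behaviour of the filter and the `time_of_day` rules is easy to get wrong, so here is a minimal plain-Python sketch (not Spark code) that mirrors the same hour comparisons. The helper name `classify_hour` is hypothetical and exists only for illustration; it assumes the `hour > 5` filter and the `when`/`otherwise` chain exactly as written in the cell above.

```python
from typing import Optional


def classify_hour(hour: int) -> Optional[str]:
    """Mirror the notebook's filter and time_of_day rules for one hour (0-23).

    Returns None for rows that the filter F.hour("order_date") > 5 would drop.
    """
    if not 0 <= hour <= 23:
        raise ValueError(f"hour must be in 0..23, got {hour}")
    if hour <= 5:      # fails `hour > 5`, so hours 0-5 are removed
        return None
    if hour < 12:      # hours 6-11 (hour 5 never reaches this branch)
        return "morning"
    if hour < 18:      # hours 12-17
        return "afternoon"
    return "evening"   # hours 18-23, the .otherwise(...) branch


# Print the label for each boundary hour
for h in (0, 5, 6, 11, 12, 17, 18, 23):
    print(h, classify_hour(h))
```

Under this reading, an order placed at 05:30 is removed even though the `time_of_day` rule lists 5 as the start of "morning". If the requirement is meant to keep such orders, the filter would need `>= 5` instead; the source does not say which reading is intended.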



In [286]:
# Changing the data type of the order_date column from timestamp to date
orders = orders.withColumn("order_date", F.to_date("order_date"))
orders.select("order_date").show(5)
print(orders.dtypes)

+----------+
|order_date|
+----------+
|2023-01-22|
|2023-01-28|
|2023-01-17|
|2023-01-05|
|2023-01-25|
+----------+
only showing top 5 rows

[('order_date', 'date'), ('order_id', 'bigint'), ('product', 'string'), ('product_id', 'double'), ('category', 'string'), ('purchase_address', 'string'), ('quantity_ordered', 'bigint'), ('price_each', 'double'), ('cost_price', 'double'), ('turnover', 'double'), ('margin', 'double'), ('time_of_day', 'string')]


In [287]:
# Ensure all values are lowercase in the product column
# Remove rows that contain "TV" in the product column
orders = orders.withColumn("product", F.lower(F.col("product")))
orders = orders.filter(~F.col("product").contains("tv")) # As now all values are lowercase
orders.select("product").show(5)

+--------------------+
|             product|
+--------------------+
|              iphone|
|lightning chargin...|
|    wired headphones|
|    27in fhd monitor|
|    wired headphones|
+--------------------+
only showing top 5 rows
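
Lowercasing first and then calling `contains("tv")` is a substring match, so it would also drop any product whose name merely contains the letters "tv". The plain-Python sketch below shows the difference between a substring match and a whole-word match. The product names are made up for illustration (they are not taken from `orders_data.parquet`), and `mentions_tv` is a hypothetical helper.

```python
import re

# Hypothetical product names for illustration only; not from the dataset.
products = ["Flatscreen TV", "iPhone", "TVision Remote", "Wired Headphones"]


def mentions_tv(name: str) -> bool:
    """Return True when 'tv' appears as a standalone word, ignoring case."""
    return re.search(r"\btv\b", name, flags=re.IGNORECASE) is not None


# Substring match, equivalent in spirit to lower(...).contains("tv")
substring_hits = [p for p in products if "tv" in p.lower()]

# Whole-word match, which ignores names that only embed the letters "tv"
word_hits = [p for p in products if mentions_tv(p)]

print(substring_hits)
print(word_hits)
```

Whether the stricter match is needed depends on the actual product names in the data. In Spark, `Column.rlike` accepts a regular expression and could express a similar whole-word test.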



In [288]:
# Ensure all values are lowercase in category column
orders = orders.withColumn("category", F.lower(F.col("category")))
orders.select("category").show(5)

+------------+
|    category|
+------------+
|   vêtements|
|alimentation|
|   vêtements|
|      sports|
|électronique|
+------------+
only showing top 5 rows



In [293]:
# New column containing the state that the purchase was ordered from, extracted from purchase_address
orders = orders.withColumn("purchase_state", F.split(F.split(F.col("purchase_address"), ",")[2], " ")[1])
orders.select("purchase_address", "purchase_state").show(5)

+--------------------+--------------+
|    purchase_address|purchase_state|
+--------------------+--------------+
|944 Walnut St, Bo...|            MA|
|185 Maple St, Por...|            OR|
|538 Adams St, San...|            CA|
|738 10th St, Los ...|            CA|
|387 10th St, Aust...|            TX|
+--------------------+--------------+
only showing top 5 rows
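
The nested `split` relies on every address having exactly the "House Street, City, State Zipcode" shape described in the schema. As a rough illustration, the plain-Python sketch below mirrors that positional logic next to a pattern-based alternative. The helper names `state_by_split` and `state_by_regex` are hypothetical, and the regex assumes a two-letter state code followed by a five-digit ZIP code at the end of the string.

```python
import re
from typing import Optional


def state_by_split(address: str) -> str:
    """Mirror F.split(F.split(address, ",")[2], " ")[1] from the cell above."""
    # The third comma-separated part looks like " MA 02215"; splitting it on a
    # single space gives ["", "MA", "02215"], so index 1 is the state.
    return address.split(",")[2].split(" ")[1]


def state_by_regex(address: str) -> Optional[str]:
    """Return the two-letter code before a trailing 5-digit ZIP, else None."""
    match = re.search(r",\s*([A-Z]{2})\s+\d{5}$", address)
    return match.group(1) if match else None


address = "944 Walnut St, Boston, MA 02215"  # first row shown in the notebook
print(state_by_split(address))
print(state_by_regex(address))
```

The positional version breaks if an address contains an extra comma, while the pattern version only depends on how the string ends. In Spark, `F.regexp_extract` could apply the same pattern; it returns an empty string rather than null when nothing matches.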



In [290]:
# A final look at the DataFrame
orders.show(5)

+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+-----------+--------------+
|order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|cost_price|turnover| margin|time_of_day|purchase_state|
+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+----------+--------+-------+-----------+--------------+
|2023-01-22|  141234|              iphone|5.638008983335E12|   vêtements|944 Walnut St, Bo...|               1|     700.0|     231.0|   700.0|  469.0|    evening|            MA|
|2023-01-28|  141235|lightning chargin...|5.563319511488E12|alimentation|185 Maple St, Por...|               1|     14.95|     7.475|   14.95|  7.475|  afternoon|            OR|
|2023-01-17|  141236|    wired headphones| 2.11397339522E12|   vêtements|538 Adams St, San...|               2

In [291]:
# Saving the cleaned Spark DataFrame to a parquet file
orders.write.mode("overwrite").parquet("./orders_data_clean.parquet")

                                                                                