![ecommerce_analytics-1224x532](ecommerce_analytics-1224x532.jpg)


As a Data Engineer at an electronics e-commerce company, Voltmart, you have been requested by a peer Machine Learning team to clean the data containing the information about orders made last year. They are planning to further use this cleaned data to build a demand forecasting model. To achieve this, they have shared their requirements regarding the desired output table format.

An analyst shared a parquet file called `"orders_data.parquet"` for you to clean and preprocess. 

You can see the dataset schema below along with the **cleaning requirements**:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [98]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)
from pyspark.sql.functions import col, hour, when, date_format, split, size
spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

In [99]:
# 1. Reading the data

orders_data = spark.read.parquet('orders_data.parquet')

In [100]:
# 2. Dealing with the order date column

orders_data = orders_data.withColumn(
    'time_of_day',
    when((hour('order_date') >= 5) & (hour('order_date') < 12), 'morning')
    .when((hour('order_date') >= 12) & (hour('order_date') < 18), 'afternoon')
    .when((hour('order_date') >= 18) & (hour('order_date') < 24), 'evening')
    .otherwise('night')
)

# Filter out the orders made at night time

orders_data = orders_data.filter(col('time_of_day') != 'night')

# Convert the order date column to date format

orders_data = orders_data.withColumn('order_date', col('order_date').cast(types.DateType()))

orders_data.show()


+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+
|order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|        cost_price|turnover|  margin|time_of_day|
+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+
|2023-01-22|  141234|              iPhone|5.638008983335E12|   Vêtements|944 Walnut St, Bo...|               1|     700.0|             231.0|   700.0|   469.0|    evening|
|2023-01-28|  141235|Lightning Chargin...|5.563319511488E12|Alimentation|185 Maple St, Por...|               1|     14.95|             7.475|   14.95|   7.475|  afternoon|
|2023-01-17|  141236|    Wired Headphones| 2.11397339522E12|   Vêtements|538 Adams St, San...|               2|     11.99|             5.995

In [101]:
# 3. Cleaning the product info columns
# Make the strings lowercase

orders_data = orders_data.withColumn('product', F.lower(col('product')))
orders_data = orders_data.withColumn('category', F.lower(col('category')))

# Remove TV orders
orders_data = orders_data.where(~orders_data['product'].contains('tv'))

In [102]:
# 4. Extracting state from address and getting the number of states

orders_data = orders_data.withColumn(
    "purchase_state",
    split(col("purchase_address"), " ").getItem(size(split(col("purchase_address"), " ")) - 2)
)
orders_data.show()

+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+--------------+
|order_date|order_id|             product|       product_id|    category|    purchase_address|quantity_ordered|price_each|        cost_price|turnover|  margin|time_of_day|purchase_state|
+----------+--------+--------------------+-----------------+------------+--------------------+----------------+----------+------------------+--------+--------+-----------+--------------+
|2023-01-22|  141234|              iphone|5.638008983335E12|   vêtements|944 Walnut St, Bo...|               1|     700.0|             231.0|   700.0|   469.0|    evening|            MA|
|2023-01-28|  141235|lightning chargin...|5.563319511488E12|alimentation|185 Maple St, Por...|               1|     14.95|             7.475|   14.95|   7.475|  afternoon|            OR|
|2023-01-17|  141236|    wired headphones| 2.11397339522E12|   vê

In [103]:
# Count the number of unique states and save it as n_states
n_states = orders_data.select('purchase_state')
n_states = n_states.distinct().count()
print(n_states)

8


In [104]:
# 5. Prepare and export the resulting table

orders_data.write.parquet("orders_data_clean.parquet", mode='overwrite')

                                                                                