![ecommerce_analytics-1224x532](ecommerce_analytics-1224x532.jpg)


As a Data Engineer at an electronics e-commerce company, Voltmart, you have been requested by a peer Machine Learning team to clean the data containing the information about orders made last year. They are planning to further use this cleaned data to build a demand forecasting model. To achieve this, they have shared their requirements regarding the desired output table format.

An analyst shared a parquet file called `"orders_data.parquet"` for you to clean and preprocess. 

You can see the dataset schema below along with the **cleaning requirements**:

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [30]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

In [31]:
orders_data = spark.read.parquet('orders_data.parquet')
orders_data.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


# Modifications for order_date

## Remove orders placed between 12am and 5am (inclusive)

In [32]:
# Start here, using as many cells as you require
from pyspark.sql.functions import hour

# extract hour from the order_date column
df_with_hour = orders_data.withColumn("hour", hour(orders_data["order_date"]))
df_with_hour.toPandas()["hour"].describe()

count    185950.000000
mean         14.413305
std           5.423416
min           0.000000
25%          11.000000
50%          15.000000
75%          19.000000
max          23.000000
Name: hour, dtype: float64

In [33]:
filtered_df = df_with_hour.filter(~(df_with_hour["hour"].between(0,5)))
# filtered_df = filtered_df.drop("hour")
filtered_df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 175441 entries, 0 to 175440
Data columns (total 12 columns):
 #   Column            Non-Null Count   Dtype         
---  ------            --------------   -----         
 0   order_date        175441 non-null  datetime64[ns]
 1   order_id          175441 non-null  int64         
 2   product           175441 non-null  object        
 3   product_id        175441 non-null  float64       
 4   category          175441 non-null  object        
 5   purchase_address  175441 non-null  object        
 6   quantity_ordered  175441 non-null  int64         
 7   price_each        175441 non-null  float64       
 8   cost_price        175441 non-null  float64       
 9   turnover          175441 non-null  float64       
 10  margin            175441 non-null  float64       
 11  hour              175441 non-null  int32         
dtypes: datetime64[ns](1), float64(5), int32(1), int64(2), object(3)
memory usage: 15.4+ MB


In [34]:
orders_data = filtered_df
orders_data.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 175441 entries, 0 to 175440
Data columns (total 12 columns):
 #   Column            Non-Null Count   Dtype         
---  ------            --------------   -----         
 0   order_date        175441 non-null  datetime64[ns]
 1   order_id          175441 non-null  int64         
 2   product           175441 non-null  object        
 3   product_id        175441 non-null  float64       
 4   category          175441 non-null  object        
 5   purchase_address  175441 non-null  object        
 6   quantity_ordered  175441 non-null  int64         
 7   price_each        175441 non-null  float64       
 8   cost_price        175441 non-null  float64       
 9   turnover          175441 non-null  float64       
 10  margin            175441 non-null  float64       
 11  hour              175441 non-null  int32         
dtypes: datetime64[ns](1), float64(5), int32(1), int64(2), object(3)
memory usage: 15.4+ MB


## Convert from timestamp to date

In [35]:
from pyspark.sql.functions import to_date

orders_data = orders_data.withColumn("order_date",to_date(orders_data["order_date"]))

schema = orders_data.schema
schema["order_date"].dataType

DateType()

# Modifications for time_of_day

## New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm

In [36]:
from pyspark.sql.functions import hour, when, col
from pyspark.sql.types import TimestampType

df_with_hour = df_with_hour.withColumn("time_of_day", 
                when((df_with_hour["hour"] >= 5) & (df_with_hour["hour"] < 12), "morning")
                .when((df_with_hour["hour"] >= 12) & (df_with_hour["hour"] < 18), "afternoon")
                .when((df_with_hour["hour"] >= 18) & (df_with_hour["hour"] < 24), "evening")
                .otherwise("night"))

df_with_hour.toPandas()["time_of_day"].value_counts()

afternoon    67158
evening      63431
morning      46173
night         9188
Name: time_of_day, dtype: int64

In [37]:
orders_data = df_with_hour

# Modifications for product

## Remove rows containing "TV" as the company has stopped selling this product

In [38]:
orders_data.toPandas()["product"].value_counts()

USB-C Charging Cable          21903
Lightning Charging Cable      21658
AAA Batteries (4-pack)        20641
AA Batteries (4-pack)         20577
Wired Headphones              18882
Apple Airpods Headphones      15549
Bose SoundSport Headphones    13325
27in FHD Monitor               7507
iPhone                         6842
27in 4K Gaming Monitor         6230
34in Ultrawide Monitor         6181
Google Phone                   5525
Flatscreen TV                  4800
Macbook Pro Laptop             4724
ThinkPad Laptop                4128
20in Monitor                   4101
Vareebadd Phone                2065
LG Washing Machine              666
LG Dryer                        646
Name: product, dtype: int64

In [39]:
mask = orders_data["product"] == "Flatscreen TV"
tv_data = orders_data.filter(mask)
tv_data.toPandas().head(n=20)

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,hour,time_of_day
0,2023-01-03 21:54:00,141248,Flatscreen TV,4062756000000.0,Électronique,"363 Spruce St, Austin, TX 73301",1,300.0,99.0,300.0,201.0,21,evening
1,2023-01-02 16:16:00,141283,Flatscreen TV,8633300000000.0,Électronique,"68 Hickory St, Seattle, WA 98101",1,300.0,99.0,300.0,201.0,16,afternoon
2,2023-01-09 18:32:00,141331,Flatscreen TV,7572901000000.0,Électronique,"299 Park St, San Francisco, CA 94016",1,300.0,99.0,300.0,201.0,18,evening
3,2023-01-17 22:34:00,141366,Flatscreen TV,9422250000000.0,Sports,"803 Church St, Seattle, WA 98101",1,300.0,99.0,300.0,201.0,22,evening
4,2023-01-24 13:18:00,141421,Flatscreen TV,1216526000000.0,Vêtements,"154 7th St, Dallas, TX 75001",1,300.0,99.0,300.0,201.0,13,afternoon
5,2023-01-09 20:49:00,141439,Flatscreen TV,1920016000000.0,Vêtements,"536 Walnut St, Boston, MA 02215",1,300.0,99.0,300.0,201.0,20,evening
6,2023-01-07 15:21:00,141468,Flatscreen TV,7292713000000.0,Sports,"268 8th St, Austin, TX 73301",1,300.0,99.0,300.0,201.0,15,afternoon
7,2023-01-02 07:44:00,141469,Flatscreen TV,5355009000000.0,Alimentation,"163 8th St, San Francisco, CA 94016",1,300.0,99.0,300.0,201.0,7,morning
8,2023-01-21 15:57:00,141483,Flatscreen TV,8762538000000.0,Électronique,"216 Willow St, Austin, TX 73301",1,300.0,99.0,300.0,201.0,15,afternoon
9,2023-01-15 18:03:00,141531,Flatscreen TV,1146695000000.0,Alimentation,"252 Sunset St, New York City, NY 10001",1,300.0,99.0,300.0,201.0,18,evening


In [40]:
print(orders_data.toPandas().shape)
orders_data = orders_data.filter(~(mask))
print(orders_data.toPandas().shape)

(185950, 13)
(181150, 13)


## Ensure all values are lowercase

In [41]:
from pyspark.sql.functions import lower

orders_data = orders_data.withColumn("product", lower(orders_data["product"]))

In [42]:
orders_data.toPandas().head(n=20)

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,hour,time_of_day
0,2023-01-22 21:25:00,141234,iphone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,21,evening
1,2023-01-28 14:15:00,141235,lightning charging cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,14,afternoon
2,2023-01-17 13:33:00,141236,wired headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,13,afternoon
3,2023-01-05 20:33:00,141237,27in fhd monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,20,evening
4,2023-01-25 11:59:00,141238,wired headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,11,morning
5,2023-01-29 20:22:00,141239,aaa batteries (4-pack),2953869000000.0,Alimentation,"775 Willow St, San Francisco, CA 94016",1,2.99,1.495,2.99,1.495,20,evening
6,2023-01-26 12:16:00,141240,27in 4k gaming monitor,5173671000000.0,Vêtements,"979 Park St, Los Angeles, CA 90001",1,389.99,128.6967,389.99,261.2933,12,afternoon
7,2023-01-05 12:04:00,141241,usb-c charging cable,8051737000000.0,Vêtements,"181 6th St, San Francisco, CA 94016",1,11.95,5.975,11.95,5.975,12,afternoon
8,2023-01-01 10:30:00,141242,bose soundsport headphones,1508418000000.0,Électronique,"867 Willow St, Los Angeles, CA 90001",1,99.99,49.995,99.99,49.995,10,morning
9,2023-01-22 21:20:00,141243,apple airpods headphones,1386344000000.0,Électronique,"657 Johnson St, San Francisco, CA 94016",1,150.0,97.5,150.0,52.5,21,evening


# Modifications for category

## Ensure all values are lowercase

In [43]:
orders_data = orders_data.withColumn("category", lower(orders_data["category"]))

In [44]:
orders_data.toPandas().head(n=20)

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,hour,time_of_day
0,2023-01-22 21:25:00,141234,iphone,5638009000000.0,vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,21,evening
1,2023-01-28 14:15:00,141235,lightning charging cable,5563320000000.0,alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,14,afternoon
2,2023-01-17 13:33:00,141236,wired headphones,2113973000000.0,vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,13,afternoon
3,2023-01-05 20:33:00,141237,27in fhd monitor,3069157000000.0,sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,20,evening
4,2023-01-25 11:59:00,141238,wired headphones,9692681000000.0,électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,11,morning
5,2023-01-29 20:22:00,141239,aaa batteries (4-pack),2953869000000.0,alimentation,"775 Willow St, San Francisco, CA 94016",1,2.99,1.495,2.99,1.495,20,evening
6,2023-01-26 12:16:00,141240,27in 4k gaming monitor,5173671000000.0,vêtements,"979 Park St, Los Angeles, CA 90001",1,389.99,128.6967,389.99,261.2933,12,afternoon
7,2023-01-05 12:04:00,141241,usb-c charging cable,8051737000000.0,vêtements,"181 6th St, San Francisco, CA 94016",1,11.95,5.975,11.95,5.975,12,afternoon
8,2023-01-01 10:30:00,141242,bose soundsport headphones,1508418000000.0,électronique,"867 Willow St, Los Angeles, CA 90001",1,99.99,49.995,99.99,49.995,10,morning
9,2023-01-22 21:20:00,141243,apple airpods headphones,1386344000000.0,électronique,"657 Johnson St, San Francisco, CA 94016",1,150.0,97.5,150.0,52.5,21,evening


# Modifications for purchase_state

## New column containing: the State that the purchase was ordered from

In [50]:
from pyspark.sql.functions import split, trim

split_col = split(orders_data['purchase_address'], ',\s*')

state_col = split_col.getItem(2)

state_abbreviation = split(trim(state_col), '\s+').getItem(0)

orders_data = orders_data.withColumn("purchase_state", state_abbreviation)

orders_data.toPandas().head(n=20)

                                                                                

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,time_of_day,state,purchase_state
0,2023-01-22 21:25:00,141234,iphone,5638009000000.0,vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,evening,MA,MA
1,2023-01-28 14:15:00,141235,lightning charging cable,5563320000000.0,alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,afternoon,OR,OR
2,2023-01-17 13:33:00,141236,wired headphones,2113973000000.0,vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,afternoon,CA,CA
3,2023-01-05 20:33:00,141237,27in fhd monitor,3069157000000.0,sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,evening,CA,CA
4,2023-01-25 11:59:00,141238,wired headphones,9692681000000.0,électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,morning,TX,TX
5,2023-01-29 20:22:00,141239,aaa batteries (4-pack),2953869000000.0,alimentation,"775 Willow St, San Francisco, CA 94016",1,2.99,1.495,2.99,1.495,evening,CA,CA
6,2023-01-26 12:16:00,141240,27in 4k gaming monitor,5173671000000.0,vêtements,"979 Park St, Los Angeles, CA 90001",1,389.99,128.6967,389.99,261.2933,afternoon,CA,CA
7,2023-01-05 12:04:00,141241,usb-c charging cable,8051737000000.0,vêtements,"181 6th St, San Francisco, CA 94016",1,11.95,5.975,11.95,5.975,afternoon,CA,CA
8,2023-01-01 10:30:00,141242,bose soundsport headphones,1508418000000.0,électronique,"867 Willow St, Los Angeles, CA 90001",1,99.99,49.995,99.99,49.995,morning,CA,CA
9,2023-01-22 21:20:00,141243,apple airpods headphones,1386344000000.0,électronique,"657 Johnson St, San Francisco, CA 94016",1,150.0,97.5,150.0,52.5,evening,CA,CA


In [47]:
orders_data = orders_data.drop("hour")

In [51]:
orders_data = orders_data.drop("state")

In [52]:
orders_data.toPandas().head(n=20)

                                                                                

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,time_of_day,purchase_state
0,2023-01-22 21:25:00,141234,iphone,5638009000000.0,vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,evening,MA
1,2023-01-28 14:15:00,141235,lightning charging cable,5563320000000.0,alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,afternoon,OR
2,2023-01-17 13:33:00,141236,wired headphones,2113973000000.0,vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,afternoon,CA
3,2023-01-05 20:33:00,141237,27in fhd monitor,3069157000000.0,sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,evening,CA
4,2023-01-25 11:59:00,141238,wired headphones,9692681000000.0,électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,morning,TX
5,2023-01-29 20:22:00,141239,aaa batteries (4-pack),2953869000000.0,alimentation,"775 Willow St, San Francisco, CA 94016",1,2.99,1.495,2.99,1.495,evening,CA
6,2023-01-26 12:16:00,141240,27in 4k gaming monitor,5173671000000.0,vêtements,"979 Park St, Los Angeles, CA 90001",1,389.99,128.6967,389.99,261.2933,afternoon,CA
7,2023-01-05 12:04:00,141241,usb-c charging cable,8051737000000.0,vêtements,"181 6th St, San Francisco, CA 94016",1,11.95,5.975,11.95,5.975,afternoon,CA
8,2023-01-01 10:30:00,141242,bose soundsport headphones,1508418000000.0,électronique,"867 Willow St, Los Angeles, CA 90001",1,99.99,49.995,99.99,49.995,morning,CA
9,2023-01-22 21:20:00,141243,apple airpods headphones,1386344000000.0,électronique,"657 Johnson St, San Francisco, CA 94016",1,150.0,97.5,150.0,52.5,evening,CA


In [None]:
# Write the DataFrame to a Parquet file, partitioned by "order_date"
orders_data.write.partitionBy("order_date").parquet("orders_data_clean.parquet")

