![ecommerce_analytics-1224x532](ecommerce_analytics-1224x532.jpg)


# Data Cleaning and Preprocessing for Demand Forecasting

Welcome to this Jupyter Notebook! As Data Engineers at Voltmart, an electronics e-commerce company, we have been tasked with cleaning and preprocessing the data for orders made last year. This task is crucial as it will enable our Machine Learning team to build an accurate demand forecasting model.

In this notebook, we will walk you through the steps we took to clean and preprocess the data, ensuring it meets the requirements shared by our analysts. The dataset, provided in a parquet file named `"orders_data.parquet"`, contains various attributes related to customer orders. Below, you will find the dataset schema along with the specific cleaning requirements that need to be addressed.

Let's dive in and get started with the data cleaning process!

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5am-12pm, "afternoon" for orders placed 12pm-6pm, and "evening" for orders placed 6pm-12am_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

## Task Explanation and Required Steps

### Task
We need to clean and transform the `orders_data.parquet` file based on the given requirements.

### Steps

1. **Remove Orders Placed Between 12am and 5am**
   - Filter out rows where `order_date` is between 12:00 AM and 5:00 AM (inclusive).

2. **Convert `order_date` from Timestamp to Date**
   - Change the `order_date` column to only include the date part, removing the time.

3. **Create `time_of_day` Column**
   - Add a new column `time_of_day` based on the `order_date` time:
     - "morning" for orders placed between 5:00 AM and 12:00 PM (exclusive).
     - "afternoon" for orders placed between 12:00 PM and 6:00 PM (exclusive).
     - "evening" for orders placed between 6:00 PM and 12:00 AM (exclusive).

4. **Remove Rows Containing "TV" in `product`**
   - Filter out rows where the `product` column contains "TV".

5. **Ensure All Values in `product` and `category` are Lowercase**
   - Convert all values in the `product` and `category` columns to lowercase.

6. **Create `purchase_state` Column**
   - Extract the state from the `purchase_address` column and create a new column `purchase_state` with this value.

### Sequence
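The notebook applies the steps in the order below. `time_of_day` is derived first because it needs the time component of `order_date`, which is lost once the column is converted to a date.

1. Create the `time_of_day` column from the `order_date` timestamp.
2. Filter out orders placed between 12am and 5am.
3. Convert `order_date` from timestamp to date.
4. Convert all values in the `product` and `category` columns to lowercase.
5. Remove rows containing "TV" in the `product` column.
6. Extract the state from `purchase_address` and create the `purchase_state` column.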

This code block performs the following steps:

1. **Import necessary modules from PySpark**:
    - `SparkSession`: This is used to create a Spark session, which is the entry point to programming with Spark.
    - `types`: This module contains various data types that can be used to define the schema of a DataFrame.
    - `functions as F`: This provides a variety of functions that can be used for DataFrame operations, such as filtering, aggregating, and transforming data.

2. **Create a Spark session**:
    - `SparkSession.builder.appName('cleaning_orders_dataset_with_pyspark')`: This sets the name of the Spark application to 'cleaning_orders_dataset_with_pyspark'.
    - `.getOrCreate()`: This initializes the Spark session. If a session already exists, it returns the existing one; otherwise, it creates a new session.

In [45]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

1. **Read the Parquet file into a DataFrame**:
    - `spark.read.parquet('orders_data.parquet')`: This reads the Parquet file named 'orders_data.parquet' into a Spark DataFrame called `orders_data`.

2. **Convert the DataFrame to a Pandas DataFrame and display the first few rows**:
    - `orders_data.toPandas()`: This converts the Spark DataFrame `orders_data` into a Pandas DataFrame, which collects all rows onto the driver.
    - `.head()`: This returns the first five rows of the Pandas DataFrame for display.

In [46]:
orders_data = spark.read.parquet('orders_data.parquet')
orders_data.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


This code block performs the following steps:

1. **Create a new column `time_of_day`**:
    - The new column categorizes the time of day based on the `order_date` timestamp.
    - It uses the `hour` function to extract the hour from the `order_date` column and assigns a time of day category based on the hour.

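2. **Time of day categories** (by hour of `order_date`, both bounds inclusive):
    - **Night**: Hours 0 to 5 (12:00 AM to 5:59 AM). These rows are removed in a later cell.
    - **Morning**: Hours 6 to 11 (6:00 AM to 11:59 AM).
    - **Afternoon**: Hours 12 to 17 (12:00 PM to 5:59 PM).
    - **Evening**: Hours 18 to 23 (6:00 PM to 11:59 PM).

3. **Otherwise statement**:
    - If none of the conditions are met, the `time_of_day` column will be set to `None`. Since the four ranges cover every hour from 0 to 23, this only happens when `order_date` itself is null.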
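
The bucket boundaries are easy to get wrong by one hour, so it can help to mirror the rule in plain Python and check the edges without starting Spark. The sketch below is not part of the notebook's pipeline: the function name `classify_hour` is hypothetical, and it only reproduces the hour ranges used by the `when`/`otherwise` chain in the next cell.

```python
from datetime import datetime
from typing import Optional


def classify_hour(ts: Optional[datetime]) -> Optional[str]:
    """Plain-Python mirror of the hour ranges (hypothetical helper)."""
    if ts is None:
        # Mirrors the fall-through to None for a missing timestamp
        return None
    hour = ts.hour
    if 0 <= hour <= 5:
        return 'night'
    if 6 <= hour <= 11:
        return 'morning'
    if 12 <= hour <= 17:
        return 'afternoon'
    return 'evening'  # hours 18 through 23


# Two timestamps from the sample rows, plus one edge case
print(classify_hour(datetime(2023, 1, 25, 11, 59)))  # morning
print(classify_hour(datetime(2023, 1, 22, 21, 25)))  # evening
print(classify_hour(datetime(2023, 1, 1, 5, 59)))    # night
```

A helper like this could serve as a reference when writing unit tests for the Spark expression, since both should agree on every hour from 0 to 23.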

In [47]:
# DATA CLEANING AND PREPROCESSING
orders_data = (
    orders_data
    # Create a new column time_of_day
    .withColumn(
        'time_of_day',
        # When/otherwise (similar to case/when/else) statements extracting hour from timestamp
        F.when((F.hour('order_date') >= 0) & (F.hour('order_date') <= 5), 'night')
         .when((F.hour('order_date') >= 6) & (F.hour('order_date') <= 11), 'morning')
         .when((F.hour('order_date') >= 12) & (F.hour('order_date') <= 17), 'afternoon')
         .when((F.hour('order_date') >= 18) & (F.hour('order_date') <= 23), 'evening')
        # You can keep the otherwise statement as None to validate whether the conditions are exhaustive
         .otherwise(None)
    )
)

In [48]:
orders_data.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,time_of_day
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,evening
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,afternoon
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,afternoon
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,evening
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,morning


In [49]:
# Filter by time of day
orders_data = orders_data.filter(F.col('time_of_day') != 'night')

In [50]:
orders_data.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,time_of_day
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,evening
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,afternoon
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,afternoon
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,evening
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,morning


In this code block, we are converting the `order_date` column from a timestamp to a date. This is useful when we only need the date part of the timestamp and not the time. 

Here's a step-by-step breakdown:

1. **Select the `order_date` column**: We use `F.col('order_date')` to select the `order_date` column from the DataFrame.
2. **Cast to DateType**: We use the `.cast(types.DateType())` method to convert the `order_date` column from a timestamp to a date.
3. **Update the DataFrame**: We use the `.withColumn` method to update the `order_date` column in the `orders_data` DataFrame with the new date-only values.

The final DataFrame will have the `order_date` column as a date type instead of a timestamp.
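
Because the cast discards the time component, anything derived from the time of day has to be computed while `order_date` is still a timestamp. The plain-Python analogy below uses the standard `datetime` module rather than Spark to show what is kept and what is lost; the variable names are illustrative only.

```python
from datetime import datetime

# One of the sample timestamps shown earlier in the notebook
order_ts = datetime(2023, 1, 22, 21, 25)

# Keep the hour before dropping the time component
order_hour = order_ts.hour

# Analogous to casting a timestamp to a date: only year, month and day remain
order_day = order_ts.date()

print(order_day)                   # 2023-01-22
print(order_hour)                  # 21
print(hasattr(order_day, 'hour'))  # False
```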

In [51]:
# Cast order_date to date as it is originally a timestamp
orders_data = orders_data.withColumn(
    'order_date',
    F.col('order_date').cast(types.DateType())
)

In [52]:
orders_data.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,time_of_day
0,2023-01-22,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,evening
1,2023-01-28,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,afternoon
2,2023-01-17,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,afternoon
3,2023-01-05,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,evening
4,2023-01-25,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,morning


This code block performs the following operations on the `orders_data` DataFrame:

1. Converts the values in the 'product' column to lowercase.
2. Converts the values in the 'category' column to lowercase.

The `withColumn` method creates a new column or replaces an existing column with the same name. Here it replaces `product` and `category` with the result of `F.lower`, which converts their values to lowercase.
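
Several category values in the sample rows contain accented characters, so it is worth confirming what lowercasing should produce for them. The plain-Python check below uses `str.lower` for intuition only. It does not call Spark, and the values are copied from the sample output shown earlier.

```python
# Category and product values taken from the sample rows shown earlier
raw_values = ['Vêtements', 'Alimentation', 'Sports', 'Électronique', '27in FHD Monitor']

# str.lower is Unicode-aware, so 'É' becomes 'é'
lowered = [value.lower() for value in raw_values]

print(lowered)
```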

In [53]:
orders_data = (
    orders_data
    # Make product and category columns lowercase
    .withColumn(
        'product',
        F.lower(F.col('product'))
    )
    .withColumn(
        'category',
        F.lower(F.col('category'))
    )
)

In [54]:
orders_data.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,time_of_day
0,2023-01-22,141234,iphone,5638009000000.0,vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,evening
1,2023-01-28,141235,lightning charging cable,5563320000000.0,alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,afternoon
2,2023-01-17,141236,wired headphones,2113973000000.0,vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,afternoon
3,2023-01-05,141237,27in fhd monitor,3069157000000.0,sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,evening
4,2023-01-25,141238,wired headphones,9692681000000.0,électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,morning


This code removes all rows from the `orders_data` DataFrame where the `product` column contains the substring "tv". `F.col('product').contains('tv')` checks for the substring, and the `~` operator negates the condition, so only rows whose `product` does not contain "tv" are kept. Because the `product` column has already been converted to lowercase, the check matches "TV" regardless of its original casing.
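
The effect of a substring filter on lowercased values can be sketched in plain Python. The product names below are hypothetical, apart from the two that appear in the sample rows, and the list comprehension only stands in for the Spark filter.

```python
# Already-lowercased product names; 'flatscreen tv' and 'cctv camera' are hypothetical
products = ['iphone', 'wired headphones', 'flatscreen tv', 'cctv camera']

# Keep only products that do not contain the substring "tv"
kept = [p for p in products if 'tv' not in p]

# A case-sensitive check for "TV" matches nothing once values are lowercase
matched_upper = [p for p in products if 'TV' in p]

print(kept)           # ['iphone', 'wired headphones']
print(matched_upper)  # []
```

Note that a substring match is broad: any name that merely contains the letters "tv", such as the hypothetical "cctv camera", is removed as well. It may be worth checking the distinct product names before relying on it.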

In [55]:
# Remove rows where product column contains "tv" (as we have already made it lowercase)
orders_data = orders_data.filter(
    ~F.col('product').contains('tv')
)

In [56]:
orders_data.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,time_of_day
0,2023-01-22,141234,iphone,5638009000000.0,vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,evening
1,2023-01-28,141235,lightning charging cable,5563320000000.0,alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,afternoon
2,2023-01-17,141236,wired headphones,2113973000000.0,vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,afternoon
3,2023-01-05,141237,27in fhd monitor,3069157000000.0,sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,evening
4,2023-01-25,141238,wired headphones,9692681000000.0,électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,morning


This code block processes the `orders_data` DataFrame to extract the state abbreviation from the `purchase_address` column. Here's a step-by-step explanation:

1. **Split the Address**: The `purchase_address` is split into an array of tokens using a single space as the delimiter.
2. **Extract State Abbreviation**: With the address format "House Street, City, State Zipcode", the state abbreviation is the second-to-last token. It is extracted and stored in a new column called `purchase_state`.
3. **Clean Up**: The temporary `address_split` column is dropped as it is no longer needed.

The resulting DataFrame will have a new column `purchase_state` containing the state abbreviations extracted from the `purchase_address`.
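
The same split-and-index logic can be tried on a couple of the sample addresses in plain Python. This is a sketch for checking the idea only: the helper name `state_from_address` is hypothetical, and it assumes every address ends with "State Zipcode" separated by single spaces.

```python
# Addresses copied from the sample rows ("House Street, City, State Zipcode")
addresses = [
    '944 Walnut St, Boston, MA 02215',
    '738 10th St, Los Angeles, CA 90001',
]


def state_from_address(address: str) -> str:
    """Return the second-to-last space-separated token (hypothetical helper)."""
    tokens = address.split(' ')
    return tokens[-2]


states = [state_from_address(a) for a in addresses]
print(states)  # ['MA', 'CA']
```

Multi-word cities such as "Los Angeles" do not affect the result because the index is counted from the end of the token list.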

In [57]:
orders_data = (
    orders_data
    # First we split the purchase address by space (" ")
    .withColumn(
        'address_split',
        F.split('purchase_address', ' ')
    )
    # If we look at the address lines, we can see that the state abbreviation is always at the 2nd last position
    .withColumn(
        'purchase_state',
        F.col('address_split').getItem(F.size('address_split') - 2)
    )
    # Drop the address_split column as it is a temporary technical column
    .drop('address_split')
)

In [58]:
orders_data.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,time_of_day,purchase_state
0,2023-01-22,141234,iphone,5638009000000.0,vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,evening,MA
1,2023-01-28,141235,lightning charging cable,5563320000000.0,alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,afternoon,OR
2,2023-01-17,141236,wired headphones,2113973000000.0,vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,afternoon,CA
3,2023-01-05,141237,27in fhd monitor,3069157000000.0,sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,evening,CA
4,2023-01-25,141238,wired headphones,9692681000000.0,électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,morning,TX


This code calculates the number of unique states in the `purchase_state` column of the `orders_data` DataFrame.

1. **Select the `purchase_state` column**: The code first selects the `purchase_state` column from the `orders_data` DataFrame.

2. **Get distinct values**: It then retrieves the distinct values from the `purchase_state` column.

3. **Count the unique values**: The code counts the number of unique values in the `purchase_state` column.

4. **Store the result in `n_states`**: Finally, the result, which is the count of unique states, is stored in the variable `n_states`.

In summary, this code finds the number of different states in the `purchase_state` column.
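
As a small plain-Python analogy for select, distinct and count, the snippet below counts the unique states among the five sample rows shown earlier. It covers only those five rows, so its result differs from the count computed on the full dataset.

```python
# purchase_state values from the five sample rows
sample_states = ['MA', 'OR', 'CA', 'CA', 'TX']

# A set keeps one copy of each value; its length is the distinct count
n_sample_states = len(set(sample_states))

print(n_sample_states)  # 4
```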

In [59]:
# Use distinct and count to calculate the number of unique values
n_states = (
    orders_data
    .select('purchase_state')
    .distinct()
    .count()
)

In [60]:
print(n_states)

8


This code block writes the `orders_data` DataFrame in Parquet format to the path `orders_data_clean.parquet`. Spark writes the output as a directory of part files at that path rather than as a single file. The `mode='overwrite'` parameter ensures that any existing data at that path is replaced.

In [61]:
orders_data.write.parquet(
    'orders_data_clean.parquet',
    mode='overwrite'
)
