<a href="https://colab.research.google.com/github/Parissai/ML-DS-Playground/blob/main/cleaning_dataset_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Cleaning an Orders Dataset with PySpark

| column | data type | description | cleaning requirements |
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5-12am, "afternoon" for orders placed 12-6pm, and "evening" for 6-12pm_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_ean` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F,
)

# Initiate a Spark session
spark = (
    SparkSession
    .builder
    .appName('cleaning_orders_dataset_with_pyspark')
    .getOrCreate()
)

In [None]:
orders_data = spark.read.parquet('/content/drive/MyDrive/data/orders_data.parquet')
orders_data.toPandas().head()

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


In [None]:
# DATA CLEANING AND PREPROCESSING

orders_data = (
    orders_data
    # Create a new column time_of_day
    .withColumn(
        'time_of_day',
        # When/otherwise (similar to case/when/else) statements extracting hour from timestamp
        F.when((F.hour('order_date') >= 0) & (F.hour('order_date') <= 5), 'night')
         .when((F.hour('order_date') >= 6) & (F.hour('order_date') <= 11), 'morning')
         .when((F.hour('order_date') >= 12) & (F.hour('order_date') <= 17), 'afternoon')
         .when((F.hour('order_date') >= 18) & (F.hour('order_date') <= 23), 'evening')
        # Keep the otherwise statement as None to validate whether the conditions are exhaustive
         .otherwise(None)
    )
    # Filter by time of day
    .filter(
        F.col('time_of_day') != 'night'
    )
    # Cast order_date to date as it is originally a timestamp
    .withColumn(
        'order_date',
        F.col('order_date').cast(types.DateType())
    )
)

In [None]:
orders_data = (
    orders_data
    # Make product and category columns lowercase
    .withColumn(
        'product',
        F.lower('product')
    )
    .withColumn(
        'category',
        F.lower('category')
    )
    # Remove rows where product column contains "tv" (as already made it lowercase)
    .filter(
        ~F.col('product').contains('tv')
    )
)

In [None]:
orders_data = (
    orders_data
    # Split the purchase address by space (" ")
    .withColumn(
        'address_split',
        F.split('purchase_address', ' ')
    )
    # The state abbreviation is always at the 2nd last position
    .withColumn(
        'purchase_state',
        F.col('address_split')[F.size('address_split') - 2]
    )
    # Dropping address_split columns as it is a temporary technical column
    .drop('address_split')
)



In [None]:
# Use distinct and count to calculate the number of unique values
n_states = (
    orders_data
    .select('purchase_state')
    .distinct()
    .count()
)

In [None]:
# EXPORT

# Export the resulting table to parquet format with the new name
(
    orders_data
    .write
    .parquet(
        'orders_data_clean.parquet',
        mode='overwrite',
    )
)