# Woltmart 2023 orders cleanup and EDA
PySpark + Pandas

Woltmart is a popular e-commerce chain that sells electronics. This notebook contains code that cleans the orders dataset and performs some basic exploratory data analysis (EDA).

Note: The graphs shown here are static, but they were interactive in the original version.

## `orders_data.parquet`

| column | data type | description | cleaning requirements | 
|--------|-----------|-------------|-----------------------|
| `order_date` | `timestamp` | Date and time when the order was made | _Modify: Remove orders placed between 12am and 5am (inclusive); convert from timestamp to date_ |
| `time_of_day` | `string` | Period of the day when the order was made | _New column containing (lower bound inclusive, upper bound exclusive): "morning" for orders placed 5am-12pm, "afternoon" for orders placed 12pm-6pm, and "evening" for orders placed 6pm-12am_ |
| `order_id` | `long` | Order ID | _N/A_ |
| `product` | `string` | Name of a product ordered | _Remove rows containing "TV" as the company has stopped selling this product; ensure all values are lowercase_ |
| `product_id` | `double` | Product ID | _N/A_ |
| `category` | `string` | Broader category of a product | _Ensure all values are lowercase_ |
| `purchase_address` | `string` | Address line where the order was made ("House Street, City, State Zipcode") | _N/A_ |
| `purchase_state` | `string` | US State of the purchase address | _New column containing: the State that the purchase was ordered from_ |
| `quantity_ordered` | `long` | Number of product units ordered | _N/A_ |
| `price_each` | `double` | Price of a product unit | _N/A_ |
| `cost_price` | `double` | Cost of production per product unit | _N/A_ |
| `turnover` | `double` | Total amount paid for a product (quantity x price) | _N/A_ |
| `margin` | `double` | Profit made by selling a product (turnover - cost) | _N/A_ |

<br>

In [1]:
from pyspark.sql import (
    SparkSession,
    types,
    functions as F
)

# Name the application, then reuse an already-running Spark session if there is
# one or start a new one otherwise.
session_builder = SparkSession.builder.appName('cleaning_orders_dataset_with_pyspark')
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/08/20 08:58:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the raw orders dataset.
orders_data = spark.read.parquet('orders_data.parquet')

# Preview a handful of rows. limit() is applied before toPandas() so that only
# five rows are collected to the driver instead of the whole dataset.
orders_data.limit(5).toPandas()

                                                                                

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin
0,2023-01-22 21:25:00,141234,iPhone,5638009000000.0,Vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0
1,2023-01-28 14:15:00,141235,Lightning Charging Cable,5563320000000.0,Alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475
2,2023-01-17 13:33:00,141236,Wired Headphones,2113973000000.0,Vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99
3,2023-01-05 20:33:00,141237,27in FHD Monitor,3069157000000.0,Sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965
4,2023-01-25 11:59:00,141238,Wired Headphones,9692681000000.0,Électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995


In [3]:
# Changing the date format
from pyspark.sql.functions import to_date, col, hour, when, lower, regexp_extract, udf, sum, avg, date_format
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
from pyspark.sql.window import Window
import plotly.graph_objs as go
import pandas as pd
import plotly.express as px
# Pattern for extracting the state: captures the first two-capital-letter token
# that directly follows ", " in the address.
pattern = r', ([A-Z]{2})'

# Hour of day of the order. This is an unresolved column expression, so it is
# only used below while order_date is still a timestamp.
order_hour = hour(col("order_date"))

clean_data = (
    orders_data
    # Removing the orders that were placed at night (hours 0 to 5 are dropped).
    # NOTE(review): the cleaning table says "morning" starts at 5 (inclusive),
    # but this filter also drops the 05:00-05:59 orders, so the `>= 5` bound
    # below never sees hour 5. Confirm which of the two is intended.
    .filter(order_hour > 5)
    .withColumn(
        "time_of_day",
        when((order_hour >= 5) & (order_hour < 12), "morning")
        .when((order_hour >= 12) & (order_hour < 18), "afternoon")
        .otherwise("evening"),
    )
    # Creating a clean date format: order_date is already a timestamp, so no
    # parsing format is needed to truncate it to a date.
    .withColumn("order_date", to_date(col("order_date")))
    # Removing misplaced orders that contain 'TV' (this store does not sell TVs).
    # The match runs before lowercasing, so it is case-sensitive on "TV".
    .filter(~col("product").like("%TV%"))
    .withColumn("product", lower(col("product")))
    # Full weekday name (e.g. "Sunday"), used by the daily analysis.
    .withColumn("weekday", date_format(col("order_date"), "EEEE"))
    .withColumn("category", lower(col("category")))
    .withColumn("purchase_state", regexp_extract(col("purchase_address"), pattern, 1))
)

# Preview only: limit() before toPandas() avoids collecting the full cleaned
# dataset to the driver just to display five rows.
clean_data.limit(5).toPandas()


                                                                                

Unnamed: 0,order_date,order_id,product,product_id,category,purchase_address,quantity_ordered,price_each,cost_price,turnover,margin,time_of_day,weekday,purchase_state
0,2023-01-22,141234,iphone,5638009000000.0,vêtements,"944 Walnut St, Boston, MA 02215",1,700.0,231.0,700.0,469.0,evening,Sunday,MA
1,2023-01-28,141235,lightning charging cable,5563320000000.0,alimentation,"185 Maple St, Portland, OR 97035",1,14.95,7.475,14.95,7.475,afternoon,Saturday,OR
2,2023-01-17,141236,wired headphones,2113973000000.0,vêtements,"538 Adams St, San Francisco, CA 94016",2,11.99,5.995,23.98,11.99,afternoon,Tuesday,CA
3,2023-01-05,141237,27in fhd monitor,3069157000000.0,sports,"738 10th St, Los Angeles, CA 90001",1,149.99,97.4935,149.99,52.4965,evening,Thursday,CA
4,2023-01-25,141238,wired headphones,9692681000000.0,électronique,"387 10th St, Austin, TX 73301",1,11.99,5.995,11.99,5.995,morning,Wednesday,TX


In [7]:
#Finding the long/lat coordinates of each address
import requests
def geocode_address(address, timeout=10):
    """Look up the latitude/longitude of an address with the Nominatim search API.

    Args:
        address: Free-form address string sent as the ``q`` query parameter.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A ``(latitude, longitude)`` tuple of floats taken from the first search
        result, or ``(None, None)`` when the address is empty, the request
        fails, the status is not 200, or the response has no usable result.
    """
    # Nothing to look up for a missing or blank address; skip the HTTP call.
    if not address or not str(address).strip():
        return None, None

    url = "https://nominatim.openstreetmap.org/search"
    params = {
        'q': address,
        'format': 'json',
        'limit': 1
    }
    # Identify the client explicitly rather than relying on the library default.
    # NOTE(review): the public Nominatim service also rate-limits clients;
    # confirm the usage policy before geocoding more than a small sample.
    headers = {'User-Agent': 'woltmart-orders-eda-notebook'}

    try:
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        if response.status_code != 200:
            return None, None

        # Parse the body once and reuse it.
        results = response.json()
        if not results:
            return None, None

        best_match = results[0]
        return float(best_match['lat']), float(best_match['lon'])
    except (requests.RequestException, KeyError, IndexError, TypeError, ValueError):
        # Best-effort lookup: a network error or malformed payload gives empty
        # coordinates for this address rather than an exception, matching the
        # non-200 and empty-result cases above.
        return None, None

# Return type of the geocoding UDF: a struct of two nullable doubles.
schema = StructType([
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
])
geocode_udf = udf(geocode_address, schema)

# Geocode only the first ten raw orders; the result lands in a "coordinates"
# struct column next to the original columns.
first_ten_orders = orders_data.limit(10)
geo_df = first_ten_orders.withColumn(
    "coordinates",
    geocode_udf(orders_data["purchase_address"]),
)

In [22]:
#Graphing total sum of goods

# Per-state totals. This aggregation must come before the quantile calculation
# below, which reads its "total_sales" column. F.sum / F.avg are used explicitly
# so the result does not depend on which `sum` is currently in scope.
selling_by_state = clean_data.groupBy("purchase_state").agg(
    F.round(F.sum("turnover"), 2).alias("total_sales"),
    F.avg("margin").alias("avg_margin"),
)

#Calculating percentiles for transformation (relative error 0.0 = exact quantiles)
quantiles = selling_by_state.approxQuantile("total_sales", [0.33, 0.66], 0.0)
low_threshold = quantiles[0]
medium_threshold = quantiles[1]

# Bucket each state into a sales tier. The `when` branches are evaluated in
# order, so the second branch only sees totals above low_threshold.
selling_by_state_pd = (
    selling_by_state
    .withColumn(
        "selling_power",
        when(col("total_sales") <= low_threshold, "low sales")
        .when(col("total_sales") <= medium_threshold, "medium sales")
        .otherwise("high sales"),
    )
    .toPandas()
)

#Finding most popular product in each state
product_counts = clean_data.groupBy("purchase_state", "product").agg(
    F.count("order_id").alias("count")
)

#Calculating the rank of each product within its state, most ordered first
window_spec = Window.partitionBy("purchase_state").orderBy(col("count").desc())
ranked_df = product_counts.withColumn("rank", F.rank().over(window_spec))

# Filter the top-ranked products for each state (allows ties)
top_products = ranked_df.filter(col("rank") == 1).toPandas()

selling_by_state_pd.head(10)


Unnamed: 0,purchase_state,total_sales,avg_margin,selling_power
0,OR,1689758.74,113.871601,low sales
1,CA,12419970.13,112.681142,high sales
2,ME,412216.72,111.941612,low sales
3,WA,2484171.58,113.223746,low sales
4,NY,4210027.23,114.226134,high sales
5,TX,4158915.53,113.182825,medium sales
6,GA,2502138.09,113.810168,medium sales
7,MA,3304599.64,111.603556,medium sales


In [27]:
# Marker size (in px) used for each sales tier.
size_mapping = {
    "low sales": 5,
    "medium sales": 10,
    "high sales": 15
}

# Map sizes to the selling_power categories
selling_by_state_pd['size'] = selling_by_state_pd['selling_power'].map(size_mapping)

# Create the base choropleth map using Plotly Express (states coloured by average margin)
fig = px.choropleth(
    selling_by_state_pd,
    locations='purchase_state',
    locationmode='USA-states',
    color='avg_margin',
    scope="usa",
    color_continuous_scale="Plasma",
    labels={'avg_margin': 'Average Margin'}
)

# Overlay one marker trace per sales tier. Each tier gets its own legend entry
# with the matching marker size, so no placeholder markers have to be drawn on
# the map just to build the legend.
for label, size in size_mapping.items():
    states_in_tier = selling_by_state_pd[selling_by_state_pd['selling_power'] == label]
    fig.add_trace(go.Scattergeo(
        locationmode='USA-states',
        locations=states_in_tier['purchase_state'],
        text=states_in_tier['total_sales'].astype(str),  # total sales shown on hover
        marker=dict(
            size=size,
            color='red',  # Set the color of the circles to red
            line=dict(color='black', width=1),  # Black outline
        ),
        showlegend=True,
        name=label
    ))

# Update the layout for better visualization
fig.update_layout(
    title_text="Average Margin by State with Total Sales Overlay",
    showlegend=True,
    legend=dict(
        title=dict(text="Selling Power"),
        x=0,  # Align to the far left
        y=0,  # Align to the bottom
        xanchor='left',  # Anchor the x position to the left
        yanchor='bottom',  # Anchor the y position to the bottom
        orientation='v'  # Vertical orientation
    ),
    geo=dict(
        scope='usa',
        projection=go.layout.geo.Projection(type='albers usa'),
        showland=True,
        landcolor="rgb(217, 217, 217)",
    )
)

# Show the map
fig.show()

In [107]:
# Bar chart of the top-ranked product(s) per state; tied products in a state
# appear as separate bars for that state.
fig = px.bar(
    top_products,
    x="purchase_state",
    y="count",
    color="product",  # Color bars by product
    title="Most sold product in each state",
    # Keys must match the plotted column names; the order count column is "count".
    labels={"count": "Number of Orders", "purchase_state": "State of Purchase", "product": "Product Name"}
)

# Update layout for better visualization
fig.update_layout(
    xaxis_title="Purchase State",
    yaxis_title="Number of Orders",
    showlegend=True
)
# Update the traces to add black outlines
fig.update_traces(
    marker_line_color='black',
    marker_line_width=1.5
)

# Show the plot
fig.show()

In [149]:
# Only the quantity column is needed for the histogram, so select it before
# converting to pandas.
quantity_df = clean_data.select('quantity_ordered').toPandas()

histogram_options = dict(
    x="quantity_ordered",
    nbins=10,  # Number of bins in the histogram
    title="Distribution of Quantity Ordered",
    labels={"quantity_ordered": "Quantity Ordered"},
)
fig = px.histogram(quantity_df, **histogram_options)

# Axis titles and bar spacing
layout_options = dict(
    xaxis_title="Quantity Ordered",
    yaxis_title="Count",
    bargap=0.2,
    bargroupgap=0.1,
)
fig.update_layout(**layout_options)

# Render the figure
fig.show()

In [67]:
#Seeing how margins changed over time for each product

# Monthly averages per product. The month label is written year-first
# ("yyyy-MM") so that sorting the label is the same as sorting chronologically.
# The rows are ordered explicitly because groupBy returns them in no particular
# order and px.line connects points in row order.
# Note: "avg_cost" holds the average of price_each (unit price); the name is
# kept because a later cell plots this column.
grouped_data = (
    clean_data
    .withColumn("m_year", date_format(col("order_date"), "yyyy-MM"))
    .groupBy("m_year", "product")
    .agg(
        avg("margin").alias("avg_margin"),
        avg("price_each").alias("avg_cost"),
    )
    .orderBy("product", "m_year")
)

# Convert the PySpark DataFrame to a Pandas DataFrame for plotting
df_pandas = grouped_data.toPandas()

# Create the line graph using Plotly Express (one trace per product)
fig = px.line(
    df_pandas,
    x="m_year",
    y="avg_margin",
    color="product",
    title="Average Margins Over Time"
)

# Build the dropdown from the figure's own traces so each visibility mask is
# guaranteed to line up with the trace order.
product_names = [trace.name for trace in fig.data]
product_buttons = [
    {
        "method": "update",
        "label": product,
        "args": [
            {"visible": [name == product for name in product_names]},
            {"title": f"Average Margins Over Time for {product}"}
        ],
    }
    for product in product_names
]

# Update the layout with a dropdown on the right and remove the legend
fig.update_layout(
    updatemenus=[
        {
            "buttons": product_buttons,
            "direction": "down",
            "showactive": True,
            "x": 1.15,  # Move the dropdown to the right side
            "y": 1.15,  # Move the dropdown up to the top
            "xanchor": "right",  # Anchor the dropdown to the right
            "yanchor": "top"  # Anchor the dropdown to the top
        }
    ],
    showlegend=False,
    yaxis_title="Average Margin ($)"
)

# Show the plot
fig.show()


### Check that there is almost no variation in the average prices for each product

In [69]:
# Sort so each product's line is drawn in month-label order rather than in the
# arbitrary row order returned by the Spark groupBy.
# NOTE(review): m_year is a string label, so this is a string sort.
price_trend = df_pandas.sort_values(["product", "m_year"])

# "avg_cost" holds the monthly average of price_each (unit price), so the chart
# is titled and labelled as a price chart, in line with the y-axis title.
fig = px.line(
    price_trend,
    x="m_year",
    y="avg_cost",
    color="product",
    title="Average Price Over Time by Product",
    labels={"m_year": "Order Month", "avg_cost": "Average Price per Month"}
)

# Build the dropdown from the figure's own traces so each visibility mask is
# guaranteed to line up with the trace order.
product_names = [trace.name for trace in fig.data]
product_buttons = [
    {
        "method": "update",
        "label": product,
        "args": [
            {"visible": [name == product for name in product_names]},
            {"title": f"Average Price Over Time for {product}"}
        ],
    }
    for product in product_names
]

# Add a dropdown for selecting the product
fig.update_layout(
    updatemenus=[
        {
            "buttons": product_buttons,
            "direction": "down",
            "showactive": True,
            "x": 1.15,  # Move the dropdown to the right side
            "y": 1.15,  # Move the dropdown up to the top
            "xanchor": "right",  # Anchor the dropdown to the right
            "yanchor": "top"  # Anchor the dropdown to the top
        }
    ],
    showlegend=False,
    yaxis_title="Average Price ($)"
)


# Show the plot
fig.show()

In [58]:
#Daily analysis

# Number of orders per weekday name (seven rows at most), collected to pandas.
daily_orders = (
    clean_data
    .groupBy("weekday")
    .agg(F.count("order_id").alias("orders_number"))
    .toPandas()
)

fig = px.bar(
    daily_orders,
    x="weekday",
    y="orders_number",
    title="Number of Orders by Day of the Week",
    labels={"weekday": "Day of the Week", "orders_number": "Number of Orders"},
    color_discrete_sequence=["lightblue"],
    text="orders_number"  # Annotate bars with the number of orders
)

# Customize the layout
fig.update_layout(
    xaxis_title="Weekday",
    yaxis_title="Number of Orders",
    # The groupBy result has no inherent order, so force Monday-to-Sunday on the axis.
    xaxis=dict(categoryorder='array', categoryarray=['Monday','Tuesday','Wednesday','Thursday','Friday','Saturday','Sunday']),
    uniformtext_minsize=8, uniformtext_mode='hide',  # Hide bar labels that would need a font smaller than 8
)

# Show the plot
fig.show()

In [15]:
# Save both DataFrames as Parquet, overwriting any previous output.
# Each frame goes to the file that matches its content: the cleaned orders to
# orders_data_clean.parquet and the geocoded sample to geo_data_clean.parquet.
clean_data.write.mode("overwrite").parquet("orders_data_clean.parquet")
geo_df.write.mode("overwrite").parquet("geo_data_clean.parquet")

                                                                                