This notebook holds the solutions for the given objectives:  
⚡Which region has the highest profit, and which country within that region has the highest profit?  
⚡What is the highest number of orders placed within a 72-hour period?  
⚡Which month has the highest average number of units sold?  
⚡Analyze the financial performance of the placed orders. What are the lowest and highest margin percentages?  
⚡To identify key sales trends, determine the top 3 highest-value orders for a specific item type from the dataset.  
⚡Which type of item is most frequently sold on weekends in the Sub-Saharan Africa region?  
⚡Compute year-over-year growth for total revenue.  

Data dictionary for the dataset:  
__Region__: The broad geographical region of the sale (e.g., “Europe”, “Sub-Saharan Africa”).  
__Country__: The specific country within the region where the sale occurred.  
__Item Type__: The category of the product sold (e.g., “Baby Food”, “Office Supplies”).  
__Sales Channel__: The method of sale, either “Online” or “Offline”.  
__Order Priority__: The priority level assigned to the order (e.g., “H”, “C”, “L”).  
__Order Date__: The date the order was placed.  
__Order ID__: A unique identifier for the order.  
__Ship Date__: The date the order was shipped.  
__Units Sold__: The quantity of the product sold.  
__Unit Price__: The selling price per unit of the product.  
__Unit Cost__: The cost price per unit of the product.  
__Total Revenue__: The total revenue generated from the sale (Units Sold * Unit Price).  
__Total Cost__: The total cost incurred for the sale (Units Sold * Unit Cost).  
__Total Profit__: The total profit made from the sale (Total Revenue - Total Cost).  
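
As a quick sanity check on the three derived columns, the sketch below recomputes them in plain Python for the Tuvalu / Baby Food row that appears in the `df.show()` output (9925 units, unit price 255.28, unit cost 159.42). The helper name `derive_totals` is hypothetical and exists only for this illustration.

```python
def derive_totals(units_sold, unit_price, unit_cost):
    """Recompute the derived columns from the data dictionary (hypothetical helper)."""
    total_revenue = round(units_sold * unit_price, 2)   # Units Sold * Unit Price
    total_cost = round(units_sold * unit_cost, 2)       # Units Sold * Unit Cost
    total_profit = round(total_revenue - total_cost, 2) # Total Revenue - Total Cost
    return total_revenue, total_cost, total_profit

# Values taken from the Tuvalu / Baby Food row of the dataset
revenue, cost, profit = derive_totals(9925, 255.28, 159.42)
print(revenue, cost, profit)
```

The printed values should agree with the `Total Revenue`, `Total Cost` and `Total Profit` cells of that row.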

In [51]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("ShippingDataAnalysis") \
        .getOrCreate()
        
df = spark.read.csv("AmazonSalesData.csv", header = True, inferSchema = True)

In [52]:
df.show(10)
df.printSchema()

+--------------------+--------------------+---------------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|      Item Type|Sales Channel|Order Priority|Order Date| Order ID|Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+--------------------+---------------+-------------+--------------+----------+---------+---------+----------+----------+---------+-------------+----------+------------+
|Australia and Oce...|              Tuvalu|      Baby Food|      Offline|             H| 5/28/2010|669165933|6/27/2010|      9925|    255.28|   159.42|    2533654.0| 1582243.5|    951410.5|
|Central America a...|             Grenada|         Cereal|       Online|             C| 8/22/2012|963881480|9/15/2012|      2804|     205.7|   117.11|     576782.8| 328376.44|   248406.36|
|              Europe|              Russia|Office 

In [53]:
# Remove spaces and trim column names
df = df.toDF(*[col_name.replace(" ", "").strip() for col_name in df.columns])

print(df.columns)

['Region', 'Country', 'ItemType', 'SalesChannel', 'OrderPriority', 'OrderDate', 'OrderID', 'ShipDate', 'UnitsSold', 'UnitPrice', 'UnitCost', 'TotalRevenue', 'TotalCost', 'TotalProfit']


In [54]:
from pyspark.sql.functions import to_date, col, when

# Parse the M/d/yyyy strings into DateType; values matching neither pattern become null
df = df.withColumn(
    "OrderDate",
    when(col("OrderDate").rlike(r"^\d{1}/\d{1,2}/\d{4}$"), to_date(col("OrderDate"), "M/d/yyyy"))
    .when(col("OrderDate").rlike(r"^\d{2}/\d{1,2}/\d{4}$"), to_date(col("OrderDate"), "MM/d/yyyy"))
    .otherwise(None)
).withColumn(
    "ShipDate",
    when(col("ShipDate").rlike(r"^\d{1}/\d{1,2}/\d{4}$"), to_date(col("ShipDate"), "M/d/yyyy"))
    .when(col("ShipDate").rlike(r"^\d{2}/\d{1,2}/\d{4}$"), to_date(col("ShipDate"), "MM/d/yyyy"))
    .otherwise(None)
)
df.show(5)
df.printSchema()

+--------------------+--------------------+---------------+------------+-------------+----------+---------+----------+---------+---------+--------+------------+----------+-----------+
|              Region|             Country|       ItemType|SalesChannel|OrderPriority| OrderDate|  OrderID|  ShipDate|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+--------------------+---------------+------------+-------------+----------+---------+----------+---------+---------+--------+------------+----------+-----------+
|Australia and Oce...|              Tuvalu|      Baby Food|     Offline|            H|2010-05-28|669165933|2010-06-27|     9925|   255.28|  159.42|   2533654.0| 1582243.5|   951410.5|
|Central America a...|             Grenada|         Cereal|      Online|            C|2012-08-22|963881480|2012-09-15|     2804|    205.7|  117.11|    576782.8| 328376.44|  248406.36|
|              Europe|              Russia|Office Supplies|     Offline|        

---

Which region has the highest profit, and which country within that region has the highest profit?

In [59]:
from pyspark.sql.functions import format_number, col, sum

# Group by region and sum profits
region_profit = df.groupBy("Region") \
                    .agg(sum("TotalProfit").alias("total_profit")) \
                    .orderBy(col("total_profit").desc())

# Show top region with formatted number
region_profit.withColumn("total_profit", format_number(col("total_profit"), 2)) \
                .show(1, truncate=False)

+------------------+-------------+
|Region            |total_profit |
+------------------+-------------+
|Sub-Saharan Africa|12,183,211.40|
+------------------+-------------+
only showing top 1 row


In [61]:
# Get the top region name
top_region = region_profit.first()["Region"]

# Find the top country within that region
df.filter(col("Region") == top_region) \
    .groupBy("Country") \
    .agg(sum("TotalProfit").alias("total_profit")) \
    .orderBy(col("total_profit").desc()) \
    .withColumn("total_profit", format_number(col("total_profit"), 2)) \
    .show(1, truncate=False)

+--------+------------+
|Country |total_profit|
+--------+------------+
|Djibouti|2,425,317.87|
+--------+------------+
only showing top 1 row


In [58]:
from pyspark.sql.functions import col, sum

# Group the data by region and calculate the total profit for each region
region_and_country_with_profit = df.groupBy(col('Region')).sum('TotalProfit')

# Order the regions by total profit in descending order and get the region with the highest profit
region_with_highest_profit = region_and_country_with_profit.orderBy("sum(TotalProfit)", ascending=False).first()

# Print the region with the highest profit and its total profit
print(f"{region_with_highest_profit['Region']} Region has the highest profit with a total profit of {region_with_highest_profit['sum(TotalProfit)']}")

# Filter the data to include only the rows from the region with the highest profit
# Group the data by country within that region and calculate the total profit for each country
# Order the countries by total profit in descending order and get the country with the highest profit
country_with_highest_profit = df.where(col('Region') == region_with_highest_profit['Region']) \
                                .groupBy(col('Country')).sum('TotalProfit') \
                                .orderBy("sum(TotalProfit)", ascending=False).first()

# Print the country with the highest profit within the region with the highest profit and its total profit
print(f"{country_with_highest_profit['Country']} country, under region {region_with_highest_profit['Region']} has the highest profit with a total profit of {country_with_highest_profit['sum(TotalProfit)']}")

#Output : 12183211.400000004, 2425317.87

Sub-Saharan Africa Region has the highest profit with a total profit of 12183211.400000004
Djibouti country, under region Sub-Saharan Africa has the highest profit with a total profit of 2425317.87


---

What is the highest number of orders placed within a 72-hour period?

In [62]:
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- ItemType: string (nullable = true)
 |-- SalesChannel: string (nullable = true)
 |-- OrderPriority: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- ShipDate: date (nullable = true)
 |-- UnitsSold: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- UnitCost: double (nullable = true)
 |-- TotalRevenue: double (nullable = true)
 |-- TotalCost: double (nullable = true)
 |-- TotalProfit: double (nullable = true)



In [63]:
from pyspark.sql.functions import window, count, col

order = df.groupBy(window(col("OrderDate"), '72 hours')) \
            .agg(count("OrderID").alias("count_order_id")) \
            .orderBy('count_order_id', ascending = False)
            
# window() with only a duration creates fixed, non-overlapping 72-hour buckets,
# so this is the largest order count in any single bucket
print(f"Highest number of orders placed within a 72-hour window is {order.first()['count_order_id']}")

Highest number of orders placed within a 72-hour window is 2
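
When `window()` is given only a duration, it groups rows into fixed, non-overlapping buckets, so a burst of orders that straddles a bucket boundary is split across two counts. A minimal plain-Python sketch of the alternative, counting orders over every 72-hour span with a two-pointer scan, is shown below. The function name `max_orders_in_window` and the sample dates are hypothetical and are not taken from the dataset.

```python
from datetime import datetime, timedelta

def max_orders_in_window(timestamps, hours=72):
    """Largest number of timestamps that fall in any half-open span [t, t + hours)."""
    span = timedelta(hours=hours)
    ordered = sorted(timestamps)
    best = 0
    start = 0  # index of the oldest order still inside the current span
    for end, current in enumerate(ordered):
        # Drop orders that are `hours` or more older than the current one
        while current - ordered[start] >= span:
            start += 1
        best = max(best, end - start + 1)
    return best

# Hypothetical order dates, not taken from the dataset
sample = [
    datetime(2012, 1, 1),
    datetime(2012, 1, 2),
    datetime(2012, 1, 3),
    datetime(2012, 1, 4),
    datetime(2012, 1, 10),
]
print(max_orders_in_window(sample))
```

Because the span is half-open, an order placed exactly 72 hours after the first one starts a new span rather than joining the old one; whether that boundary should be inclusive is a choice the question leaves open.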


---


Which month has the highest average number of units sold?

In [65]:
df.printSchema()
df.show(5)

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- ItemType: string (nullable = true)
 |-- SalesChannel: string (nullable = true)
 |-- OrderPriority: string (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- OrderID: integer (nullable = true)
 |-- ShipDate: date (nullable = true)
 |-- UnitsSold: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- UnitCost: double (nullable = true)
 |-- TotalRevenue: double (nullable = true)
 |-- TotalCost: double (nullable = true)
 |-- TotalProfit: double (nullable = true)

+--------------------+--------------------+---------------+------------+-------------+----------+---------+----------+---------+---------+--------+------------+----------+-----------+
|              Region|             Country|       ItemType|SalesChannel|OrderPriority| OrderDate|  OrderID|  ShipDate|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+--------------------+---------

In [None]:
from pyspark.sql.functions import date_format, avg, col

month_order = df.groupBy(date_format(col("OrderDate"), "MMMM").alias("month_name")) \
                .agg(avg(col('UnitsSold')).alias('avg_units_sold')) \
                .orderBy('avg_units_sold', ascending=False) \
                .first()

# Print the month with the highest average number of units sold
print(f"The month with the highest average number of units sold is {month_order['month_name']}")

The month with the highest average number of units sold is July


---

Analyze the financial performance of the placed orders. What are the lowest and highest margin percentages?

In [70]:
df.show(5)
df.printSchema()

+--------------------+--------------------+---------------+------------+-------------+----------+---------+----------+---------+---------+--------+------------+----------+-----------+
|              Region|             Country|       ItemType|SalesChannel|OrderPriority| OrderDate|  OrderID|  ShipDate|UnitsSold|UnitPrice|UnitCost|TotalRevenue| TotalCost|TotalProfit|
+--------------------+--------------------+---------------+------------+-------------+----------+---------+----------+---------+---------+--------+------------+----------+-----------+
|Australia and Oce...|              Tuvalu|      Baby Food|     Offline|            H|2010-05-28|669165933|2010-06-27|     9925|   255.28|  159.42|   2533654.0| 1582243.5|   951410.5|
|Central America a...|             Grenada|         Cereal|      Online|            C|2012-08-22|963881480|2012-09-15|     2804|    205.7|  117.11|    576782.8| 328376.44|  248406.36|
|              Europe|              Russia|Office Supplies|     Offline|        

In [73]:
from pyspark.sql.functions import col, round

# Rename 'UnitPrice' to 'SP' (selling price) and 'UnitCost' to 'CP' (cost price)
margin = df.withColumnsRenamed({'UnitPrice':'SP','UnitCost':'CP'}) \
            .select('OrderID', 'SP', 'CP')

# Calculate the margin percentage for each order item and round it to the nearest integer
margin_percentage = margin.withColumn("Percentage", round(((col('SP') - col('CP')) / col('SP')) * 100))

# Find the lowest margin percentage across all orders
lowest_margin_percentage = margin_percentage.orderBy('Percentage', ascending=True).first()['Percentage']

# Find the highest margin percentage across all orders
highest_margin_percentage = margin_percentage.orderBy('Percentage', ascending=False).first()['Percentage']

# Print the lowest and highest margin percentages
print(f"Order Item with the lowest margin percentage is: {int(lowest_margin_percentage)}%")
print(f"Order Item with the highest margin percentage is: {int(highest_margin_percentage)}%")


Order Item with the lowest margin percentage is: 14%
Order Item with the highest margin percentage is: 67%
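
The margin formula used above is (SP - CP) / SP * 100. As a small illustration, the plain-Python sketch below applies it to the two rows that are fully visible in the `df.show()` output. The helper name `margin_percentage_of` is hypothetical, and Python's built-in `round` is used here in place of Spark's `round`, which may treat exact .5 ties differently.

```python
def margin_percentage_of(selling_price, cost_price):
    """Margin as a percentage of the selling price, rounded to the nearest integer."""
    return round((selling_price - cost_price) / selling_price * 100)

# Unit prices and unit costs from the two rows visible in df.show()
rows = {"Baby Food": (255.28, 159.42), "Cereal": (205.7, 117.11)}
margins = {item: margin_percentage_of(sp, cp) for item, (sp, cp) in rows.items()}

print(margins)
print(min(margins.values()), max(margins.values()))
```

Since the margin depends only on unit price and unit cost, every order of the same item type has the same margin percentage in this sketch.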


---

To identify key sales trends, determine the top 3 highest-value orders for a specific item type from the dataset.

In [78]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Create a window specification to partition the data by 'Item Type' and order by 'Total Profit' in descending order
window_spec = Window.partitionBy(col('ItemType')).orderBy(col('TotalProfit').desc())

# Add a row number column to the DataFrame based on the window specification
highest_valuable_orders = df.withColumn("row_id", row_number().over(window_spec)) \
                            .where(col('row_id') <= 3) \
                            .drop('row_id')

# Display the top 3 highest-value orders for each item type
highest_valuable_orders.show(highest_valuable_orders.count(), truncate=False)

+---------------------------------+------------------------------+---------------+------------+-------------+----------+---------+----------+---------+---------+--------+------------+----------+-----------+
|Region                           |Country                       |ItemType       |SalesChannel|OrderPriority|OrderDate |OrderID  |ShipDate  |UnitsSold|UnitPrice|UnitCost|TotalRevenue|TotalCost |TotalProfit|
+---------------------------------+------------------------------+---------------+------------+-------------+----------+---------+----------+---------+---------+--------+------------+----------+-----------+
|Australia and Oceania            |Tuvalu                        |Baby Food      |Offline     |H            |2010-05-28|669165933|2010-06-27|9925     |255.28   |159.42  |2533654.0   |1582243.5 |951410.5   |
|Europe                           |Monaco                        |Baby Food      |Offline     |H            |2012-05-29|688288152|2012-06-02|8614     |255.28   |159.42  |21

---

Which type of item is most frequently sold on weekends in the Sub-Saharan Africa region?

In [83]:
from pyspark.sql.functions import sum, col, dayofweek

# dayofweek() returns 1 for Sunday and 7 for Saturday, so [7, 1] selects weekends
frequently_sold = df \
    .where((col("Region") == "Sub-Saharan Africa") & (dayofweek(col("OrderDate")).isin([7, 1]))) \
    .groupBy("ItemType") \
    .agg(sum("UnitsSold").alias("Sum of Units Sold")) \
    .orderBy(col("Sum of Units Sold").desc())

frequently_sold.first()


Row(ItemType='Personal Care', Sum of Units Sold=13796)
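
The weekend filter relies on Spark's `dayofweek` numbering, where 1 is Sunday and 7 is Saturday. The plain-Python sketch below reproduces that numbering with the standard library so the convention can be checked against known dates. The helper names `spark_dayofweek` and `is_weekend` are hypothetical.

```python
from datetime import date

def spark_dayofweek(d):
    """Mimic Spark's dayofweek(): 1 = Sunday, 2 = Monday, ..., 7 = Saturday."""
    # isoweekday() is 1 = Monday ... 7 = Sunday, so shift it by one position
    return d.isoweekday() % 7 + 1

def is_weekend(d):
    """True for Saturday (7) and Sunday (1) in Spark's numbering."""
    return spark_dayofweek(d) in (1, 7)

# Order dates from the two rows visible in df.show()
for order_date in (date(2010, 5, 28), date(2012, 8, 22)):
    print(order_date, spark_dayofweek(order_date), is_weekend(order_date))
```

Both sample order dates fall on weekdays, so neither would pass the weekend filter.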

---

Compute year-over-year growth for total revenue.

In [None]:
from pyspark.sql.functions import year, sum, lag, round, concat, lit, col
from pyspark.sql.window import Window

# Aggregate revenue by year
revenue_by_year = df.groupBy(year(col("OrderDate")).alias("Year")) \
    .agg(sum("TotalRevenue").alias("Yearly Revenue"))

# Calculate year-over-year growth
windowSpec = Window.orderBy("Year")
revenue_with_growth = revenue_by_year.withColumn(
    "YoY Growth %",
    concat(round(((col("Yearly Revenue") - lag("Yearly Revenue", 1).over(windowSpec)) / lag("Yearly Revenue", 1).over(windowSpec))*100).cast("int").cast("string"),lit(" %"))
)

# Use show() here. Accessing revenue_with_growth.display raised
# PySparkAttributeError: [ATTRIBUTE_NOT_SUPPORTED] Attribute `display` is not supported.
revenue_with_growth.show()
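
The growth calculation itself is (current - previous) / previous * 100, with no value for the first year. A minimal plain-Python sketch of that arithmetic follows. The function name `yoy_growth` and the revenue figures are hypothetical and are not results from the dataset.

```python
def yoy_growth(revenue_by_year):
    """Return {year: growth %} against the previous entry in year order; None where undefined."""
    growth = {}
    previous = None
    for year in sorted(revenue_by_year):
        current = revenue_by_year[year]
        if previous is None or previous == 0:
            # First year, or a zero base that would divide by zero
            growth[year] = None
        else:
            growth[year] = round((current - previous) / previous * 100)
        previous = current
    return growth

# Hypothetical yearly revenue figures, not taken from the dataset
sample = {2010: 100.0, 2011: 125.0, 2012: 100.0}
print(yoy_growth(sample))
```

Like `lag(..., 1)` over a window ordered by year, this compares each year with the previous row, so a year missing from the data would make the next figure span more than twelve months.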