In [1]:
!apt-get update
!apt-get install openjdk-11-jdk -y
!pip install pyspark

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,369 kB]
Get:8 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,668 kB]
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:11 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,682 kB]
Get:12 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:13 https://ppa.launchpadcontent.ne

In [2]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.11/dist-packages/pyspark"

First thing, we need to get or create a spark session


The Spark Session is the entry point for high-level Spark functionality. It is initiated using the Sparksession.builder

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import DateType

In [4]:
spk = SparkSession.builder.master("local[*]").appName('globalSales').getOrCreate()

In [5]:
print(spk.version)

3.5.5


Then we read in the csv file using the spark.read.csv

In [6]:
global_df = spk.read.csv("global_sales_records.csv", header=True,inferSchema=True)

global_df.show(5)

+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|    Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Middle East and N...|          Azerbaijan|       Snacks|       Online|             C| 10/8/2014|535113847|10/23/2014|       934|    152.58|    97.44|    142509.72|  91008.96|    51500.76|
|Central America a...|              Panama|    Cosmetics|      Offline|             L| 2/22/2015|874708545| 2/27/2015|      4551|     437.2|   263.33|    1989697.2|1198414.83|   791282.37|
|  Sub-Saharan Africa|Sao Tome and Prin...|       Fruit

In [13]:
global_df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Sales_Channel: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Order_ID: integer (nullable = true)
 |-- Ship_Date: date (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Unit_Cost: double (nullable = true)
 |-- Total_Revenue: double (nullable = true)
 |-- Total_Cost: double (nullable = true)
 |-- Total_Profit: double (nullable = true)



In [8]:
# Rename the columns, to remove the spaces
def rename_columns_remove_spaces(df: DataFrame) -> DataFrame:
    """Renames columns in a PySpark DataFrame to replace spaces with underscores."""
    for col_name in df.columns:
        new_col_name = col_name.replace(" ", "_")
        df = df.withColumnRenamed(col_name, new_col_name)
    return df

global_df = rename_columns_remove_spaces(global_df)

In [37]:
global_df.show(50)

+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|      Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Middle East and N...|          Azerbaijan|         Snacks|       Online|             C|2014-10-08|535113847|2014-10-23|       934|    152.58|    97.44|    142509.72|  91008.96|    51500.76|
|Central America a...|              Panama|      Cosmetics|      Offline|             L|2015-02-22|874708545|2015-02-27|      4551|     437.2|   263.33|    1989697.2|1198414.83|   791282.37|
|  Sub-Saharan Africa|Sao Tome and Prin...|  

In [10]:
# Data Transformation
# Transform the Order and Ship Date

global_df = global_df.withColumn("Order_Date", F.to_date(F.col("Order_Date"), "M/d/yyy"))



In [12]:
# Assignment Transform the Ship Date

global_df = global_df.withColumn("Ship_Date", F.to_date(F.col("Ship_Date"), "M/d/yyy"))

## Top Performing Regions By Revenue

In [15]:
top_region_df = global_df.groupBy("Region").agg(F.sum("Total_Revenue").alias("Revenue_Generated"))

In [23]:
# Sort by total revenue in descending order and take the top 1 region
top_region = top_region_df.orderBy(F.col("Revenue_Generated").asc()).limit(3)

top_region = top_region.withColumn("Revenue_Generated", F.format_number(F.col("Revenue_Generated"), 2))


In [24]:
top_region.show()

+--------------------+-----------------+
|              Region|Revenue_Generated|
+--------------------+-----------------+
|       North America| 2,937,002,333.49|
|Australia and Oce...|10,701,522,223.73|
|Central America a...|14,553,730,165.29|
+--------------------+-----------------+



In [25]:
# Group by 'region' and calculate the average revenue for each region
average_revenue_df = global_df.groupBy("Region").agg(F.avg("Total_Revenue").alias("average_revenue"))

# Sort by average revenue in descending order
top_avg_region = average_revenue_df.orderBy(F.col("average_revenue").desc()).limit(3)

In [26]:
top_avg_region.show()

+--------------------+------------------+
|              Region|   average_revenue|
+--------------------+------------------+
|       North America|1376934.9899156122|
|Central America a...|1356232.4261755676|
|Middle East and N...|1345104.3556852147|
+--------------------+------------------+



## Underperfroming Countries By Total_Profit

In [38]:
# Define a threshold for underperforming
profit_threshold = 50000

# Filter countries with profits below the threshold
underperforming_countries_df = global_df.filter(F.col("Total_Profit") < profit_threshold)

# Sort by Total_Profit in ascending order (lowest first)
underperforming_countries_df = underperforming_countries_df.orderBy(F.col("Total_Profit").asc())

underperforming_countries_df.show(5)

+------------------+-----------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|            Region|    Country|Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+------------------+-----------+---------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|            Europe|     Poland|   Fruits|       Online|             C|2011-04-18|658217370|2011-05-19|         2|      9.33|     6.92|        18.66|     13.84|        4.82|
|Sub-Saharan Africa|    Burundi|   Fruits|       Online|             M|2016-08-12|493579518|2016-09-01|         3|      9.33|     6.92|        27.99|     20.76|        7.23|
|              Asia|North Korea|   Fruits|      Offline|             M|2016-06-03|572485387|2016-06-08|         3|      9.33|     

## Sales Channel Revenue Comparison (Online vs OPffline)

In [34]:
# Group by 'Sales_Channel' and sum the 'Revenue' for each channel
sales_channel_revenue_df = global_df.groupBy("Sales_Channel").agg(F.sum("Total_Revenue").alias("Revenue_Generated"))

# Format the 'Revenue_Generated' to 2 decimal places
sales_channel_revenue_df = sales_channel_revenue_df.withColumn("Revenue_Generated", F.format_number(F.col("Revenue_Generated"), 2))

sales_channel_revenue_df.show()

+-------------+-----------------+
|Sales_Channel|Revenue_Generated|
+-------------+-----------------+
|       Online|66,856,341,348.55|
|      Offline|66,750,331,717.86|
+-------------+-----------------+

