**Setting Up Pyspark**

In [1]:
!apt-get update
!apt-get install openjdk-11-jdk -y
!pip install pyspark

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,659 kB]
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,914 kB]
Get:13 http://archive.u

In [4]:
import os

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'
os.environ['SPARK_HOME'] = '/usr/local/lib/python3.7/dist-packages/pyspark'

**Working with Pyspark**

First we need to get or create a spark session

In [1]:
import sys
print(sys.version)

3.11.12 (main, Apr  9 2025, 08:55:54) [GCC 11.4.0]


In [9]:
!pip uninstall pyspark

[0m

In [10]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.5.tar.gz (317.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.2/317.2 MB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.5-py2.py3-none-any.whl size=317747862 sha256=7729efb6b7cc4b470577d191386a4c55ce1d3d36f8e48919ef2b555797d3a8b2
  Stored in directory: /root/.cache/pip/wheels/0c/7f/b4/0e68c6d8d89d2e582e5498ad88616c16d7c19028680e9d3840
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.5


In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"  # Ensure this is correct for your Colab
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.11/dist-packages/pyspark"

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate()

print("SparkSession created successfully!")

SparkSession created successfully!


In [3]:
from pyspark.sql.functions import col, sum, avg, count, year, month, dayofweek, when, isnan, isnull

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import DateType

In [5]:
from pyspark.sql import SparkSession

spk = SparkSession.builder.master("local[*]").appName("globalSales").getOrCreate()

print("SparkSession created successfully with the alias 'spk'!")

SparkSession created successfully with the alias 'spk'!


In [6]:
print(spk.version)

3.5.5


Read in the csv file using spark.read.csv

In [50]:
global_df = spk.read.csv("global_sales_records.csv", header=True, inferSchema=True)

global_df.show(5)

+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|    Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Middle East and N...|          Azerbaijan|       Snacks|       Online|             C| 10/8/2014|535113847|10/23/2014|       934|    152.58|    97.44|    142509.72|  91008.96|    51500.76|
|Central America a...|              Panama|    Cosmetics|      Offline|             L| 2/22/2015|874708545| 2/27/2015|      4551|     437.2|   263.33|    1989697.2|1198414.83|   791282.37|
|  Sub-Saharan Africa|Sao Tome and Prin...|       Fruit

In [9]:
# To get more infor about the dataset
print(f"Dataset Shape\nRows: {global_df.count():,}\nColumns: {len(global_df.columns)}")


Dataset Shape
Rows: 100,000
Columns: 14


In [10]:
global_df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [51]:
# Rename the columns, to remove the spaces

def rename_columns_remove_spaces(df: DataFrame) -> DataFrame:
  """Rename columns in a PySpark DataFrame to replace spaces with underscores."""
  for col_name in df.columns:
    new_col_name = col_name.replace(" ", "_")
    df = df.withColumnRenamed(col_name, new_col_name)
  return df

global_df = rename_columns_remove_spaces(global_df)

In [26]:
global_df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [31]:
global_df.show(5)

+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|    Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Middle East and N...|          Azerbaijan|       Snacks|       Online|             C| 10/8/2014|535113847|10/23/2014|       934|    152.58|    97.44|    142509.72|  91008.96|    51500.76|
|Central America a...|              Panama|    Cosmetics|      Offline|             L| 2/22/2015|874708545| 2/27/2015|      4551|     437.2|   263.33|    1989697.2|1198414.83|   791282.37|
|  Sub-Saharan Africa|Sao Tome and Prin...|       Fruit

In [52]:
# Get null values if any
null_counts = global_df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in global_df.columns])
null_counts.show()

+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+
|Region|Country|Item_Type|Sales_Channel|Order_Priority|Order_Date|Order_ID|Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+
|     0|      0|        0|            0|             0|         0|       0|        0|         0|         0|        0|            0|         0|           0|
+------+-------+---------+-------------+--------------+----------+--------+---------+----------+----------+---------+-------------+----------+------------+



In [53]:
# Data Transformation

# Transform the order and ship date
global_df = global_df.withColumn("Order_Date", F.to_date(F.col("Order_Date"), "M/d/yyyy"))

global_df = global_df.withColumn("Ship_Date", F.to_date(F.col("Ship_Date"), "M/d/yyyy"))


In [74]:
global_df.show(40)

+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|      Item_Type|Sales_Channel|Order_Priority|Order_Date| Order_ID| Ship_Date|Units_Sold|Unit_Price|Unit_Cost|Total_Revenue|Total_Cost|Total_Profit|
+--------------------+--------------------+---------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Middle East and N...|          Azerbaijan|         Snacks|       Online|             C|2014-10-08|535113847|2014-10-23|       934|    152.58|    97.44|    142509.72|  91008.96|    51500.76|
|Central America a...|              Panama|      Cosmetics|      Offline|             L|2015-02-22|874708545|2015-02-27|      4551|     437.2|   263.33|    1989697.2|1198414.83|   791282.37|
|  Sub-Saharan Africa|Sao Tome and Prin...|  

In [55]:
global_df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Sales_Channel: string (nullable = true)
 |-- Order_Priority: string (nullable = true)
 |-- Order_Date: date (nullable = true)
 |-- Order_ID: integer (nullable = true)
 |-- Ship_Date: date (nullable = true)
 |-- Units_Sold: integer (nullable = true)
 |-- Unit_Price: double (nullable = true)
 |-- Unit_Cost: double (nullable = true)
 |-- Total_Revenue: double (nullable = true)
 |-- Total_Cost: double (nullable = true)
 |-- Total_Profit: double (nullable = true)



In [71]:
# Top-performing regions by revenue
region_perf_rev = global_df.groupBy("Region").agg(F.sum("Total_Revenue") \
                                                  .alias("Revenue_Generated")) \
                                                  .orderBy(F.desc("Revenue_Generated"))

region_perf_rev = region_perf_rev.withColumn("Revenue_Generated", F.format_number(F.col("Revenue_Generated"), 2))
region_perf_rev.show()

+--------------------+-----------------+
|              Region|Revenue_Generated|
+--------------------+-----------------+
|  Sub-Saharan Africa|34,958,453,406.17|
|              Europe|34,241,150,923.39|
|                Asia|19,293,401,219.82|
|Middle East and N...|16,921,412,794.52|
|Central America a...|14,553,730,165.29|
|Australia and Oce...|10,701,522,223.73|
|       North America| 2,937,002,333.49|
+--------------------+-----------------+



In [72]:
rg_pd = region_perf_rev.toPandas()
rg_pd

Unnamed: 0,Region,Revenue_Generated
0,Sub-Saharan Africa,34958453406.17
1,Europe,34241150923.39
2,Asia,19293401219.82
3,Middle East and North Africa,16921412794.52
4,Central America and the Caribbean,14553730165.29
5,Australia and Oceania,10701522223.73
6,North America,2937002333.49


In [77]:
# Underperforming countries by profit
country_perf_profit = global_df.groupBy("Country").agg(F.sum("Total_Profit") \
                                                       .alias("Profit_Generated")) \
                                                       .orderBy(F.asc("Profit_Generated"))

country_perf_profit = country_perf_profit.withColumn("Profit_Generated", F.format_number(F.col("Profit_Generated"), 2))
country_perf_profit.show()

+--------------------+----------------+
|             Country|Profit_Generated|
+--------------------+----------------+
|              Angola|  178,155,609.15|
|     Solomon Islands|  185,828,653.42|
|               Palau|  185,972,902.81|
|          Kazakhstan|  187,443,367.34|
|             Germany|  188,026,206.60|
|         Switzerland|  188,855,603.18|
|            Kiribati|  189,363,092.17|
|             Nigeria|  189,724,068.82|
|        Sierra Leone|  190,921,135.72|
|             Jamaica|  191,539,645.86|
|United States of ...|  192,749,977.24|
|             Georgia|  193,276,418.20|
|             Vietnam|  194,177,410.45|
|          Kyrgyzstan|  194,211,180.74|
|             Andorra|  195,942,934.91|
|         Netherlands|  196,098,273.33|
|Antigua and Barbuda |  196,563,869.12|
|             Iceland|  196,844,401.69|
|               Italy|  197,617,559.77|
|            Moldova |  197,954,823.08|
+--------------------+----------------+
only showing top 20 rows



In [80]:
cp_pd = country_perf_profit.toPandas()
cp_pd

Unnamed: 0,Country,Profit_Generated
0,Angola,178155609.15
1,Solomon Islands,185828653.42
2,Palau,185972902.81
3,Kazakhstan,187443367.34
4,Germany,188026206.60
...,...,...
180,Australia,238598455.81
181,Liberia,239114337.44
182,Federated States of Micronesia,241045260.27
183,Hungary,241860844.00


In [82]:
# sales channel comparison (online vs. offline) by Revenue
channel_perf_rev = global_df.groupBy("Sales_Channel").agg(F.sum("Total_Revenue") \
                                                          .alias("Revenue_Generated")) \
                                                          .orderBy(F.desc("Revenue_Generated"))
channel_perf_rev = channel_perf_rev.withColumn("Revenue_Generated", F.format_number(F.col("Revenue_Generated"), 2))
channel_perf_rev.show()

+-------------+-----------------+
|Sales_Channel|Revenue_Generated|
+-------------+-----------------+
|       Online|66,856,341,348.55|
|      Offline|66,750,331,717.86|
+-------------+-----------------+



In [83]:
cv_pd = channel_perf_rev.toPandas()
cv_pd

Unnamed: 0,Sales_Channel,Revenue_Generated
0,Online,66856341348.55
1,Offline,66750331717.86
