# **Setting up spark**


In [1]:
!apt-get update
!apt-get install openjdk-11-jdk -y
!pip install pyspark

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,369 kB]
Get:12 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,730 kB]
Get:13 https://r2u.stat.illinois.edu/ubuntu jammy/

In [9]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.11/dist-packages/pyspark"

# Working with pySpark

First things first, we need to get or create a spark session

# Set a Spark session

The sparksession is the entry point for high-level spark functionality.it is initiated using the Sparksession.builder

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import DateType

In [12]:
spk = SparkSession.builder.master("local[*]").appName('globalSales').getOrCreate()

In [13]:
print(spk.version)

3.5.5


Then we read in the csv file using the spark.read.csv

In [18]:
global_df = spk.read.csv("../global_sales_records.csv", header=True, inferSchema=True)

global_df.show(5)

+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|              Region|             Country|    Item Type|Sales Channel|Order Priority|Order Date| Order ID| Ship Date|Units Sold|Unit Price|Unit Cost|Total Revenue|Total Cost|Total Profit|
+--------------------+--------------------+-------------+-------------+--------------+----------+---------+----------+----------+----------+---------+-------------+----------+------------+
|Middle East and N...|          Azerbaijan|       Snacks|       Online|             C| 10/8/2014|535113847|10/23/2014|       934|    152.58|    97.44|    142509.72|  91008.96|    51500.76|
|Central America a...|              Panama|    Cosmetics|      Offline|             L| 2/22/2015|874708545| 2/27/2015|      4551|     437.2|   263.33|    1989697.2|1198414.83|   791282.37|
|  Sub-Saharan Africa|Sao Tome and Prin...|       Fruit

In [19]:
global_df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [21]:
# Rename the columns, to remove the spaces
def rename_columns_remove_spaces(df: DataFrame) -> DataFrame:
    """Renames columns in a PySpark DataFrame to replace spaces with underscores."""
    for col_name in df.columns:
        new_col_name = col_name.replace(" ", "_")
        df = df.withColumnRenamed(col_name, new_col_name)
    return df


In [22]:
global_df = rename_columns_remove_spaces(global_df)


In [25]:
# Data Transformation
# Transform the order and ship date

global_df.withColumn("Order_Date", F.to_date(F.col("Order_Date"), "M/d/yyyy"))

DataFrame[Region: string, Country: string, Item_Type: string, Sales_Channel: string, Order_Priority: string, Order_Date: date, Order_ID: int, Ship_Date: string, Units_Sold: int, Unit_Price: double, Unit_Cost: double, Total_Revenue: double, Total_Cost: double, Total_Profit: double]

# Top performing regions by revenue

In [29]:
region_perf_rev = global_df.groupBy("Region").agg(F.sum("Total_Revenue") \
                                                  .alias("Revenue_Generated")) \
                                                  .orderBy("Revenue_Generated")

region_perf_rev = region_perf_rev.withColumn("Revenue_Generated", F.format_number(F.col("Revenue_Generated"), 2))
region_perf_rev.show()

+--------------------+-----------------+
|              Region|Revenue_Generated|
+--------------------+-----------------+
|       North America| 2,937,002,333.49|
|Australia and Oce...|10,701,522,223.73|
|Central America a...|14,553,730,165.29|
|Middle East and N...|16,921,412,794.52|
|                Asia|19,293,401,219.82|
|              Europe|34,241,150,923.39|
|  Sub-Saharan Africa|34,958,453,406.17|
+--------------------+-----------------+

