<a href="https://colab.research.google.com/github/deryaoruc/Spark_Exercises/blob/main/Pyspark_Exercises.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark
import pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

In [None]:
# Create the Spark session for this notebook, or reuse one that already
# exists. "local[*]" runs Spark on this machine instead of a cluster.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Datamanipulation")
    .getOrCreate()
)

In [None]:
# Bare expression: the notebook displays the session object created above.
spark

In [None]:
# Load the EU Superstore data from its CSV file, using the first line as the
# column names. No schema is supplied or inferred here, so every column is
# read as a string.

df = spark.read.csv("Sample - EU Superstore.csv", header=True)

In [None]:
# Preview the first five rows to check that the file was parsed as expected.
df.show(n=5)

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|       0|  39.6|
|     2|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-1

In [None]:
# Print the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



In [None]:
# Profit and Discount are loaded as strings; convert both to doubles so they
# can be compared and aggregated numerically in the cells that follow.
df = (
    df
    .withColumn("Profit", col("Profit").cast("double"))
    .withColumn("Discount", col("Discount").cast("double"))
)

# How many rows of the EU Superstore dataset have France as their country?
df_France = df.filter(col("Country") == "France")
df_France.count()

2827

In [None]:
# Of the French rows, how many are profitable (Profit strictly above zero)?
df_France.where(col("Profit") > 0).count()

2329

In [None]:
# How many different discount brackets exist, and what are they?
# The distinct values are sorted so the listing is easy to scan and does not
# depend on partition order.
discount_brackets = df.select("Discount").distinct().orderBy("Discount")
discount_brackets.show()

# Answers the "how many" part of the question: the number of distinct brackets.
discount_brackets.count()

+--------+
|Discount|
+--------+
|     0.0|
|     0.2|
|     0.7|
|     0.1|
|    0.45|
|     0.6|
|     0.8|
|    0.35|
|     0.5|
|     0.4|
|    0.85|
|    0.15|
|    0.65|
|     0.3|
+--------+



In [None]:
# Total profit per discount bracket, ordered from the most profitable bracket
# to the least profitable one.
from pyspark.sql.functions import desc
(
    df.groupBy("Discount")
    .agg({"Profit": "sum"})
    .orderBy(desc("sum(Profit)"))
    .show()
)

+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|     0.0| 383806.53000000026|
|     0.1|  126884.0309999999|
|    0.15| 24677.563499999975|
|     0.2| 2189.5499999999984|
|     0.8|           -460.284|
|     0.3| -758.4209999999999|
|    0.45|         -1103.1915|
|    0.85|          -3068.658|
|     0.7|          -5496.765|
|    0.65| -6221.965499999999|
|    0.35|          -9122.649|
|     0.6|-20517.456000000002|
|     0.4|-21346.427999999996|
|     0.5|         -96632.115|
+--------+-------------------+



In [None]:
# After which discount value should we stop offering discounts?
# List total profit per bracket in ascending discount order to see where it
# turns negative.
(
    df.groupBy("Discount")
    .agg({"Profit": "sum"})
    .orderBy("Discount")
    .show()
)
# Reading the recorded output: total profit is positive up to the 0.2 bracket
# and negative for every bracket from 0.3 upward, so discounts of 0.3 (30%)
# and above should not be offered.

+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|     0.0| 383806.53000000026|
|     0.1|  126884.0309999999|
|    0.15| 24677.563499999975|
|     0.2| 2189.5499999999984|
|     0.3| -758.4209999999999|
|    0.35|          -9122.649|
|     0.4|-21346.427999999996|
|    0.45|         -1103.1915|
|     0.5|         -96632.115|
|     0.6|-20517.456000000002|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|     0.8|           -460.284|
|    0.85|          -3068.658|
+--------+-------------------+



In [None]:
# Who are the 5 most profitable customers?
# Sum the profit per customer ID, then list the five largest totals.

df_customer = df.groupBy("Customer ID").agg({"Profit": "sum"})
df_customer.sort(desc("sum(Profit)")).show(n=5)

+-----------+------------------+
|Customer ID|       sum(Profit)|
+-----------+------------------+
|   SP-20920| 4974.512999999999|
|   PJ-18835|3986.0039999999995|
|   PO-18865|          3778.197|
|   EB-13840|           3459.66|
|   MG-18145|3144.4439999999995|
+-----------+------------------+
only showing top 5 rows



In [None]:
# Get all the rows belonging to the 5 most profitable customers, and count them.
# collect() brings the five IDs back to the driver as Row objects so they can
# be turned into a plain Python list for isin().
top_5_customers = (
    df_customer.orderBy(desc("sum(Profit)"))
    .limit(5)
    .select("Customer ID")
    .collect()
)
# Read the ID by column name rather than by position.
top_5_customers_ids = [row["Customer ID"] for row in top_5_customers]
df_top_5_customers = df.filter(df["Customer ID"].isin(top_5_customers_ids))
df_top_5_customers.show()

# Answers "how many rows are they?": show() only prints a sample of the rows,
# so the count is computed explicitly.
df_top_5_customers.count()

+------+---------------+----------+----------+--------------+-----------+-----------------+---------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+-------+--------+--------+-------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|    Customer Name|  Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|  Sales|Quantity|Discount| Profit|
+------+---------------+----------+----------+--------------+-----------+-----------------+---------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+-------+--------+--------+-------+
|     8|ES-2015-5113958|02/08/2015|07/08/2015|  Second Class|   EB-13840|    Ellis Ballard|Corporate|West Bromwich|             England|United Kingdom|  North|TEC-CO-10004325|     Technology|     Copiers|Canon Personal C

In [None]:
# Add a column holding the value each sale would have had if no discount had
# been applied:
#   original = sales / (1 - discount)
df = df.withColumn("Original Price", col("Sales") / (1 - col("Discount")))
df.show(5)

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+--------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|Original Price|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+--------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|     0.0|  39.6|          79.2|
|     2|

In [None]:
# Amount given away on each row: the undiscounted price minus the sales value
# actually recorded.
df = df.withColumn("Difference", col("Original Price") - col("Sales"))
df.show()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+------------------+------------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|    Original Price|        Difference|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+------------------+------------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporat

In [None]:
# How much money did we forgo because of the discounts, per discount bracket?
# Sums the per-row "Difference" within each bracket.
df.groupBy("Discount").sum("Difference").show()

+--------+------------------+
|Discount|   sum(Difference)|
+--------+------------------+
|     0.0|               0.0|
|     0.2|10653.119999999997|
|     0.7|          8534.085|
|     0.1| 84712.44899999995|
|    0.45|2083.4414999999995|
|     0.6| 39644.04599999997|
|     0.8| 635.6640000000002|
|    0.35|29163.099000000002|
|     0.5|183734.26500000045|
|     0.4| 46724.68800000002|
|    0.85|          4515.438|
|    0.15| 45233.17650000002|
|    0.65|12219.655499999999|
|     0.3|2630.2410000000004|
+--------+------------------+



In [None]:
# Find the discount bracket that made us forgo the most money, computed from
# the data instead of being read off a printed table.
# Spark's aggregate functions are accessed through the `F` namespace so that
# Python's built-in `sum` and `max` are not shadowed for the rest of the
# notebook.

df_temp = df.groupBy("Discount").agg(F.sum("Difference").alias("Total_Loss"))
df_temp.orderBy(F.desc("Total_Loss")).limit(1).show()

+--------+------------------+
|Discount|        Total_Loss|
+--------+------------------+
|     0.5|183734.26500000045|
+--------+------------------+



In [None]:
# What would the total profit have been if we removed all orders from the
# discount bracket that made us forgo the most money?

# Derive that bracket from the data instead of hard-coding its value, so the
# cell stays correct if the underlying data changes. The bracket is the one
# with the largest summed "Difference".
worst_discount = (
    df.groupBy("Discount")
    .agg(F.sum("Difference").alias("Total_Loss"))
    .orderBy(F.desc("Total_Loss"))
    .first()["Discount"]
)

# Keep every row that is outside that bracket.
df_without_discount = df.filter(F.col("Discount") != worst_discount)

# Total profit with the bracket removed, and total profit over all rows.
total_profit_without_discount = df_without_discount.agg(F.sum("Profit")).first()[0]
total_profit = df.agg(F.sum("Profit")).first()[0]

# Actual total minus the total without the bracket: a negative value means the
# bracket reduced the overall profit by that amount.
difference = total_profit - total_profit_without_discount

print(f"Total profit without discount {worst_discount}: ", total_profit_without_discount)
print("Total profit: ", total_profit)
print("Difference: ", difference)

Total profit without discount 0.5:  469461.8565000003
Total profit:  372829.7415000005
Difference:  -96632.11499999976


In [None]:
# How much more (or less) profit is that?
# `difference` was computed in the previous cell as the actual total profit
# minus the total profit without the excluded discount bracket, so a negative
# value means the actual total is lower than it would have been without it.
print("Difference: ", difference)

Difference:  -96632.11499999976

In [None]:
# Create a temporary table for our superstore data so it can be queried with SQL.
# Registers `df` under the name "superstore_temp", replacing any existing view
# with that name.
df.createOrReplaceTempView("superstore_temp")

In [None]:
# Use an SQL query to count the number of rows.
# The query runs through the `spark` session created at the top of the
# notebook; no `sqlContext` is ever defined here, so using it fails on a
# fresh kernel.
spark.sql("SELECT COUNT(*) FROM superstore_temp").show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [None]:
# Use an SQL query to calculate the profit ratio for each country, where
# ratio = SUM(Profit) / SUM(Sales), listed from the highest ratio to the lowest.
# The query runs through the `spark` session created at the top of the
# notebook; no `sqlContext` is ever defined here, so using it fails on a
# fresh kernel.
spark.sql(
    """
    SELECT Country, SUM(Profit) / SUM(Sales) AS Profit_Ratio
    FROM superstore_temp
    GROUP BY Country
    ORDER BY Profit_Ratio DESC
    """
).show()

+--------------+--------------------+
|       Country|        Profit_Ratio|
+--------------+--------------------+
|   Switzerland|  0.2909201193350232|
|       Austria|  0.2641908775042505|
|        Norway|  0.2517747548521659|
|       Belgium| 0.23508766583987942|
|United Kingdom| 0.21170103540397134|
|         Spain| 0.18941580658358978|
|       Finland| 0.18864296633316185|
|       Germany| 0.17066792076621765|
|        France| 0.12693568221933804|
|         Italy| 0.06844355185424991|
|       Ireland|-0.44426677493909256|
|       Denmark| -0.4957190005664471|
|   Netherlands| -0.5298342790541865|
|        Sweden| -0.5745674280714466|
|      Portugal| -0.5761662270806188|
+--------------+--------------------+



In [None]:
# Is the country with the largest profit ratio also the country with the
# largest profit?
# The query runs through the `spark` session created at the top of the
# notebook; no `sqlContext` is ever defined here, so using it fails on a
# fresh kernel.
spark.sql(
    """
    SELECT Country, SUM(Profit) AS Total_Profit
    FROM superstore_temp
    GROUP BY Country
    ORDER BY Total_Profit DESC
    """
).show()

# Answer: no, not necessarily.
# The profit ratio is SUM(Profit) / SUM(Sales), so it measures how profitable
# each unit of sales is and is independent of how much a country sells in
# total. A country with a small sales volume can therefore have a high ratio
# but a low absolute profit.
# In the output recorded with this notebook, Switzerland has the highest
# profit ratio while the United Kingdom has the highest total profit.

+--------------+-------------------+
|       Country|       Total_Profit|
+--------------+-------------------+
|United Kingdom| 111900.15000000001|
|        France| 109029.00299999975|
|       Germany| 107322.82049999991|
|         Spain|  54390.11999999999|
|       Austria|           21442.26|
|         Italy| 19828.757999999965|
|       Belgium|           11572.59|
|   Switzerland|  7237.470000000001|
|        Norway|            5167.77|
|       Finland|            3905.73|
|       Denmark|-4282.0470000000005|
|       Ireland| -7392.381000000003|
|      Portugal| -8703.059999999998|
|        Sweden|-17519.366999999987|
|   Netherlands| -41070.07499999996|
+--------------+-------------------+

