In [1]:
!pip install pyspark 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=f9b1ad13cb5fea4da73353cb1fd74acb2536a90d567aec11dc9d41e26c110ebb
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

In [5]:
spark

In [6]:
# read our data - lives in a csv file

df = spark.read.option("header","true").csv("/content/drive/MyDrive/DevelHope/ETL exercice/Sample - EU Superstore.csv")

In [7]:
# how many rows of the EU Superstore dataset have the country being France


In [8]:
df.filter(df['Country']=='France').count()

2827

In [9]:
# of those, how many are profitable?
df.filter(( df['Country']=='France' ) &  (df['Profit'] > 0)).count()


2277

In [10]:
# how any different discount brackets exist? what are they?
df.select('Discount').distinct().count()

14

In [11]:
df.show(3)

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|       0|  39.6|
|     2|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-1

In [12]:
# let's see the totl profit by discount bracket, make sure they are ordered by 
df.groupBy('Discount').agg({'Profit':'sum'}).orderBy('sum(Profit)').show()

+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|     0.5|         -96632.115|
|     0.4|-21346.427999999996|
|     0.6|-20517.456000000002|
|    0.35|          -9122.649|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|    0.85|          -3068.658|
|    0.45|         -1103.1915|
|     0.3| -758.4209999999999|
|     0.8|           -460.284|
|     0.2| 2189.5499999999984|
|    0.15| 24677.563499999975|
|     0.1|  126884.0309999999|
|       0| 383806.53000000026|
+--------+-------------------+



In [13]:
# what is the value after which we should stop offering discount?
0.2

0.2

In [14]:
# who are the top 5 most profitable customers

In [15]:
customers = df.groupBy("Customer Name").count().orderBy('count', ascending=False)
customers.show(5)

+-------------+-----+
|Customer Name|count|
+-------------+-----+
|Becky Castell|   37|
|   John Grady|   35|
| Mathew Reese|   34|
|Michael Paige|   33|
|  John Murray|   32|
+-------------+-----+
only showing top 5 rows



In [16]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?
top5 = customers.collect()[:5]
names = [name[0] for name in top5]
names

['Becky Castell', 'John Grady', 'Mathew Reese', 'Michael Paige', 'John Murray']

In [17]:
res = df[df['Customer Name'].isin(names)]
res.show()

+------+---------------+----------+----------+--------------+-----------+-------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount|  Profit|
+------+---------------+----------+----------+--------------+-----------+-------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+--------+
|   162|ES-2017-5347900|18/03/2017|18/03/2017|      Same Day|   MP-17965|Michael Paige|  Corporate|         Cork|                Cork|       Ireland|  North|OFF-AR-10001482|Office Supplies|         Art|Stanley Markers, .

In [18]:
res.count()

171

In [19]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)
df = df.withColumn("Original_sales", df['Sales']*(1-df['Discount']))
df.show(4)

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+--------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|Original_sales|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+--------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|       0|  39.6|          79.2|
|     2|

In [20]:
# calculate the difference between sales and discount value
df = df.withColumn("diff_Sales", df['Sales'] - df['Original_sales'])
df.show(4)

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+--------------+----------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|Original_sales|diff_Sales|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+--------------+----------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|       

In [21]:
# how much money did we not gain due to the discounts - per discount bracket?
res = df.groupBy('Discount').agg({"diff_Sales":"sum"})
res.show()

+--------+------------------+
|Discount|   sum(diff_Sales)|
+--------+------------------+
|     0.3|1841.1687000000004|
|     0.7|2560.2254999999996|
|       0|               0.0|
|     0.2|          8522.496|
|    0.15| 38448.20002499999|
|    0.35|18956.014349999998|
|     0.8|          127.1328|
|    0.45|       1145.892825|
|     0.5| 91867.13250000023|
|    0.65|       4276.879425|
|     0.6|15857.618399999994|
|     0.1|        76241.2041|
|    0.85|          677.3157|
|     0.4|28034.812799999996|
+--------+------------------+



In [22]:
# find the discount bracket which made us not gain the most (dynamically)
data = res.orderBy('sum(diff_Sales)' , ascending=False)
data.collect()[0]

Row(Discount='0.5', sum(diff_Sales)=91867.13250000023)

In [23]:
data.collect()[0][0]

'0.5'

In [24]:
# what would have been the total profit if we removed all orders from that discount group? 
df2 = df.filter( df['Discount']!=0.5)
df2.select('Discount').distinct().show()

+--------+
|Discount|
+--------+
|     0.3|
|     0.7|
|       0|
|     0.2|
|    0.15|
|    0.35|
|     0.8|
|    0.45|
|    0.65|
|     0.6|
|     0.1|
|    0.85|
|     0.4|
+--------+



In [25]:
from pyspark.sql.functions import sum
df2.select( sum( df['Profit'])).collect()[0][0]

469461.8565000003

In [26]:
#how much more (or less) profit is that?
df.select( sum(df["Profit"])).collect()[0][0] - df2.select( sum(df["Profit"])).collect()[0][0] 


-96632.11499999976

In [31]:
# create a temporary table for our superstore table in sql
df.createOrReplaceTempView('df')

In [33]:
# use an SQL query to count the number of rows
a = spark.sql("SELECT count(*) FROM df")
a.show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [39]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
data = spark.sql(" SELECT Country, SUM(Profit), SUM(Sales) , SUM(Profit)/SUM(Sales) as Profit_Ratio FROM df GROUP BY Country")
data.show()

+--------------+-------------------+------------------+--------------------+
|       Country|        sum(Profit)|        sum(Sales)|        Profit_Ratio|
+--------------+-------------------+------------------+--------------------+
|        Sweden|-17519.366999999987|         30491.403| -0.5745674280714466|
|       Germany| 107322.82049999991| 628840.0305000001| 0.17066792076621765|
|        France| 109029.00299999975|  858931.082999999| 0.12693568221933804|
|       Belgium|           11572.59| 49226.70000000003| 0.23508766583987942|
|       Finland|            3905.73|20704.350000000002| 0.18864296633316185|
|         Italy| 19828.757999999965|289709.65799999936| 0.06844355185424991|
|        Norway|            5167.77|20525.370000000003|  0.2517747548521659|
|         Spain|  54390.11999999999| 287146.6800000002| 0.18941580658358978|
|       Denmark|-4282.0470000000005| 8638.053000000002| -0.4957190005664471|
|       Ireland| -7392.381000000003|16639.508999999995|-0.44426677493909256|

In [43]:
# is the country with the largest profit ratio, the country with the largest profit?
data.orderBy('sum(Profit)', ascending = False).collect()[0][0]

'United Kingdom'

In [44]:
data.orderBy('Profit_Ratio', ascending = False).collect()[0][0]

'Switzerland'