In [1]:
!pip install pyspark py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=c858e3c7c4943bafcbd4b0f6822141b3b8bfb2adbeec5af260f5770f003fbd32
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()


In [4]:
#1 read our data - lives in a csv file

df = spark.read.option("header","true").csv("/content/Sample - EU Superstore.csv")

In [17]:
df.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



In [None]:
#2 how many rows of the EU Superstore dataset have the country being France
df.filter(df['Country'] == 'France').count()


In [None]:
#3 of those, how many are profitable?
df_profit = df.filter((df['Country'] == 'France') & (df['Profit']> 0)).count()
df_profit

In [5]:
#4 how many different discount brackets exist? what are they?
df_discount = df.groupby('Discount').count().show()

+--------+-----+
|Discount|count|
+--------+-----+
|     0.3|   51|
|     0.7|    6|
|       0| 6134|
|     0.2|  125|
|    0.15|  407|
|    0.35|   45|
|     0.8|    4|
|    0.45|    2|
|     0.5| 1080|
|    0.65|   17|
|     0.6|  116|
|     0.1| 1737|
|    0.85|    2|
|     0.4|  274|
+--------+-----+



In [6]:
#5 let's see the total profit by discount bracket, make sure they are ordered by 
df1 = df.groupby('Discount').agg({'Profit': 'sum'}).orderBy('sum(Profit)', descending = True)
df2=df1.withColumnRenamed('sum(Profit)' , 'total_profit')
df2.show()
#we see that the higher the discount the less profit we get from the sales 

+--------+-------------------+
|Discount|       total_profit|
+--------+-------------------+
|     0.5|         -96632.115|
|     0.4|-21346.427999999996|
|     0.6|-20517.456000000002|
|    0.35|          -9122.649|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|    0.85|          -3068.658|
|    0.45|         -1103.1915|
|     0.3| -758.4209999999999|
|     0.8|           -460.284|
|     0.2| 2189.5499999999984|
|    0.15| 24677.563499999975|
|     0.1|  126884.0309999999|
|       0| 383806.53000000026|
+--------+-------------------+



In [7]:
#6 what is the value after which we should stop offering discount?
from pyspark.sql.functions import when
df_stop = df2.withColumn('discount_offering', when((df2['total_profit'] >=0),'profitable').otherwise('stop'))
df_stop.show()
#we should stop at 0.8

+--------+-------------------+-----------------+
|Discount|       total_profit|discount_offering|
+--------+-------------------+-----------------+
|     0.5|         -96632.115|             stop|
|     0.4|-21346.427999999996|             stop|
|     0.6|-20517.456000000002|             stop|
|    0.35|          -9122.649|             stop|
|    0.65| -6221.965499999999|             stop|
|     0.7|          -5496.765|             stop|
|    0.85|          -3068.658|             stop|
|    0.45|         -1103.1915|             stop|
|     0.3| -758.4209999999999|             stop|
|     0.8|           -460.284|             stop|
|     0.2| 2189.5499999999984|       profitable|
|    0.15| 24677.563499999975|       profitable|
|     0.1|  126884.0309999999|       profitable|
|       0| 383806.53000000026|       profitable|
+--------+-------------------+-----------------+



In [8]:
#7.# who are the top 5 most profitable customers
df_customers = df.groupby('Customer Name',).agg({'Profit': 'sum'}).orderBy('sum(Profit)', ascending = False)
df_customers.show(5)

+-----------------+------------------+
|    Customer Name|       sum(Profit)|
+-----------------+------------------+
|     Susan Pistek| 4974.512999999999|
|    Patrick Jones|3986.0039999999995|
|Patrick O'Donnell|          3778.197|
|    Ellis Ballard|           3459.66|
|  Mike Gockenbach|3144.4439999999995|
+-----------------+------------------+
only showing top 5 rows



In [9]:
#8 get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?
df_customers.collect()[0:5]

[Row(Customer Name='Susan Pistek', sum(Profit)=4974.512999999999),
 Row(Customer Name='Patrick Jones', sum(Profit)=3986.0039999999995),
 Row(Customer Name="Patrick O'Donnell", sum(Profit)=3778.197),
 Row(Customer Name='Ellis Ballard', sum(Profit)=3459.66),
 Row(Customer Name='Mike Gockenbach', sum(Profit)=3144.4439999999995)]

In [21]:
#9 create a new column which is the value of the sale where there are no discount applied. Hint: orginal = sales/(1-d)
df5 = df.withColumn('sales_original', df.Sales/(1-(df.Discount)))
df5.select('Sales', 'sales_original').show()

+--------+------------------+
|   Sales|    sales_original|
+--------+------------------+
|    79.2|              79.2|
|  388.92|            388.92|
|   35.19|             35.19|
|   50.94|             50.94|
|  307.44|            307.44|
|   122.4|             122.4|
|  413.82|            413.82|
|  428.22|            428.22|
| 3979.29|           3979.29|
|   43.56|             43.56|
|   25.26|             25.26|
|2443.905|2715.4500000000003|
|   12.21|             12.21|
|2167.296|2549.7599999999998|
| 138.105|            153.45|
| 128.385|142.64999999999998|
|  690.12|            690.12|
|    8.16|              8.16|
|  347.88|            347.88|
| 575.505| 639.4499999999999|
+--------+------------------+
only showing top 20 rows



In [23]:
#10 calculate the difference between sales and discount value
df_diff = df5.withColumn('Sales-discount_diff', (df5.sales_original)-df5.Sales)
df_diff.select('Sales','Discount','Sales-discount_diff').show()

+--------+--------+-------------------+
|   Sales|Discount|Sales-discount_diff|
+--------+--------+-------------------+
|    79.2|       0|                0.0|
|  388.92|       0|                0.0|
|   35.19|       0|                0.0|
|   50.94|       0|                0.0|
|  307.44|       0|                0.0|
|   122.4|       0|                0.0|
|  413.82|       0|                0.0|
|  428.22|       0|                0.0|
| 3979.29|       0|                0.0|
|   43.56|       0|                0.0|
|   25.26|       0|                0.0|
|2443.905|     0.1|  271.5450000000001|
|   12.21|       0|                0.0|
|2167.296|    0.15| 382.46399999999994|
| 138.105|     0.1| 15.344999999999999|
| 128.385|     0.1| 14.264999999999986|
|  690.12|       0|                0.0|
|    8.16|       0|                0.0|
|  347.88|       0|                0.0|
| 575.505|     0.1| 63.944999999999936|
+--------+--------+-------------------+
only showing top 20 rows



In [24]:
#11 how much money did we not gain due to the discounts - per discount bracket?
df3= df_diff.groupby('Discount').agg({'Sales-discount_diff':'sum'})
df3.show(5)
#164727 amount of money was not gained due to discount 

+--------+------------------------+
|Discount|sum(Sales-discount_diff)|
+--------+------------------------+
|     0.3|      2630.2410000000004|
|     0.7|                8534.085|
|       0|                     0.0|
|     0.2|      10653.119999999997|
|    0.15|       45233.17650000002|
+--------+------------------------+
only showing top 5 rows



In [25]:
#12 find the discount bracket which made us not gain the most (dynamically)
data = df3.orderBy('sum(Sales-discount_diff)' , ascending=False)
data.collect()[0]

Row(Discount='0.5', sum(Sales-discount_diff)=183734.26500000045)

In [27]:
#13 what would have been the total profit if we removed all orders from that discount group? 
df2 = df.filter( df['Discount']!=0.5)
df2.select('Discount').distinct().show()
from pyspark.sql.functions import sum
df2.select(sum( df['Profit'])).collect()[0][0]

+--------+
|Discount|
+--------+
|     0.3|
|     0.7|
|       0|
|     0.2|
|    0.15|
|    0.35|
|     0.8|
|    0.45|
|    0.65|
|     0.6|
|     0.1|
|    0.85|
|     0.4|
+--------+



469461.8565000003

In [28]:
#14 how much more (or less) profit is that?
df.select( sum(df["Profit"])).collect()[0][0] - df2.select( sum(df["Profit"])).collect()[0][0] 

-96632.11499999976

In [29]:
#15 create a temporary table for our superstore table in sql
df.createOrReplaceTempView('table')
dfSQL = spark.sql('SELECT * FROM table')
dfSQL.show(2)

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|       0|  39.6|
|     2|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-1

In [30]:
#16 use an SQL query to count the number of rows
dfSQL.count()

10000

In [31]:
#17 Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
data = spark.sql(" SELECT Country, SUM(Profit), SUM(Sales) , SUM(Profit)/SUM(Sales) as Profit_Ratio FROM table GROUP BY Country")
data.show()

+--------------+-------------------+------------------+--------------------+
|       Country|        sum(Profit)|        sum(Sales)|        Profit_Ratio|
+--------------+-------------------+------------------+--------------------+
|        Sweden|-17519.366999999987|         30491.403| -0.5745674280714466|
|       Germany| 107322.82049999991| 628840.0305000001| 0.17066792076621765|
|        France| 109029.00299999975|  858931.082999999| 0.12693568221933804|
|       Belgium|           11572.59| 49226.70000000003| 0.23508766583987942|
|       Finland|            3905.73|20704.350000000002| 0.18864296633316185|
|         Italy| 19828.757999999965|289709.65799999936| 0.06844355185424991|
|        Norway|            5167.77|20525.370000000003|  0.2517747548521659|
|         Spain|  54390.11999999999| 287146.6800000002| 0.18941580658358978|
|       Denmark|-4282.0470000000005| 8638.053000000002| -0.4957190005664471|
|       Ireland| -7392.381000000003|16639.508999999995|-0.44426677493909256|

In [None]:
#18 is the country with the largest profit ratio, the country with the largest profit? No, its not 
data = spark.sql(" SELECT Country, SUM(Profit), SUM(Sales) , SUM(Profit)/SUM(Sales) as Profit_Ratio FROM table GROUP BY Country ORDER BY Profit_Ratio DESC")
data.show()

+--------------+-------------------+------------------+--------------------+
|       Country|        sum(Profit)|        sum(Sales)|        Profit_Ratio|
+--------------+-------------------+------------------+--------------------+
|   Switzerland|  7237.470000000001|24877.860000000004|  0.2909201193350232|
|       Austria|           21442.26| 81162.00000000006|  0.2641908775042505|
|        Norway|            5167.77|20525.370000000003|  0.2517747548521659|
|       Belgium|           11572.59| 49226.70000000003| 0.23508766583987942|
|United Kingdom| 111900.15000000001| 528576.2999999992| 0.21170103540397134|
|         Spain|  54390.11999999999| 287146.6800000002| 0.18941580658358978|
|       Finland|            3905.73|20704.350000000002| 0.18864296633316185|
|       Germany| 107322.82049999991| 628840.0305000001| 0.17066792076621765|
|        France| 109029.00299999975|  858931.082999999| 0.12693568221933804|
|         Italy| 19828.757999999965|289709.65799999936| 0.06844355185424991|