In [1]:
import pyspark
import pyspark.sql.functions as func
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

In [3]:
spark

In [4]:
# read our data - lives in a csv file

df = spark.read.option("header","true").csv("Sample - EU Superstore.csv")

In [5]:
df.show(5)

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|       0|  39.6|
|     2|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-1

In [6]:
df.columns

['Row ID',
 'Order ID',
 'Order Date',
 'Ship Date',
 'Ship Mode',
 'Customer ID',
 'Customer Name',
 'Segment',
 'City',
 'State',
 'Country',
 'Region',
 'Product ID',
 'Category',
 'Sub-Category',
 'Product Name',
 'Sales',
 'Quantity',
 'Discount',
 'Profit']

In [7]:
# how many rows of the EU Superstore dataset have the country being France
df.filter(df["Country"] == "France").count()

2827

In [8]:
# of those, how many are profitable
df.filter(df["Profit"] > 0).count()


7569

In [9]:
# how any different discount brackets exist? what are they?
discount = df.select(df["Discount"]).distinct()
print(discount.count())
discount.show()

14
+--------+
|Discount|
+--------+
|     0.3|
|     0.7|
|       0|
|     0.2|
|    0.15|
|    0.35|
|     0.8|
|    0.45|
|     0.5|
|    0.65|
|     0.6|
|     0.1|
|    0.85|
|     0.4|
+--------+



In [10]:
# let's see the total profit by discount bracket, make sure they are ordered by 
discounts = df.groupBy("Discount").agg({"Profit" : "sum"}).orderBy("Discount")
discounts.show()

+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|       0| 383806.53000000026|
|     0.1|  126884.0309999999|
|    0.15| 24677.563499999975|
|     0.2| 2189.5499999999984|
|     0.3| -758.4209999999999|
|    0.35|          -9122.649|
|     0.4|-21346.427999999996|
|    0.45|         -1103.1915|
|     0.5|         -96632.115|
|     0.6|-20517.456000000002|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|     0.8|           -460.284|
|    0.85|          -3068.658|
+--------+-------------------+



In [11]:
# what is the value after which we should stop offering discount?

discounts.filter(discounts["sum(Profit)"] < 0).show()

+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|     0.3| -758.4209999999999|
|    0.35|          -9122.649|
|     0.4|-21346.427999999996|
|    0.45|         -1103.1915|
|     0.5|         -96632.115|
|     0.6|-20517.456000000002|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|     0.8|           -460.284|
|    0.85|          -3068.658|
+--------+-------------------+



In [12]:
# who are the top 5 most profitable customers

df2 = df.groupBy("Customer ID").agg({"Profit" : "sum"}).orderBy("sum(Profit)", ascending = False)
df2.show(5)

+-----------+------------------+
|Customer ID|       sum(Profit)|
+-----------+------------------+
|   SP-20920| 4974.512999999999|
|   PJ-18835|3986.0039999999995|
|   PO-18865|          3778.197|
|   EB-13840|           3459.66|
|   MG-18145|3144.4439999999995|
+-----------+------------------+
only showing top 5 rows



In [13]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?
top_5_customers = df2.collect()[:5]
top_5_customers

[Row(Customer ID='SP-20920', sum(Profit)=4974.512999999999),
 Row(Customer ID='PJ-18835', sum(Profit)=3986.0039999999995),
 Row(Customer ID='PO-18865', sum(Profit)=3778.197),
 Row(Customer ID='EB-13840', sum(Profit)=3459.66),
 Row(Customer ID='MG-18145', sum(Profit)=3144.4439999999995)]

In [14]:
top_5_customers_ID = [row[0] for row in top_5_customers]
df2 = df.filter(df["Customer ID"].isin(top_5_customers_ID))

In [15]:
df2.count()

76

In [16]:
df2.groupBy("Customer ID").count().show()

+-----------+-----+
|Customer ID|count|
+-----------+-----+
|   MG-18145|    8|
|   PJ-18835|    5|
|   PO-18865|   16|
|   SP-20920|   30|
|   EB-13840|   17|
+-----------+-----+



In [17]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)

In [18]:
df = df.withColumn("Original sales value", df["Sales"]/(1-df["discount"]))


In [19]:
df.select("Sales", "Discount", "Original sales value").show()

+--------+--------+--------------------+
|   Sales|Discount|Original sales value|
+--------+--------+--------------------+
|    79.2|       0|                79.2|
|  388.92|       0|              388.92|
|   35.19|       0|               35.19|
|   50.94|       0|               50.94|
|  307.44|       0|              307.44|
|   122.4|       0|               122.4|
|  413.82|       0|              413.82|
|  428.22|       0|              428.22|
| 3979.29|       0|             3979.29|
|   43.56|       0|               43.56|
|   25.26|       0|               25.26|
|2443.905|     0.1|  2715.4500000000003|
|   12.21|       0|               12.21|
|2167.296|    0.15|  2549.7599999999998|
| 138.105|     0.1|              153.45|
| 128.385|     0.1|  142.64999999999998|
|  690.12|       0|              690.12|
|    8.16|       0|                8.16|
|  347.88|       0|              347.88|
| 575.505|     0.1|   639.4499999999999|
+--------+--------+--------------------+
only showing top

In [20]:
# calculate the difference between sales and discount value
df = df.withColumn("Sale difference", df["Original sales value"] - df["Sales"])

In [21]:
df.select("Sales", "Discount", "Original sales value", "Sale difference").show()

+--------+--------+--------------------+------------------+
|   Sales|Discount|Original sales value|   Sale difference|
+--------+--------+--------------------+------------------+
|    79.2|       0|                79.2|               0.0|
|  388.92|       0|              388.92|               0.0|
|   35.19|       0|               35.19|               0.0|
|   50.94|       0|               50.94|               0.0|
|  307.44|       0|              307.44|               0.0|
|   122.4|       0|               122.4|               0.0|
|  413.82|       0|              413.82|               0.0|
|  428.22|       0|              428.22|               0.0|
| 3979.29|       0|             3979.29|               0.0|
|   43.56|       0|               43.56|               0.0|
|   25.26|       0|               25.26|               0.0|
|2443.905|     0.1|  2715.4500000000003| 271.5450000000001|
|   12.21|       0|               12.21|               0.0|
|2167.296|    0.15|  2549.7599999999998|

In [22]:
# how much money did we not gain due to the discounts - per discount bracket?
df2 = df.groupBy("Discount").agg({"Sale Difference" : "sum"}).orderBy("Discount")
df2.show()

+--------+--------------------+
|Discount|sum(Sale Difference)|
+--------+--------------------+
|       0|                 0.0|
|     0.1|   84712.44899999995|
|    0.15|   45233.17650000002|
|     0.2|  10653.119999999997|
|     0.3|  2630.2410000000004|
|    0.35|  29163.099000000002|
|     0.4|   46724.68800000002|
|    0.45|  2083.4414999999995|
|     0.5|  183734.26500000045|
|     0.6|   39644.04599999997|
|    0.65|  12219.655499999999|
|     0.7|            8534.085|
|     0.8|   635.6640000000002|
|    0.85|            4515.438|
+--------+--------------------+



In [23]:
# find the discount bracket which made us not gain the most (dynamically)
df2.orderBy("sum(Sale difference)", ascending = False).show(1)


+--------+--------------------+
|Discount|sum(Sale Difference)|
+--------+--------------------+
|     0.5|  183734.26500000045|
+--------+--------------------+
only showing top 1 row



In [24]:
#alternatively.
df.groupBy("Discount").agg({"Profit" : "sum"}).orderBy("sum(Profit)").show(1)

+--------+-----------+
|Discount|sum(Profit)|
+--------+-----------+
|     0.5| -96632.115|
+--------+-----------+
only showing top 1 row



In [25]:
Total_profit = df.agg({"Profit": "sum"})
Total_profit.show()

+-----------------+
|      sum(Profit)|
+-----------------+
|372829.7415000005|
+-----------------+



In [26]:
# what would have been the total profit if we removed all orders from that discount group? 
df3 = df.filter(df["Discount"] != 0.5)
Total_profit_without_highest_discount = df3.agg({"Profit": "sum"})
Total_profit_without_highest_discount.show()

+-----------------+
|      sum(Profit)|
+-----------------+
|469461.8565000003|
+-----------------+



In [27]:
#how much more (or less) profit is that?

Total_profit_without_highest_discount.collect()[0][0] - Total_profit.collect()[0][0]

96632.11499999976

In [28]:
# create a temporary table for our superstore table in sql
df.registerTempTable("df")



In [29]:
# use an SQL query to count the number of rows

spark.sql("select count(*) from df").show()


+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [32]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
spark.sql("select sum(Profit)/sum(Sales) as ratio from df ").show()

+------------------+
|             ratio|
+------------------+
|0.1268953165462097|
+------------------+



In [65]:
# is the country with the largest profit ratio, the country with the largest profit?
spark.sql("select country, sum(Profit), sum(Profit/Sales) from df group by Country").show()

+--------------+-------------------+---------------------+
|       country|        sum(Profit)|sum((Profit / Sales))|
+--------------+-------------------+---------------------+
|        Sweden|-17519.366999999987|  -102.00288339372285|
|       Germany| 107322.82049999991|   382.72996553246355|
|        France| 109029.00299999975|   459.09052906039324|
|       Belgium|           11572.59|    36.43431096913639|
|       Finland|            3905.73|    14.71841983001711|
|         Italy| 19828.757999999965|     85.0870597765804|
|        Norway|            5167.77|   17.521202479057507|
|         Spain|  54390.11999999999|   179.98874046541965|
|       Denmark|-4282.0470000000005|   -34.86752579094944|
|       Ireland| -7392.381000000003|  -52.118361055897715|
|   Switzerland|  7237.470000000001|   17.981956121444878|
|      Portugal| -8703.059999999998|   -36.15228703973136|
|       Austria|           21442.26|    67.60928451576017|
|United Kingdom| 111900.15000000001|   270.1731482722775