In [58]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, round, col

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/10 10:22:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

In [23]:
# read our data - lives in a csv file

df = spark.read.option("header","true").csv("Sample - EU Superstore.csv")
df.show()
df.printSchema()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporate|        Leeds|             England|United Kingdom|  North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes 

In [28]:
# how many rows of the EU Superstore dataset have the country being France
count_france=df.filter(df['Country']=='France').count()
count_france

2827

In [31]:
# of those, how many are profitable?

profitable_france= df.filter((df['Country']=='France') & (df['Profit']>0)).count()
profitable_france

2277

In [32]:
# how any different discount brackets exist? what are they?
df.select('Discount').distinct().count()

14

In [33]:
df.select('Discount').distinct().orderBy('Discount').show()

+--------+
|Discount|
+--------+
|       0|
|     0.1|
|    0.15|
|     0.2|
|     0.3|
|    0.35|
|     0.4|
|    0.45|
|     0.5|
|     0.6|
|    0.65|
|     0.7|
|     0.8|
|    0.85|
+--------+



In [48]:
# let's see the totl profit by discount bracket, make sure they are ordered by 
total_profit = df.groupBy('Discount').agg(round(sum('Profit'), 2)).withColumnRenamed('round(sum(Profit), 2)', 'Total_profit').orderBy('Discount')
total_profit.show()


+--------+------------+
|Discount|Total_profit|
+--------+------------+
|       0|   383806.53|
|     0.1|   126884.03|
|    0.15|    24677.56|
|     0.2|     2189.55|
|     0.3|     -758.42|
|    0.35|    -9122.65|
|     0.4|   -21346.43|
|    0.45|    -1103.19|
|     0.5|   -96632.12|
|     0.6|   -20517.46|
|    0.65|    -6221.97|
|     0.7|    -5496.77|
|     0.8|     -460.28|
|    0.85|    -3068.66|
+--------+------------+



In [49]:
# what is the value after which we should stop offering discount?
total_profit.createOrReplaceTempView('total_profit')
max_discount=spark.sql('select max(Discount) from total_profit where total_profit >0')
max_discount.show()

+-------------+
|max(Discount)|
+-------------+
|          0.2|
+-------------+



In [52]:
# who are the top 5 most profitable customers

top5_customers = df.groupBy('Customer ID').agg(round(sum('Profit'), 2)).withColumnRenamed('round(sum(Profit), 2)', 'Profit').orderBy('Profit', ascending=False).limit(5)
top5_customers.show()

+-----------+-------+
|Customer ID| Profit|
+-----------+-------+
|   SP-20920|4974.51|
|   PJ-18835| 3986.0|
|   PO-18865| 3778.2|
|   EB-13840|3459.66|
|   MG-18145|3144.44|
+-----------+-------+



In [55]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?

top5cust = top5_customers.collect()
top5_cust_names = [row['Customer ID'] for row in top5cust]

top5_cust_rows= df.filter(df['Customer ID'].isin(top5_cust_names))
top5_cust_rows.show()

+------+---------------+----------+----------+--------------+-----------+-----------------+---------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+-------+--------+--------+-------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|    Customer Name|  Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|  Sales|Quantity|Discount| Profit|
+------+---------------+----------+----------+--------------+-----------+-----------------+---------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+-------+--------+--------+-------+
|     8|ES-2015-5113958|02/08/2015|07/08/2015|  Second Class|   EB-13840|    Ellis Ballard|Corporate|West Bromwich|             England|United Kingdom|  North|TEC-CO-10004325|     Technology|     Copiers|Canon Personal C

In [56]:
top5_cust_rows.count()

76

In [62]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)

df = df.withColumn("No_discount", round(col("Sales") / (1 - col("Discount")),2))
df.show()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+-----------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|No_discount|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+-----------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporate|        Leeds|             England|United Kingdom|  North|OFF-ST-10000988|Of

In [64]:
# calculate the difference between sales and discount value
df = df.withColumn("SalesDiscountDiff", round(col("Sales") - col("No_discount"),2))
df.show()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+-----------+-----------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|No_discount|SalesDiscountDiff|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+-----------+-----------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporate|        Leeds|        

In [68]:
# how much money did we not gain due to the discounts - per discount bracket?
lost_profit= df.groupBy('Discount').agg(round(sum(df['SalesDiscountDiff']),2)).withColumnRenamed('round(sum(SalesDiscountDiff), 2)','lost_profit').orderBy('Discount')
lost_profit.show()

+--------+-----------+
|Discount|lost_profit|
+--------+-----------+
|       0|        0.0|
|     0.1|  -84712.86|
|    0.15|  -45233.16|
|     0.2|  -10653.11|
|     0.3|   -2630.23|
|    0.35|  -29163.12|
|     0.4|  -46724.64|
|    0.45|   -2083.44|
|     0.5| -183735.71|
|     0.6|  -39644.07|
|    0.65|  -12219.66|
|     0.7|    -8534.1|
|     0.8|    -635.67|
|    0.85|   -4515.44|
+--------+-----------+



In [69]:
# find the discount bracket which made us not gain the most (dynamically)

lost_profit.orderBy('lost_profit').show(1)  

+--------+-----------+
|Discount|lost_profit|
+--------+-----------+
|     0.5| -183735.71|
+--------+-----------+
only showing top 1 row



In [72]:
# what would have been the total profit if we removed all orders from that discount group? 

df.filter(df['Discount'] != 0.5).select(round(sum('Profit'), 2)).withColumnRenamed('round(sum(Profit), 2)', 'totalprofit_no0.5').show()
total_profit_no05 = df.filter(df['Discount'] != 0.5).select(round(sum('Profit'), 2)).first()[0] # accessing the value
total_profit_no05

+-----------------+
|totalprofit_no0.5|
+-----------------+
|        469461.86|
+-----------------+



469461.86

In [79]:
#how much more (or less) profit is that?

total_profit1 = df.select(round(sum('Profit'))).first()[0]

total_profit1-total_profit_no05

-96631.85999999999

In [83]:
# create a temporary table for our superstore table in sql
df.createOrReplaceTempView('superstore_table')

In [84]:
# use an SQL query to count the number of rows
spark.sql("SELECT COUNT(*) FROM superstore_table").first()[0]

10000

In [85]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
spark.sql("SELECT country, ROUND(SUM(profit)/SUM(sales), 2) AS profit_ratio FROM superstore_table GROUP BY country").show()

+--------------+------------+
|       country|profit_ratio|
+--------------+------------+
|        Sweden|       -0.57|
|       Germany|        0.17|
|        France|        0.13|
|       Belgium|        0.24|
|       Finland|        0.19|
|         Italy|        0.07|
|        Norway|        0.25|
|         Spain|        0.19|
|       Denmark|        -0.5|
|       Ireland|       -0.44|
|   Switzerland|        0.29|
|      Portugal|       -0.58|
|       Austria|        0.26|
|United Kingdom|        0.21|
|   Netherlands|       -0.53|
+--------------+------------+



In [94]:
# is the country with the largest profit ratio, the country with the largest profit?
highest_ratio_country = spark.sql("SELECT country, ROUND(SUM(profit)/SUM(sales), 2) AS profit_ratio FROM superstore_table GROUP BY country ORDER BY profit_ratio DESC LIMIT 1").first()[0]
highest_ratio_country


'Switzerland'

In [95]:
highest_profit_country = spark.sql("SELECT country, SUM(profit) as total_profit from superstore_table GROUP BY country ORDER BY total_profit DESC LIMIT 1 ").first()[0]
highest_profit_country


'United Kingdom'

In [96]:
highest_ratio_country==highest_profit_country

False