In [4]:
import pyspark
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.master('local[*]').appName('Final').getOrCreate()

In [6]:
spark

In [12]:
df.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



In [7]:
#Read our data
df = spark.read.option('header','true').csv('Sample - EU Superstore.csv')

In [8]:
# how many rows of the EU Superstore dataset have the country being France
df.filter(df['Country'] == 'France').count()

2827

In [11]:
# of those, how many are profitable?
df.filter((df['Country'] == 'France') & (df['Profit'] > 0)).count()

2277

In [13]:
# how any different discount brackets exist? what are they?
df.select('Discount').distinct().show()

+--------+
|Discount|
+--------+
|     0.3|
|     0.7|
|       0|
|     0.2|
|    0.15|
|    0.35|
|     0.8|
|    0.45|
|     0.5|
|    0.65|
|     0.6|
|     0.1|
|    0.85|
|     0.4|
+--------+



In [17]:
# let's see the totl profit by discount bracket, make sure they are ordered by 
profit_by_discount = df.groupBy('Discount').agg({'Profit': 'sum'}).orderBy('sum(Profit)', ascending = False)

profit_by_discount.show()


+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|       0| 383806.53000000026|
|     0.1|  126884.0309999999|
|    0.15| 24677.563499999975|
|     0.2| 2189.5499999999984|
|     0.8|           -460.284|
|     0.3| -758.4209999999999|
|    0.45|         -1103.1915|
|    0.85|          -3068.658|
|     0.7|          -5496.765|
|    0.65| -6221.965499999999|
|    0.35|          -9122.649|
|     0.6|-20517.456000000002|
|     0.4|-21346.427999999996|
|     0.5|         -96632.115|
+--------+-------------------+



In [21]:
# what is the value after which we should stop offering discount?
profit_by_discount.filter(profit_by_discount["sum(Profit)"] > 0).orderBy("sum(Profit)", ascending=True).select("Discount").collect()[0][0]

'0.2'

In [23]:
df.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



In [27]:
# who are the top 5 most profitable customers

top_5 = df.groupBy('Customer ID').agg({'Profit':'mean'}).orderBy('avg(Profit)', ascending = False).limit(5)

In [39]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?

top_5_cust = top_5.select('Customer ID')
df[df['Customer ID'].isin(top_5_cust['Customer ID'])].collect()

In [31]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)
df = df.withColumn('Original', df['Sales'] / (1-df['Discount']))

In [32]:
# how much money did we not gain due to the discounts - per discount bracket?
df = df.withColumn('Lose', df['Original'] -df['Sales'])
df.groupBy('Discount').agg({'Lose': 'sum'}).show()

+--------+------------------+
|Discount|         sum(Lose)|
+--------+------------------+
|     0.3|2630.2410000000004|
|     0.7|          8534.085|
|       0|               0.0|
|     0.2|10653.119999999997|
|    0.15| 45233.17650000002|
|    0.35|29163.099000000002|
|     0.8| 635.6640000000002|
|    0.45|2083.4414999999995|
|     0.5|183734.26500000045|
|    0.65|12219.655499999999|
|     0.6| 39644.04599999997|
|     0.1| 84712.44899999995|
|    0.85|          4515.438|
|     0.4| 46724.68800000002|
+--------+------------------+



In [33]:
# find the discount bracket which made us not gain the most (dynamically)
df.groupBy('Discount').agg({'Lose': 'sum'}).orderBy('sum(Lose)', ascending = False).limit(1).show()

+--------+------------------+
|Discount|         sum(Lose)|
+--------+------------------+
|     0.5|183734.26500000045|
+--------+------------------+



In [34]:
#how much more (or less) profit is that?

#We would not lose about 183k $ if we removed all orders from 0.5 discount group.

In [35]:
# create a temporary table for our superstore table in sql
df.createOrReplaceTempView('df')

In [36]:
# use an SQL query to count the number of rows
spark.sql('SELECT Count(*) FROM df').show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [37]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
spark.sql('SELECT Country ,sum(Profit) / sum(Sales) FROM df GROUP BY Country ORDER BY 2 DESC').show()

+--------------+--------------------------+
|       Country|(sum(Profit) / sum(Sales))|
+--------------+--------------------------+
|   Switzerland|        0.2909201193350232|
|       Austria|        0.2641908775042505|
|        Norway|        0.2517747548521659|
|       Belgium|       0.23508766583987942|
|United Kingdom|       0.21170103540397134|
|         Spain|       0.18941580658358978|
|       Finland|       0.18864296633316185|
|       Germany|       0.17066792076621765|
|        France|       0.12693568221933804|
|         Italy|       0.06844355185424991|
|       Ireland|      -0.44426677493909256|
|       Denmark|       -0.4957190005664471|
|   Netherlands|       -0.5298342790541865|
|        Sweden|       -0.5745674280714466|
|      Portugal|       -0.5761662270806188|
+--------------+--------------------------+



In [38]:
# is the country with the largest profit ratio, the country with the largest profit?

spark.sql('SELECT Country, MAX(Profit) FROM df GROUP BY Country ORDER BY MAX(Profit) DESC').show()

# According to results it is not correct.

+--------------+-----------+
|       Country|max(Profit)|
+--------------+-----------+
|       Germany|     99.696|
|        France|     99.672|
|         Spain|     99.441|
|United Kingdom|      99.24|
|         Italy|      99.24|
|       Austria|         99|
|   Switzerland|      952.2|
|       Finland|      94.92|
|       Belgium|      91.68|
|        Norway|         90|
|   Netherlands|      9.048|
|       Denmark|          0|
|      Portugal|          0|
|        Sweden|          0|
|       Ireland|     -92.52|
+--------------+-----------+

