In [0]:
# This cell was only used the first time the data was uploaded.
# It reads our data, which originally lives in a CSV file, from the table created by the upload.

#df = spark.read.table('sample___eu_superstore_csv')
#df.show()

In [0]:
# On every later run of this notebook the data is loaded directly from the uploaded CSV file:
# the first line is used as the header and column types are inferred from the content.
csv_path = "/FileStore/tables/Sample___EU_Superstore.csv"
df = (
    spark.read
    .format('csv')
    .options(header="true", inferSchema="true")
    .load(csv_path)
)

In [0]:
# Replace the spaces in the column names with underscores so the data can be saved as a table.
new_column_name = [column.replace(' ', '_') for column in df.columns]
df = df.toDF(*new_column_name)

In [0]:
# The space clean-up leaves the hyphen in 'Sub-Category' untouched, so rename that column explicitly.
df = df.withColumnRenamed(existing='Sub-Category', new='Sub_Category')

In [0]:
# Pure SQL needs a table to query, so write the data out as a Delta table.
# 'overwrite' mode replaces the table contents on each run.
table_writer = df.write.format('delta').mode('overwrite')
table_writer.saveAsTable('Sample___EU_Superstore_csv')

In [0]:
import pyspark.sql.functions as F

In [0]:
# Convert the day/month/year text columns into proper date columns.
# Column names are spelled exactly as they appear in the frame ('Order_Date', not
# 'Order_date'), so the cell does not depend on case-insensitive column resolution.
# The dtype guard makes the cell safe to re-run: a column that has already been
# converted is skipped instead of being parsed again with a pattern it no longer matches.
column_types = dict(df.dtypes)
for date_column in ('Order_Date', 'Ship_Date'):
    if column_types[date_column] == 'string':
        df = df.withColumn(date_column, F.to_date(date_column, 'dd/MM/yyyy'))

The cells from here on are the platform exercises.

In [0]:
# Exercise: number of rows in the EU Superstore dataset whose country is France.
in_france = df['Country'] == 'France'
df.where(in_france).count()

Out[8]: 2827

In [0]:
# Exercise: of the French rows, how many made a profit (Profit strictly above zero)?
is_french = df['Country'] == 'France'
is_profitable = df['Profit'] > 0
df.filter(is_french & is_profitable).count()


Out[9]: 2329

In [0]:
# Exercise: how many different discount brackets exist, and what are they?
# The count comes first (14 different Discount values on this data set).
discount_brackets = df.select("Discount").distinct()
discount_brackets.count()

Out[10]: 14

In [0]:
# List the distinct Discount values themselves.
distinct_discounts = df.select("Discount").distinct()
distinct_discounts.show()

+--------+
|Discount|
+--------+
|     0.0|
|     0.2|
|     0.7|
|     0.1|
|    0.45|
|     0.6|
|     0.8|
|    0.35|
|     0.5|
|     0.4|
|    0.85|
|    0.15|
|    0.65|
|     0.3|
+--------+



In [0]:
# Total profit per discount bracket, ordered by total profit from highest to lowest.
total_profit = F.sum('Profit').alias('Total profit')
profit_by_discount = df.groupBy('Discount').agg(total_profit)
profit_by_discount.sort('Total profit', ascending=False).show()

+--------+-------------------+
|Discount|       Total profit|
+--------+-------------------+
|     0.0| 383806.53000000026|
|     0.1|  126884.0309999999|
|    0.15| 24677.563499999975|
|     0.2| 2189.5499999999984|
|     0.8|           -460.284|
|     0.3| -758.4209999999999|
|    0.45|         -1103.1915|
|    0.85|          -3068.658|
|     0.7|          -5496.765|
|    0.65| -6221.965499999999|
|    0.35|          -9122.649|
|     0.6|-20517.456000000002|
|     0.4|-21346.427999999996|
|     0.5|         -96632.115|
+--------+-------------------+



In [0]:
# Exercise: what is the discount value after which we should stop offering discounts?
# Total profit per bracket, most profitable first.
profit_df = df.groupBy('Discount').agg(F.sum('Profit').alias('Total profit')).orderBy('Total profit', ascending = False)
# Keep only the brackets that do not lose money overall.
profitable_df = profit_df.filter(profit_df['Total profit'] >= 0)
# Pick the highest discount among the profitable brackets explicitly. Taking the last
# row of the profit-sorted frame would return the least profitable bracket, which is
# not necessarily the one with the highest discount.
discount_stop = profitable_df.agg(F.max('Discount')).collect()[0][0]
if discount_stop is None:
    # max() over an empty frame yields null: no bracket is profitable at all.
    print('No discount bracket is profitable')
else:
    print(f'We should not go above {discount_stop} Discount or {discount_stop*100}%')

We should not go above 0.2 Discount or 20.0%


In [0]:
# Exercise: who are the top 5 most profitable customers?
# The frame holds every customer sorted by total profit (highest first);
# show(5) prints only the five best.
profit_per_customer = F.sum('Profit').alias('Profit_per_customer')
customer_name = F.first('Customer_Name').alias('Customer_name')
top_5_customers = (
    df.groupBy('Customer_ID')
    .agg(profit_per_customer, customer_name)
    .orderBy('Profit_per_customer', ascending=False)
)
top_5_customers.show(5)

+-----------+-------------------+-----------------+
|Customer_ID|Profit_per_customer|    Customer_name|
+-----------+-------------------+-----------------+
|   SP-20920|  4974.512999999999|     Susan Pistek|
|   PJ-18835| 3986.0039999999995|    Patrick Jones|
|   PO-18865|           3778.197|Patrick O'Donnell|
|   EB-13840|            3459.66|    Ellis Ballard|
|   MG-18145| 3144.4439999999995|  Mike Gockenbach|
+-----------+-------------------+-----------------+
only showing top 5 rows



In [0]:
# Exercise: get all the rows belonging to those 5 customer names - how many rows are they?
# First step: bring the names of the five most profitable customers to the driver.
# take(5) fetches only the first five rows of the sorted frame instead of collecting
# every customer and slicing the list afterwards.
rows = top_5_customers.take(5)
# Read the name by field name rather than by position so a change in column order cannot break it.
name_list = [row['Customer_name'] for row in rows]
name_list

Out[15]: ['Susan Pistek',
 'Patrick Jones',
 "Patrick O'Donnell",
 'Ellis Ballard',
 'Mike Gockenbach']

In [0]:
# All rows belonging to the 5 customer names.
top_customer_rows = df.filter(df['Customer_Name'].isin(name_list))
top_customer_rows.show()
# show() only prints the first 20 rows; the exercise also asks how many rows there are.
top_customer_rows.count()

+------+---------------+----------+----------+--------------+-----------+-----------------+---------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+-------+--------+--------+-------+
|Row_ID|       Order_ID|Order_Date| Ship_Date|     Ship_Mode|Customer_ID|    Customer_Name|  Segment|         City|               State|       Country| Region|     Product_ID|       Category|Sub_Category|        Product_Name|  Sales|Quantity|Discount| Profit|
+------+---------------+----------+----------+--------------+-----------+-----------------+---------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+-------+--------+--------+-------+
|     8|ES-2015-5113958|2015-08-02|2015-08-07|  Second Class|   EB-13840|    Ellis Ballard|Corporate|West Bromwich|             England|United Kingdom|  North|TEC-CO-10004325|     Technology|     Copiers|Canon Personal C

In [0]:
# New column: what the sale would have been worth without the discount.
# Hint from the exercise: original = sales / (1 - discount).
undiscounted_sales = F.col('Sales') / (1 - F.col('Discount'))
df = df.withColumn('Sales_no_discount', undiscounted_sales)

In [0]:
# Amount given away by the discount: undiscounted sales value minus the actual sales value.
discount_amount = df['Sales_no_discount'] - df['Sales']
df = df.withColumn('sales_diff', discount_amount)

In [0]:
# Exercise: how much money did we not gain because of the discounts, per discount bracket?
missed_total = F.sum('sales_diff').alias('Missed_profit')
missed_profit = df.groupBy('Discount').agg(missed_total)
missed_profit.show()

+--------+------------------+
|Discount|     Missed_profit|
+--------+------------------+
|     0.0|               0.0|
|     0.2|10653.119999999997|
|     0.7|          8534.085|
|     0.1| 84712.44899999995|
|    0.45|2083.4414999999995|
|     0.6| 39644.04599999997|
|     0.8| 635.6640000000002|
|    0.35|29163.099000000002|
|     0.5|183734.26500000045|
|     0.4| 46724.68800000002|
|    0.85|          4515.438|
|    0.15| 45233.17650000002|
|    0.65|12219.655499999999|
|     0.3|2630.2410000000004|
+--------+------------------+



In [0]:
# Exercise: find (dynamically) the discount bracket that cost us the most.
# first() fetches only the top row of the sorted frame instead of collecting every bracket.
top_missed_row = missed_profit.orderBy('Missed_profit', ascending = False).first()
# The first column of the row is the Discount value of that bracket.
biggest_miss = top_missed_row[0]
print(f'the biggest profit miss is caused by bracket of {biggest_miss} or discount of {biggest_miss*100}%')

the biggest profit miss is caused by bracked of 0.5 or discount of 50.0%


In [0]:
# Exercise: total profit if every order from that discount bracket were removed.
orders_without_bracket = df.filter(df['Discount'] != biggest_miss)
orders_without_bracket.agg(F.sum('Profit')).show()

+-----------------+
|      sum(Profit)|
+-----------------+
|469461.8565000003|
+-----------------+



In [0]:
# Sanity check of the previous aggregation: overall profit minus the profit of the
# removed bracket should give the same figure.
overall_profit = df.agg(F.sum('Profit')).collect()[0][0]
bracket_profit = df.filter(df['Discount'] == biggest_miss).agg(F.sum('Profit')).collect()[0][0]
overall_profit - bracket_profit

Out[36]: 469461.8565000005

In [0]:
# Exercise: how much more (or less) profit is that?
# x is the overall profit minus the profit without the bracket, so a negative x
# means profit goes up once the bracket is removed.
profit_all_orders = df.agg(F.sum('Profit')).collect()[0][0]
profit_without_bracket = df.filter(df['Discount'] != biggest_miss).agg(F.sum('Profit')).collect()[0][0]
x = profit_all_orders - profit_without_bracket
if not x < 0:
    print(f'if we remove {biggest_miss} discount the total profit would be down by ${x}')
else:
    print(f'if we remove {biggest_miss} discount the total profit would be up by ${-x}')


if we remove 0.5 discount the total profit would be up by $96632.11499999976


In [0]:
# Register the superstore frame as the temporary view 'df_sql' so it can be queried with spark.sql.
df.createOrReplaceTempView(name='df_sql')

In [0]:
# Exercise: count the number of rows with an SQL query against the temporary view.
row_count_query = 'select count(*) from df_sql'
spark.sql(row_count_query).show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [0]:
# Exercise: profit ratio per country with an SQL query.
# Hint from the exercise: ratio = sum(profit) / sum(sales).
profit_ratio_query = '''
    select Country, sum(profit)/sum(sales) as Profit_ratio from df_sql
    group by Country
    '''
spark.sql(profit_ratio_query).show()

+--------------+--------------------+
|       Country|        Profit_ratio|
+--------------+--------------------+
|        Sweden| -0.5745674280714466|
|       Germany| 0.17066792076621765|
|        France| 0.12693568221933804|
|       Belgium| 0.23508766583987942|
|       Finland| 0.18864296633316185|
|         Italy| 0.06844355185424991|
|        Norway|  0.2517747548521659|
|         Spain| 0.18941580658358978|
|       Denmark| -0.4957190005664471|
|       Ireland|-0.44426677493909256|
|   Switzerland|  0.2909201193350232|
|      Portugal| -0.5761662270806188|
|       Austria|  0.2641908775042505|
|United Kingdom| 0.21170103540397134|
|   Netherlands| -0.5298342790541865|
+--------------+--------------------+



In [0]:
# Exercise: is the country with the largest profit ratio also the country with the largest profit?
# Each query sorts the per-country aggregate in descending order and keeps only the top row;
# the first column of that row is the country name.
best_ratio_query = '''
    select Country, sum(profit)/sum(sales) as Profit_ratio from df_sql
    group by Country
    order by Profit_ratio desc
    limit 1
    '''
biggest_profit_query = '''
    select country, sum(Profit) as total_profit from df_sql
    group by Country
    order by total_profit desc
    limit 1
    '''

best_ratio = spark.sql(best_ratio_query).collect()[0][0]
biggest_profit = spark.sql(biggest_profit_query).collect()[0][0]

if biggest_profit != best_ratio:
    print(f'{biggest_profit} has the biggest profit but {best_ratio} has the best profit ratio')
else:
    print(f'{biggest_profit} is the Country with both biggest profit and the best ratio')

United Kingdom has the biggest profit but Switzerland has the best profit ratio
