In [0]:
import pyspark
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

In [0]:
spark

In [0]:
# read our data - lives in a csv file
df = spark.read.table("sample___eu_superstore_6_csv")
# df = spark.read.option("header","true").csv("/Users/pedrocarneiro/peter/develhop/SPARK/Sample - EU Superstore.csv")

In [0]:
# how many rows of the EU Superstore dataset have the country being France
df.filter(df['Country'] == 'France').count()


Out[6]: 2827

In [0]:
# of those, how many are profitable?

df.filter((df['Profit'] > 0) & (df['Country'] == 'France')).count()

Out[7]: 2329

In [0]:
df.printSchema()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: double (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Profit: double (nullable = true)



In [0]:
# how many different discount brackets exist? what are they?
df.select('Discount').distinct().count()

Out[9]: 14

In [0]:
df.select('Discount').distinct().orderBy('Discount').show()

+--------+
|Discount|
+--------+
|     0.0|
|     0.1|
|    0.15|
|     0.2|
|     0.3|
|    0.35|
|     0.4|
|    0.45|
|     0.5|
|     0.6|
|    0.65|
|     0.7|
|     0.8|
|    0.85|
+--------+



In [0]:
# let's see the total profit by discount bracket, make sure they are ordered by 
discount_profit_df = df.groupBy('Discount').agg({'profit': 'sum'}).orderBy('sum(profit)')
discount_profit_df.show()


+--------+-------------------+
|Discount|        sum(profit)|
+--------+-------------------+
|     0.5|         -96632.115|
|     0.4|-21346.427999999996|
|     0.6|-20517.456000000002|
|    0.35|          -9122.649|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|    0.85|          -3068.658|
|    0.45|         -1103.1915|
|     0.3| -758.4209999999999|
|     0.8|           -460.284|
|     0.2| 2189.5499999999984|
|    0.15| 24677.563499999975|
|     0.1|  126884.0309999999|
|     0.0| 383806.53000000026|
+--------+-------------------+



In [0]:
# what is the value after which we should stop offering discount?

top_discount = discount_profit_df.filter(discount_profit_df['sum(profit)'] > 0).collect()[0][0]
print(f'We shoudl stop offering discount after {top_discount}')


We shoudl stop offering discount after 0.2


In [0]:
# who are the top 5 most profitable customers

cust_profit_df = df.groupBy('Customer Name').agg({'profit': 'sum'}).orderBy('sum(profit)', ascending = False)
cust_profit_df.show(5)


+-----------------+------------------+
|    Customer Name|       sum(profit)|
+-----------------+------------------+
|     Susan Pistek| 4974.512999999999|
|    Patrick Jones|3986.0039999999995|
|Patrick O'Donnell|          3778.197|
|    Ellis Ballard|           3459.66|
|  Mike Gockenbach|3144.4439999999995|
+-----------------+------------------+
only showing top 5 rows



In [0]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?

top5_cust = [i[0] for i in cust_profit_df.collect()[:5]]
df.filter(df['Customer Name'].isin(top5_cust)).show()

+------+---------------+----------+----------+--------------+-----------+-----------------+---------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+-------+--------+--------+-------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|    Customer Name|  Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|  Sales|Quantity|Discount| Profit|
+------+---------------+----------+----------+--------------+-----------+-----------------+---------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+-------+--------+--------+-------+
|     8|ES-2015-5113958|02/08/2015|07/08/2015|  Second Class|   EB-13840|    Ellis Ballard|Corporate|West Bromwich|             England|United Kingdom|  North|TEC-CO-10004325|     Technology|     Copiers|Canon Personal C

In [0]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)

df = df.withColumn('orginal_sales', df['Sales']/(1 - df['discount']))
df.select('orginal_sales').show()

+------------------+
|     orginal_sales|
+------------------+
|              79.2|
|            388.92|
|             35.19|
|             50.94|
|            307.44|
|             122.4|
|            413.82|
|            428.22|
|           3979.29|
|             43.56|
|             25.26|
|2715.4500000000003|
|             12.21|
|2549.7599999999998|
|            153.45|
|142.64999999999998|
|            690.12|
|              8.16|
|            347.88|
| 639.4499999999999|
+------------------+
only showing top 20 rows



In [0]:
# calculate the difference between sales and discount value

df.withColumn('diff', (1 - df['discount']) * df['orginal_sales']).select('diff').show()

+--------+
|    diff|
+--------+
|    79.2|
|  388.92|
|   35.19|
|   50.94|
|  307.44|
|   122.4|
|  413.82|
|  428.22|
| 3979.29|
|   43.56|
|   25.26|
|2443.905|
|   12.21|
|2167.296|
| 138.105|
| 128.385|
|  690.12|
|    8.16|
|  347.88|
| 575.505|
+--------+
only showing top 20 rows



In [0]:
# how much money did we not gain due to the discounts - per discount bracket?
bracket_total_discount = df.withColumn('total_discount', df['discount']  * df['orginal_sales']).groupBy('discount').agg({'total_discount': 'sum'}).orderBy('sum(total_discount)', ascending = False)
bracket_total_discount.show()

+--------+-------------------+
|discount|sum(total_discount)|
+--------+-------------------+
|     0.5| 183734.26500000045|
|     0.1|  84712.44899999996|
|     0.4|  46724.68800000001|
|    0.15|  45233.17650000002|
|     0.6|  39644.04599999997|
|    0.35| 29163.099000000002|
|    0.65|         12219.6555|
|     0.2| 10653.119999999999|
|     0.7|  8534.084999999997|
|    0.85|           4515.438|
|     0.3|           2630.241|
|    0.45|          2083.4415|
|     0.8|  635.6640000000002|
|     0.0|                0.0|
+--------+-------------------+



In [0]:
# find the discount bracket which made us not gain the most (dynamically)

bracket_total_discount.collect()[0][0]

Out[31]: 0.5

In [0]:
# what would have been the total profit if we removed all orders from that discount group? 
from pyspark.sql.functions import sum

sum_result = df.filter(df['discount'] != 0.5).agg(sum('profit')).collect()[0][0]
all_result = df.agg(sum('profit')).collect()[0][0]

print(sum_result)



+-----------------+
|      sum(profit)|
+-----------------+
|469461.8565000003|
+-----------------+



In [0]:
#how much more (or less) profit is that?

print(f'{sum_result - all_result} more')

96632.11499999976 more


In [0]:
# create a temporary table for our superstore table in sql
df.createOrReplaceTempView('superstore')

In [0]:
# use an SQL query to count the number of rows
spark.sql('select count(*) num_rows from superstore').show()

+--------+
|num_rows|
+--------+
|   10000|
+--------+



In [0]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)

spark.sql('select country, sum(Profit)/sum(Sales) profit_ratio from superstore group by 1 order by 2 desc').show()


+--------------+--------------------+
|       country|        profit_ratio|
+--------------+--------------------+
|   Switzerland|  0.2909201193350232|
|       Austria|  0.2641908775042505|
|        Norway|  0.2517747548521659|
|       Belgium| 0.23508766583987942|
|United Kingdom| 0.21170103540397134|
|         Spain| 0.18941580658358978|
|       Finland| 0.18864296633316185|
|       Germany| 0.17066792076621765|
|        France| 0.12693568221933804|
|         Italy| 0.06844355185424991|
|       Ireland|-0.44426677493909256|
|       Denmark| -0.4957190005664471|
|   Netherlands| -0.5298342790541865|
|        Sweden| -0.5745674280714466|
|      Portugal| -0.5761662270806188|
+--------------+--------------------+



In [0]:
# is the country with the largest profit ratio, the country with the largest profit?

spark.sql('select country, sum(Profit) from superstore group by 1 order by 2 desc').show()
print('No')

+--------------+-------------------+
|       country|        sum(Profit)|
+--------------+-------------------+
|United Kingdom| 111900.15000000001|
|        France| 109029.00299999975|
|       Germany| 107322.82049999991|
|         Spain|  54390.11999999999|
|       Austria|           21442.26|
|         Italy| 19828.757999999965|
|       Belgium|           11572.59|
|   Switzerland|  7237.470000000001|
|        Norway|            5167.77|
|       Finland|            3905.73|
|       Denmark|-4282.0470000000005|
|       Ireland| -7392.381000000003|
|      Portugal| -8703.059999999998|
|        Sweden|-17519.366999999987|
|   Netherlands| -41070.07499999996|
+--------------+-------------------+

No
