In [321]:
import pyspark
from pyspark.sql import SparkSession

import pyspark.sql.functions as func
from pyspark.sql.functions import sum

In [322]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

In [323]:
spark

In [353]:
# read our data - lives in a csv file

df = spark.read.option("header","true").csv("Sample - EU Superstore.csv")
#df.show(3)
df.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



In [354]:
# how many rows of the EU Superstore dataset have the country being France
df.filter(df['Country'] == 'France').count()

2827

In [None]:
# of those, how many are profitable?

df.filter((df['Country'] == 'France') & (df['Profit'] != 0)).count()

In [355]:
# how many different discount brackets exist? what are they?
df.select(df['Discount']).distinct().count()
df.select(df['Discount']).distinct().show()

+--------+
|Discount|
+--------+
|     0.3|
|     0.7|
|       0|
|     0.2|
|    0.15|
|    0.35|
|     0.8|
|    0.45|
|     0.5|
|    0.65|
|     0.6|
|     0.1|
|    0.85|
|     0.4|
+--------+



In [None]:
# Converting the Struct type

from pyspark.sql.types import *

df.withColumn('Discount', df['Discount'].cast('double')).\
    withColumn('Sales', df['Sales'].cast('double')).\
    withColumn('Profit', df['Profit'].cast('double')).\
    withColumn('Quantity', df['Quantity'].cast('double')).printSchema()   

In [356]:
# let's see the totl profit by discount bracket, make sure they are ordered by 

prof_by_disc = df.groupBy('Discount').agg({'Profit': 'sum'}).orderBy(['Discount', 'sum(Profit)'], ascending=True)
prof_by_disc.show()

+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|       0| 383806.53000000026|
|     0.1|  126884.0309999999|
|    0.15| 24677.563499999975|
|     0.2| 2189.5499999999984|
|     0.3| -758.4209999999999|
|    0.35|          -9122.649|
|     0.4|-21346.427999999996|
|    0.45|         -1103.1915|
|     0.5|         -96632.115|
|     0.6|-20517.456000000002|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|     0.8|           -460.284|
|    0.85|          -3068.658|
+--------+-------------------+



In [357]:
# what is the value after which we should stop offering discount?
prof_by_disc.collect()[4][0]

'0.3'

In [None]:
# who are the top 5 most profitable customers

prof_customers = df.groupBy('Customer Name').agg({'Profit':'sum'}).orderBy('sum(Profit)', ascending = False)
prof_customers.show(5)

In [None]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?

cust_top5 = prof_customers.collect()[:5]
df[df['Customer Name'].isin([name[0] for name in cust_top5])].select('Customer Name').count()#.show()

In [358]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)

df = df.withColumn('original', func.round(df['Sales'] /(1-df['Discount']),2))

In [362]:
# calculate the difference between sales and discount value

df.withColumn('Discount_value', func.round(df['Sales'] - (df['Sales'] * df['Discount']), 2)).\
    select('Sales', 'Discount', 'Discount_value').\
    filter(df["Discount"] > 0.0).orderBy("Sales", ascending=False).show()


+--------+--------+--------------+
|   Sales|Discount|Discount_value|
+--------+--------+--------------+
|  999.09|    0.15|        849.23|
|  993.66|     0.5|        496.83|
| 993.615|     0.5|        496.81|
| 993.531|    0.15|         844.5|
|992.9955|    0.15|        844.05|
| 990.468|     0.1|        891.42|
|  99.792|     0.1|         89.81|
|  99.792|     0.4|         59.88|
|  99.765|     0.5|         49.88|
|   99.54|     0.5|         49.77|
|  99.252|     0.1|         89.33|
|  99.252|     0.1|         89.33|
|  99.099|     0.3|         69.37|
|  989.04|     0.5|        494.52|
|  989.04|     0.5|        494.52|
| 987.957|     0.1|        889.16|
| 986.904|     0.6|        394.76|
| 985.824|     0.6|        394.33|
| 985.284|     0.1|        886.76|
|  983.88|     0.1|        885.49|
+--------+--------+--------------+
only showing top 20 rows



'df = df.withColumn("Discounted Value", func.round(df["Sales"] - (df["Sales"] * df["Discount"]), 2))\n\ndf.select("Sales", "Discount", "Discounted Value").filter(df["Discount"] > 0.0).orderBy("Sales", ascending=False).show()'

In [372]:
# how much money did we not gain due to the discounts - per discount bracket?
loss_with_discount = df.filter((df['Discount'] != 0.0) & (df['Profit'] < 0.0)).select('Discount', 'Profit')
loss_with_discount.groupBy('Discount').agg({'Profit':'sum'}).withColumnRenamed('sum(Profit)', 'loss_profit').show()

+--------+-------------------+
|Discount|        loss_profit|
+--------+-------------------+
|     0.3| -943.4910000000002|
|     0.7|          -5496.765|
|     0.2|          -2618.634|
|    0.15|          -7195.566|
|    0.35|        -11128.1355|
|     0.8|           -460.284|
|    0.45|         -1103.1915|
|     0.5|         -96632.115|
|    0.65| -6221.965499999999|
|     0.6|-20517.456000000002|
|     0.1| -8330.903999999995|
|    0.85|          -3068.658|
|     0.4| -22471.83000000001|
+--------+-------------------+



In [373]:
# find the discount bracket which made us not gain the most (dynamically)

loss_with_discount = loss_with_Disc.groupBy('Discount').agg({'Profit':'sum'}).orderBy('sum(Profit)', ascending=False).collect()[0][0]
loss_with_discount

'0.8'

In [388]:
# what would have been the total profit if we removed all orders from that discount group? 
totl_prof_without_max_loss = df.filter(df['Discount'] != loss_with_discount).select('Profit').agg({'Profit':'sum'}).withColumnRenamed('sum(Profit)', 'without_most_loss_profit').take(1)[0][0]
totl_prof_without_max_loss


373290.0254156566

In [393]:
#how much more (or less) profit is that?

total_profit = df.select('Profit').agg({'Profit':'sum'}).take(1)[0][0]
total_profit

def diff():
    return totl_prof_without_max_loss - total_profit
diff()

460.28399658203125

In [397]:
# create a temporary table for our superstore table in sql
df.createOrReplaceTempView('data')

In [398]:
# use an SQL query to count the number of rows
spark.sql('SELECT COUNT(*) FROM data').show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [403]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
spark.sql('SELECT Country, SUM(Profit) / SUM(Sales) AS profit_ratio FROM data GROUP BY Country ORDER BY profit_ratio DESC').show()


+--------------+--------------------+
|       Country|        profit_ratio|
+--------------+--------------------+
|   Switzerland| 0.29092012306058596|
|       Austria|   0.264190876869252|
|        Norway| 0.25177475561207147|
|       Belgium| 0.23508766512254156|
|United Kingdom| 0.21170103566633977|
|         Spain| 0.18941580588813087|
|       Finland| 0.18864296597998498|
|       Germany|  0.1706679208396213|
|        France| 0.12693568220672438|
|         Italy| 0.06844355267097506|
|       Ireland| -0.4442667763849829|
|       Denmark|-0.49571900283557735|
|   Netherlands| -0.5298342813704527|
|        Sweden| -0.5745674291524092|
|      Portugal| -0.5761662303775278|
+--------------+--------------------+



In [423]:
# is the country with the largest profit ratio, the country with the largest profit?
country_largest_ratio = spark.sql('SELECT Country, SUM(Profit) / SUM(Sales) AS profit_ratio FROM data GROUP BY Country ORDER BY profit_ratio DESC LIMIT 1;').collect()[0][0]
country_largest_profit = spark.sql('SELECT Country, SUM(Profit) AS Profit FROM data GROUP BY Country ORDER BY Profit DESC LIMIT 1;').collect()[0][0]

print(f'The largest profit ratio country is different from the largest profit country and the both are: {country_largest_ratio}, {country_largest_profit} respectively.')

The largest profit ratio country is different from the largest profit country and the both are: Switzerland, United Kingdom respectively.
