In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

23/02/13 12:37:27 WARN Utils: Your hostname, murats-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.68.101 instead (on interface en0)
23/02/13 12:37:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/13 12:37:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

In [6]:
# read our data - lives in a csv file

df = spark.read.option("header","true").csv("Sample - EU Superstore.csv")

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [8]:
df.printSchema()

root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)



In [76]:
df.select("Customer ID").distinct().count()

795

In [7]:
# how many rows of the EU Superstore dataset have the country being France
df.select("Country").distinct().count()

                                                                                

15

In [24]:
# of those, how many are profitable?
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType

df = df.withColumn("Profit", df.Profit.cast(IntegerType()))

byProfit = df.groupBy("Country").sum("Profit")
profitable = byProfit.filter(byProfit["sum(Profit)"] > 0)
profitable.show()
profitable.count()

+--------------+-----------+
|       Country|sum(Profit)|
+--------------+-----------+
|       Germany|     106518|
|        France|     108070|
|       Belgium|      11494|
|       Finland|       3873|
|         Italy|      19558|
|        Norway|       5133|
|         Spain|      54054|
|   Switzerland|       7201|
|       Austria|      21307|
|United Kingdom|     111323|
+--------------+-----------+



10

In [28]:
# how any different discount brackets exist? what are they?
brackets = df.select("Discount").distinct().orderBy("Discount")
brackets.show()
brackets.count()

+--------+
|Discount|
+--------+
|       0|
|     0.1|
|    0.15|
|     0.2|
|     0.3|
|    0.35|
|     0.4|
|    0.45|
|     0.5|
|     0.6|
|    0.65|
|     0.7|
|     0.8|
|    0.85|
+--------+



14

In [42]:
# let's see the totl profit by discount bracket, make sure they are ordered by 
discount_totProfit = df.groupBy("Discount").agg(sum("Profit").alias("total_profit")).orderBy("Discount")
discount_totProfit.show()

+--------+------------+
|Discount|total_profit|
+--------+------------+
|       0|      380803|
|     0.1|      126392|
|    0.15|       24623|
|     0.2|        2174|
|     0.3|        -756|
|    0.35|       -9108|
|     0.4|      -21259|
|    0.45|       -1102|
|     0.5|      -96104|
|     0.6|      -20460|
|    0.65|       -6213|
|     0.7|       -5493|
|     0.8|        -459|
|    0.85|       -3068|
+--------+------------+



In [51]:
# what is the value after which we should stop offering discount?
from pyspark.sql.types import FloatType

df = df.withColumn("Discount", df.Discount.cast(FloatType()))


discount_totProfit.filter(discount_totProfit.total_profit >= 0).select(max("Discount")).show()

+-------------+
|max(Discount)|
+-------------+
|          0.2|
+-------------+



In [77]:
# who are the top 5 most profitable customers

topCustomers = df.orderBy("Profit", ascending=False).select("Customer Name", "Customer ID")
topCustomers.show(5)

+-----------------+-----------+
|    Customer Name|Customer ID|
+-----------------+-----------+
|    Patrick Jones|   PJ-18835|
|Elpida Rittenbach|   ER-13855|
|  Mike Gockenbach|   MG-18145|
|     James Galang|   JG-15160|
|    Ellis Ballard|   EB-13840|
+-----------------+-----------+



In [79]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method 

top5CustIDs = [i[0] for i in topCustomers.limit(5).select("Customer ID").collect()]
topCustoemrsDF = df.filter(df["Customer ID"].isin(top5CustIDs))

# - how many rows are they?

topCustoemrsDF.count()

54

In [82]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)

df.withColumn("without Discount", df["Sales"]/(1 - df["Discount"])).show(5)

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+----------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|without Discount|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+----------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|     0.0|    39|            79.2|


In [83]:
# calculate the difference between sales and discount value

df.withColumn("differance", df["Sales"]- (df["Discount"]*df["Sales"])).show(5)

+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+----------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID| Customer Name|  Segment| City|  State|       Country|Region|     Product ID|       Category|Sub-Category|        Product Name| Sales|Quantity|Discount|Profit|differance|
+------+---------------+----------+----------+--------------+-----------+--------------+---------+-----+-------+--------------+------+---------------+---------------+------------+--------------------+------+--------+--------+------+----------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|Aaron Smayling|Corporate|Leeds|England|United Kingdom| North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes Folders,...|  79.2|       3|     0.0|    39|      79.2|
|     2|ES-2017-1311038|

In [92]:
# how much money did we not gain due to the discounts - per discount bracket?

discount_totSales = df.groupBy("Discount").agg(round(sum("Sales")).alias("total_sales")).orderBy("Discount")
discount_totSales = discount_totSales.withColumn("discount_amount", 
                                round(discount_totSales["total_sales"]*discount_totSales["Discount"]))
discount_totSales.show()

+--------+-----------+---------------+
|Discount|total_sales|discount_amount|
+--------+-----------+---------------+
|     0.0|  1522456.0|            0.0|
|     0.1|   762412.0|        76241.0|
|    0.15|   256321.0|        38448.0|
|     0.2|    42612.0|         8522.0|
|     0.3|     6137.0|         1841.0|
|    0.35|    54160.0|        18956.0|
|     0.4|    70087.0|        28035.0|
|    0.45|     2546.0|         1146.0|
|     0.5|   183734.0|        91867.0|
|     0.6|    26429.0|        15857.0|
|    0.65|     6580.0|         4277.0|
|     0.7|     3657.0|         2560.0|
|     0.8|      159.0|          127.0|
|    0.85|      797.0|          677.0|
+--------+-----------+---------------+



In [108]:
# find the discount bracket which made us not gain the most (dynamically)

v = discount_totSales.agg(max("discount_amount")).collect()
discount_totSales.filter(discount_totSales["discount_amount"] == v[0][0]).show()

+--------+-----------+---------------+
|Discount|total_sales|discount_amount|
+--------+-----------+---------------+
|     0.5|   183734.0|        91867.0|
+--------+-----------+---------------+



In [115]:
# what would have been the total profit if we removed all orders from that discount group? 

before = df.agg(sum("Profit"))
after = df.filter(df['Discount'] != 0.5).agg(sum("Profit"))

before.show()
after.show()

+-----------+
|sum(Profit)|
+-----------+
|     369970|
+-----------+

+-----------+
|sum(Profit)|
+-----------+
|     466074|
+-----------+



In [117]:
#how much more (or less) profit is that?

after.collect()[0][0] - before.collect()[0][0]

96104

In [118]:
# create a temporary table for our superstore table in sql

df.createOrReplaceTempView("df")

In [120]:
# use an SQL query to count the number of rows
spark.sql("SELECT COUNT(*) FROM df").show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [139]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)

profRatDF = spark.sql("SELECT Country, round(sum(Profit) / sum(Sales), 2) AS profit_ratio FROM df GROUP BY Country ORDER BY 2 DESC")
profRatDF.show()

+--------------+------------+
|       Country|profit_ratio|
+--------------+------------+
|   Switzerland|        0.29|
|       Austria|        0.26|
|        Norway|        0.25|
|       Belgium|        0.23|
|United Kingdom|        0.21|
|       Finland|        0.19|
|         Spain|        0.19|
|       Germany|        0.17|
|        France|        0.13|
|         Italy|        0.07|
|       Ireland|       -0.44|
|       Denmark|       -0.49|
|   Netherlands|       -0.53|
|        Sweden|       -0.57|
|      Portugal|       -0.57|
+--------------+------------+



In [141]:
# is the country with the largest profit ratio, the country with the largest profit?

topProfDF = spark.sql("SELECT Country, sum(Profit) AS profit_ratio FROM df GROUP BY Country ORDER BY 2 DESC")

print(topProfDF.collect()[0][0])
print(profRatDF.collect()[0][0])

United Kingdom
Switzerland
