In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.functions import sum,avg,max,count

In [3]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

In [4]:
spark

In [5]:
# read our data - lives in a csv file

df = spark.read.option("header","true").csv("\\Users\\Mohammad Dayyat\\pyspark\\Sample - EU Superstore.csv")

In [6]:
# how many rows of the EU Superstore dataset have the country being France
data = df.filter(df["Country"] == "France")
data.count()

2827

In [7]:
# of those, how many are profitable?
data.filter(df["Profit"] > 0).count()

2277

In [8]:
# how any different discount brackets exist? what are they?
dis_set = df.select(["Discount"]).distinct()
dis_set.count()
dis_set.show()

+--------+
|Discount|
+--------+
|     0.3|
|     0.7|
|       0|
|     0.2|
|    0.15|
|    0.35|
|     0.8|
|    0.45|
|     0.5|
|    0.65|
|     0.6|
|     0.1|
|    0.85|
|     0.4|
+--------+



In [9]:
# let's see the totl profit by discount bracket, make sure they are ordered by 
df.groupBy("Discount").agg({'Profit': 'sum'}).orderBy('sum(Profit)').show()

+--------+-------------------+
|Discount|        sum(Profit)|
+--------+-------------------+
|     0.5|         -96632.115|
|     0.4|-21346.427999999996|
|     0.6|-20517.456000000002|
|    0.35|          -9122.649|
|    0.65| -6221.965499999999|
|     0.7|          -5496.765|
|    0.85|          -3068.658|
|    0.45|         -1103.1915|
|     0.3| -758.4209999999999|
|     0.8|           -460.284|
|     0.2| 2189.5499999999984|
|    0.15| 24677.563499999975|
|     0.1|  126884.0309999999|
|       0| 383806.53000000026|
+--------+-------------------+



In [10]:
# what is the value after which we should stop offering discount?

## The value of discount is (0.2) after that the profit will be less than zero

In [11]:
# who are the top 5 most profitable customers
cust_profit = df.groupBy(["Customer ID","Customer Name"]).agg({'Profit': 'sum'}).orderBy('Sum(Profit)',ascending = False)
cust_profit.show(5)


+-----------+-----------------+------------------+
|Customer ID|    Customer Name|       sum(Profit)|
+-----------+-----------------+------------------+
|   SP-20920|     Susan Pistek| 4974.512999999999|
|   PJ-18835|    Patrick Jones|3986.0039999999995|
|   PO-18865|Patrick O'Donnell|          3778.197|
|   EB-13840|    Ellis Ballard|           3459.66|
|   MG-18145|  Mike Gockenbach|3144.4439999999995|
+-----------+-----------------+------------------+
only showing top 5 rows



In [14]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?
cust_profit.collect()


[Row(Customer ID='SP-20920', Customer Name='Susan Pistek', sum(Profit)=4974.512999999999),
 Row(Customer ID='PJ-18835', Customer Name='Patrick Jones', sum(Profit)=3986.0039999999995),
 Row(Customer ID='PO-18865', Customer Name="Patrick O'Donnell", sum(Profit)=3778.197),
 Row(Customer ID='EB-13840', Customer Name='Ellis Ballard', sum(Profit)=3459.66),
 Row(Customer ID='MG-18145', Customer Name='Mike Gockenbach', sum(Profit)=3144.4439999999995),
 Row(Customer ID='ER-13855', Customer Name='Elpida Rittenbach', sum(Profit)=2899.806),
 Row(Customer ID='LB-16795', Customer Name='Laurel Beltran', sum(Profit)=2892.7170000000006),
 Row(Customer ID='RB-19330', Customer Name='Randy Bradley', sum(Profit)=2864.601),
 Row(Customer ID='JH-15820', Customer Name='John Huston', sum(Profit)=2860.1024999999995),
 Row(Customer ID='MS-17770', Customer Name='Maxwell Schwartz', sum(Profit)=2795.88),
 Row(Customer ID='BV-11245', Customer Name='Benjamin Venier', sum(Profit)=2761.2719999999995),
 Row(Customer ID=

In [15]:
cust_profit.count()


795

In [None]:
# create a new column which is the value of the sale were there not discount applied. Hint: orginal = sales/(1-d)

In [16]:
df = df.withColumn("Orginal Value of Sale",df['Sales']/(1-df['Discount']))
df.show()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+---------------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|Orginal Value of Sale|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+---------------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporate|        Leeds|             England|United King

In [19]:
# calculate the difference between sales and discount value
df = df.withColumn("Amount of discount",df['Orginal Value of Sale'] - df['Sales'])
df.select("Sales","Amount of discount").show()

+--------+------------------+
|   Sales|Amount of discount|
+--------+------------------+
|    79.2|               0.0|
|  388.92|               0.0|
|   35.19|               0.0|
|   50.94|               0.0|
|  307.44|               0.0|
|   122.4|               0.0|
|  413.82|               0.0|
|  428.22|               0.0|
| 3979.29|               0.0|
|   43.56|               0.0|
|   25.26|               0.0|
|2443.905| 271.5450000000001|
|   12.21|               0.0|
|2167.296|382.46399999999994|
| 138.105|15.344999999999999|
| 128.385|14.264999999999986|
|  690.12|               0.0|
|    8.16|               0.0|
|  347.88|               0.0|
| 575.505|63.944999999999936|
+--------+------------------+
only showing top 20 rows



In [20]:
# how much money did we not gain due to the discounts - per discount bracket?
count_of_gain = df.groupBy("Discount").agg({'Amount of discount': 'sum'})
count_of_gain.show()

+--------+-----------------------+
|Discount|sum(Amount of discount)|
+--------+-----------------------+
|     0.3|     2630.2410000000004|
|     0.7|               8534.085|
|       0|                    0.0|
|     0.2|     10653.119999999997|
|    0.15|      45233.17650000002|
|    0.35|     29163.099000000002|
|     0.8|      635.6640000000002|
|    0.45|     2083.4414999999995|
|     0.5|     183734.26500000045|
|    0.65|     12219.655499999999|
|     0.6|      39644.04599999997|
|     0.1|      84712.44899999995|
|    0.85|               4515.438|
|     0.4|      46724.68800000002|
+--------+-----------------------+



In [21]:
# find the discount bracket which made us not gain the most (dynamically)
# (0.5) discount rate 
count_of_gain = df.groupBy("Discount").agg({'Amount of discount': 'sum'}).orderBy("sum(Amount of discount)",ascending =False)
count_of_gain.show()

+--------+-----------------------+
|Discount|sum(Amount of discount)|
+--------+-----------------------+
|     0.5|     183734.26500000045|
|     0.1|      84712.44899999995|
|     0.4|      46724.68800000002|
|    0.15|      45233.17650000002|
|     0.6|      39644.04599999997|
|    0.35|     29163.099000000002|
|    0.65|     12219.655499999999|
|     0.2|     10653.119999999997|
|     0.7|               8534.085|
|    0.85|               4515.438|
|     0.3|     2630.2410000000004|
|    0.45|     2083.4414999999995|
|     0.8|      635.6640000000002|
|       0|                    0.0|
+--------+-----------------------+



In [22]:
# what would have been the total profit if we removed all orders from that discount group? 
df.select(sum("Profit")+sum("Amount of discount")).show()

+---------------------------------------+
|(sum(Profit) + sum(Amount of discount))|
+---------------------------------------+
|                      843313.1100000013|
+---------------------------------------+



In [23]:
#how much more (or less) profit is that?
## More profit (470483.3685)
df.select((sum("Profit")+sum("Amount of discount"))-sum("Profit")).show()


+-------------------------------------------------------+
|((sum(Profit) + sum(Amount of discount)) - sum(Profit))|
+-------------------------------------------------------+
|                                     470483.36850000074|
+-------------------------------------------------------+



In [24]:
# create a temporary table for our superstore table in sql
df.createOrReplaceTempView("superstore")

In [26]:
# use an SQL query to count the number of rows
spark.sql("SELECT COUNT(*) FROM superstore").show()

+--------+
|count(1)|
+--------+
|   10000|
+--------+



In [27]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
spark.sql("SELECT Country , SUM(Profit)/SUM(Sales) AS profit_ratio FROM superstore GROUP BY Country").show()

+--------------+--------------------+
|       Country|        profit_ratio|
+--------------+--------------------+
|        Sweden| -0.5745674280714466|
|       Germany| 0.17066792076621765|
|        France| 0.12693568221933804|
|       Belgium| 0.23508766583987942|
|       Finland| 0.18864296633316185|
|         Italy| 0.06844355185424991|
|        Norway|  0.2517747548521659|
|         Spain| 0.18941580658358978|
|       Denmark| -0.4957190005664471|
|       Ireland|-0.44426677493909256|
|   Switzerland|  0.2909201193350232|
|      Portugal| -0.5761662270806188|
|       Austria|  0.2641908775042505|
|United Kingdom| 0.21170103540397134|
|   Netherlands| -0.5298342790541865|
+--------------+--------------------+



In [28]:
# is the country with the largest profit ratio, the country with the largest profit?
# No , The largest profit ratio is (Switzerland) BUT The largest profit (United Kingdom)

spark.sql("SELECT Country, SUM(Profit) , SUM(Profit)/SUM(Sales) AS profit_ratio FROM superstore GROUP BY Country ORDER BY SUM(Profit) DESC ").show()


+--------------+-------------------+--------------------+
|       Country|        sum(Profit)|        profit_ratio|
+--------------+-------------------+--------------------+
|United Kingdom| 111900.15000000001| 0.21170103540397134|
|        France| 109029.00299999975| 0.12693568221933804|
|       Germany| 107322.82049999991| 0.17066792076621765|
|         Spain|  54390.11999999999| 0.18941580658358978|
|       Austria|           21442.26|  0.2641908775042505|
|         Italy| 19828.757999999965| 0.06844355185424991|
|       Belgium|           11572.59| 0.23508766583987942|
|   Switzerland|  7237.470000000001|  0.2909201193350232|
|        Norway|            5167.77|  0.2517747548521659|
|       Finland|            3905.73| 0.18864296633316185|
|       Denmark|-4282.0470000000005| -0.4957190005664471|
|       Ireland| -7392.381000000003|-0.44426677493909256|
|      Portugal| -8703.059999999998| -0.5761662270806188|
|        Sweden|-17519.366999999987| -0.5745674280714466|
|   Netherland