In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Datamanipulation").getOrCreate()

In [3]:
spark

In [4]:
# read our data - lives in a csv file

df = spark.read.option("header","true").option("inferSchema", "true").csv("Sample - EU Superstore.csv")
df.show()
df.printSchema()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporate|        Leeds|             England|United Kingdom|  North|OFF-ST-10000988|Office Supplies|     Storage|Fellowes 

In [5]:
# how many rows of the EU Superstore dataset have the country being France
total_france = df.filter(df['Country'] == 'France').count()
total_france

2827

In [6]:
# of those, how many are profitable?

total_france_profitable = df.filter((df['Country'] == 'France') & (df['Profit'] > 0)).count()
total_france_profitable

2329

In [7]:
percentage_france_profitable = round((total_france_profitable / total_france) * 100, 2)
print(percentage_france_profitable, '%')

82.38 %


In [8]:
# how many different discount brackets exist? what are they?
df.select('Discount').distinct().count() - 1 # number of different "Discount" brackets

13

discount brackets = range or category of discounts offered on a product or service. 

In [9]:
df.select('Discount').distinct().orderBy('Discount').show()

+--------+
|Discount|
+--------+
|     0.0|
|     0.1|
|    0.15|
|     0.2|
|     0.3|
|    0.35|
|     0.4|
|    0.45|
|     0.5|
|     0.6|
|    0.65|
|     0.7|
|     0.8|
|    0.85|
+--------+



In [12]:
# let's see the total profit by discount bracket, make sure they are ordered by 
from pyspark.sql.functions import sum, round
total_profit = df.groupBy('Discount') \
                 .agg(round(sum('Profit'), 2)) \
                 .withColumnRenamed('round(sum(Profit), 2)', 'TotalProfit') \
                 .orderBy('Discount')
total_profit.show()

+--------+-----------+
|Discount|TotalProfit|
+--------+-----------+
|     0.0|  383806.53|
|     0.1|  126884.03|
|    0.15|   24677.56|
|     0.2|    2189.55|
|     0.3|    -758.42|
|    0.35|   -9122.65|
|     0.4|  -21346.43|
|    0.45|   -1103.19|
|     0.5|  -96632.12|
|     0.6|  -20517.46|
|    0.65|   -6221.97|
|     0.7|   -5496.77|
|     0.8|    -460.28|
|    0.85|   -3068.66|
+--------+-----------+



In [13]:
# what is the value after which we should stop offering discount?
# Solution 1: Spark SQL 

total_profit.createOrReplaceTempView('total_profit')
# spark.sql("SELECT * FROM total_profit").show()
max_profitable_discount = spark.sql("SELECT MAX(Discount) AS MaxDiscount FROM total_profit WHERE TotalProfit > 0") # maximum discount among profitable transactions
max_profitable_discount.show()

+-----------+
|MaxDiscount|
+-----------+
|        0.2|
+-----------+



In [14]:
# Accessing the value
max_profitable_discount.first()['MaxDiscount']

0.2

In [15]:
# what is the value after which we should stop offering discount?
# Solution 2: Spark
from pyspark.sql.functions import max

positive_total_profit = total_profit.filter(total_profit['TotalProfit'] > 0)
max_profitable_discount = positive_total_profit.select(max('Discount')).first()[0] # first() method to retrieves the first row of the result, and [0] is used to access the actual maximum value.
max_profitable_discount

0.2

In [16]:
positive_total_profit.select(max('Discount')).show() # Spark DataFrame

+-------------+
|max(Discount)|
+-------------+
|          0.2|
+-------------+



In [17]:
positive_total_profit.select(max('Discount')).first() # List

Row(max(Discount)=0.2)

In [18]:
positive_total_profit.select(max('Discount')).first()[0] # Value

0.2

In [75]:
# who are the top 5 most profitable customers

top5_customers = df.groupBy('Customer ID') \
                .agg(round(sum('Profit'), 2)) \
                .withColumnRenamed('round(sum(Profit), 2)', 'Profit') \
                .orderBy('Profit', ascending=False) \
                .limit(5)
top5_customers.show()

+-----------+-------+
|Customer ID| Profit|
+-----------+-------+
|   SP-20920|4974.51|
|   PJ-18835| 3986.0|
|   PO-18865| 3778.2|
|   EB-13840|3459.66|
|   MG-18145|3144.44|
+-----------+-------+



In [57]:
# get all the rows belonging to those 5 customer names: hint, you may need the collect method - how many rows are they?
# hard coded solution
df.filter(  (df['Customer ID'] == 'SP-20920') \
          | (df['Customer ID'] == 'PJ-18835') \
          | (df['Customer ID'] == 'PO-18865') \
          | (df['Customer ID'] == 'EB-13840') \
          | (df['Customer ID'] == 'MG-18145')) \
          .count()

76

In [93]:
# solution 2
# collecting the results into an array of rows
top5_rows = top5_customers.collect()

# extracting customer names from the collected results
top5_customer_names = [row['Customer ID'] for row in top5_rows]

# filtering the original rows based on customer names
rows_for_top5_customers = df.filter(df['Customer ID'].isin(top5_customer_names))

# showing how many records there are in total for top 5 profitable customers
print(f"Numero totale di righe per i primi 5 clienti redditizi: {rows_for_top5_customers.count()}")

Numero totale di righe per i primi 5 clienti redditizi: 76


In [97]:
# create a new column which is the value of the sale where there is not discount applied. Hint: orginal = sales/(1-d)
from pyspark.sql.functions import col

original = col('Sales')
original = round(original / (1 - col('Discount')), 2)
original

Column<'round((Sales / (1 - Discount)), 2)'>

In [98]:
df1 = df.withColumn('original_sales', original)
df1.show()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+--------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|original_sales|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+--------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporate|        Leeds|             England|United Kingdom|  North|OFF-ST-10

In [102]:
SalesDiscountDifference = round(col("Sales") - col("Discount"), 2)
SalesDiscountDifference

TypeError: 'Column' object is not callable

In [110]:
# calculate the difference between sales and discount value
from pyspark.sql.functions import abs
df2 = df1.withColumn("SalesDiscountDifference", abs(round(col("Sales") - col("original_sales"), 2)))
df2.show()
# I decided to calculate the absolute value of "SalesDiscountDifference" in order to facilitate the next operation

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+--------------+-----------------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|original_sales|SalesDiscountDifference|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+--------------+-----------------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corpo

In [141]:
# how much money did we not gain due to the discounts - per discount bracket?
lost_profit = df2.groupBy('Discount') \
                 .agg(round(sum(df2['SalesDiscountDifference']), 2)) \
                 .withColumnRenamed('round(sum(SalesDiscountDifference), 2)', 'LostProfit') \
                 .orderBy('Discount')

lost_profit.show()

+--------+----------+
|Discount|LostProfit|
+--------+----------+
|     0.0|       0.0|
|     0.1|  84712.86|
|    0.15|  45233.16|
|     0.2|  10653.11|
|     0.3|   2630.23|
|    0.35|  29163.12|
|     0.4|  46724.64|
|    0.45|   2083.44|
|     0.5| 183735.71|
|     0.6|  39644.07|
|    0.65|  12219.66|
|     0.7|    8534.1|
|     0.8|    635.67|
|    0.85|   4515.44|
+--------+----------+



In [144]:
# find the discount bracket which made us not gain the most (dynamically)
# solution 1
lost_profit.orderBy('LostProfit', ascending=False).show(1)

+--------+----------+
|Discount|LostProfit|
+--------+----------+
|     0.5| 183735.71|
+--------+----------+
only showing top 1 row



In [160]:
# solution 2
max_loss = lost_profit.select(max('LostProfit'))
max_loss = max_loss.first()[0]

lost_profit.filter(lost_profit['LostProfit'] == max_loss).show()

+--------+----------+
|Discount|LostProfit|
+--------+----------+
|     0.5| 183735.71|
+--------+----------+



In [182]:
# what would have been the total profit if we removed all orders from that discount group? 
df2.filter(df['Discount'] != 0.5).select(round(sum('Profit'), 2)).withColumnRenamed('round(sum(Profit), 2)', 'TotalProfitCleaned').show()
total_profit_cleaned = df2.filter(df['Discount'] != 0.5).select(round(sum('Profit'), 2)).first()[0] # accessing the value
total_profit_cleaned

+------------------+
|TotalProfitCleaned|
+------------------+
|         469461.86|
+------------------+



469461.86

In [189]:
#how much more (or less) profit is that?
total_profit_raw = df.select(round(sum('Profit'))).first()[0]

total_profit_raw - total_profit_cleaned
# the profit without the sales with 50% Discount would have been of 96631.86€ more

-96631.85999999999

In [190]:
# create a temporary table for our superstore table in sql
df.createOrReplaceTempView('superstore')

In [199]:
# use an SQL query to count the number of rows
spark.sql("SELECT COUNT(*) FROM superstore").show()
spark.sql("SELECT COUNT(*) FROM superstore").first()[0]

+--------+
|count(1)|
+--------+
|   10000|
+--------+



10000

In [205]:
# Use an SQL query to calculate the profit ratio for each country: hint, ratio is sum(profit)/sum(sales)
spark.sql("SELECT country, ROUND(SUM(profit)/SUM(sales), 2) AS profit_ratio \
           FROM superstore \
           GROUP BY country").show()

+--------------+------------+
|       country|profit_ratio|
+--------------+------------+
|        Sweden|       -0.57|
|       Germany|        0.17|
|        France|        0.13|
|       Belgium|        0.24|
|       Finland|        0.19|
|         Italy|        0.07|
|        Norway|        0.25|
|         Spain|        0.19|
|       Denmark|        -0.5|
|       Ireland|       -0.44|
|   Switzerland|        0.29|
|      Portugal|       -0.58|
|       Austria|        0.26|
|United Kingdom|        0.21|
|   Netherlands|       -0.53|
+--------------+------------+



In [210]:
# is the country with the largest profit ratio, the country with the largest profit?

highest_profit_ratio_country = spark.sql("SELECT country, ROUND(SUM(profit)/SUM(sales), 2) AS profit_ratio \
                                          FROM superstore \
                                          GROUP BY country \
                                          ORDER BY profit_ratio DESC \
                                          LIMIT 1").first()[0]
highest_profit_ratio_country

'Switzerland'

In [213]:
highest_profit_country = spark.sql("SELECT country, ROUND(SUM(profit), 2) AS profit \
                                    FROM superstore \
                                    GROUP BY country \
                                    ORDER BY profit DESC \
                                    LIMIT 1").first()[0]
highest_profit_country

'United Kingdom'

In [214]:
highest_profit_ratio_country == highest_profit_country

False