In [18]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [19]:
# One local Spark session using every available core; reused if already running.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Datamanipulation")
    .getOrCreate()
)

In [20]:
# Show the SparkSession object as this cell's output (quick sanity check that the session is up).
spark

In [21]:
# Read the EU Superstore orders from CSV (the first line holds the column names).
# No schema is inferred, so every column is loaded as a string.
# The location can be overridden with the SUPERSTORE_CSV environment variable
# instead of editing this machine-specific default path.
SUPERSTORE_CSV = os.environ.get(
    "SUPERSTORE_CSV",
    "/Users/pc home/Desktop/Data_PySpark_BigData_Exercises/Sample - EU Superstore.csv",
)

df = spark.read.option("header", "true").csv(SUPERSTORE_CSV)

In [22]:
# Restrict to French orders; the filtered frame is reused by the next cell.
df_france = df.where(F.col('Country') == 'France')
count_df_france = df_france.count()
print(f"The number of rows with the country being France: {count_df_france}")

The number of rows with the country being France: 2827


In [23]:
# Of the French orders, how many are profitable?
# Profit is a string column (the CSV is read without a schema). Comparing a
# string column with the integer literal 0 makes Spark cast it to an integer,
# which truncates the decimals (e.g. "0.45" -> 0), so small positive profits
# would be counted as not profitable. Cast to double explicitly instead.
profit_france = df_france.filter(F.col('Profit').cast('double') > 0)
count_profit_france = profit_france.count()
print(f"The number of profitable rows with the country being France: {count_profit_france}")

The number of profitable rows with the country being France: 2277


In [24]:
# How many different discount brackets exist, and what are they?
# Discount values are strings (no schema inference on read), so order them by
# their numeric value rather than lexicographically.
distinct_discounts = df.select('Discount').distinct()
discount_brackets = [row['Discount'] for row in distinct_discounts.collect()]
print(f"The number of different discount brackets: {len(discount_brackets)}")
print(f"Discount brackets: {sorted(discount_brackets, key=float)}")

The number of different discount brackets: 14
Discount brackets: ['0', '0.1', '0.15', '0.2', '0.3', '0.35', '0.4', '0.45', '0.5', '0.6', '0.65', '0.7', '0.8', '0.85']


In [25]:
# Total profit per discount bracket, ordered by ascending discount.
# Discount is a string column, so sort on its numeric value: a plain string
# sort only matches numeric order while every value happens to share a format.
# The following cell relies on this ascending numeric order.
total_profit_by_discount = df.groupBy('Discount').agg(F.sum('Profit').alias('TotalProfit'))
sorted_total_profit_by_discount = total_profit_by_discount.sort(F.col('Discount').cast('double'))
for row in sorted_total_profit_by_discount.collect():
    print(f"Discount: {row['Discount']} - Total Profit: {row['TotalProfit']}")


Discount: 0 - Total Profit: 383806.53000000026
Discount: 0.1 - Total Profit: 126884.0309999999
Discount: 0.15 - Total Profit: 24677.563499999975
Discount: 0.2 - Total Profit: 2189.5499999999984
Discount: 0.3 - Total Profit: -758.4209999999999
Discount: 0.35 - Total Profit: -9122.649
Discount: 0.4 - Total Profit: -21346.427999999996
Discount: 0.45 - Total Profit: -1103.1915
Discount: 0.5 - Total Profit: -96632.115
Discount: 0.6 - Total Profit: -20517.456000000002
Discount: 0.65 - Total Profit: -6221.965499999999
Discount: 0.7 - Total Profit: -5496.765
Discount: 0.8 - Total Profit: -460.284
Discount: 0.85 - Total Profit: -3068.658


In [26]:
# What is the discount value at which we should stop offering discounts?
# Walk the brackets in the order of `sorted_total_profit_by_discount` and take
# the first one whose summed profit is negative; None when no bracket is.
stop_value = next(
    (
        bracket['Discount']
        for bracket in sorted_total_profit_by_discount.collect()
        if bracket['TotalProfit'] < 0
    ),
    None,
)

if stop_value is not None:
    print(f"The stop value after which we should stop offering discounts to maintain a positive profit: less than {stop_value}")
else:
    print("There is no threshold value where total profit becomes negative.")




The stop value after which we should stop offering discounts to maintain a positive profit: less than 0.3


In [27]:
# Rank customers by summed profit (highest first) and keep the top five rows.
total_profit_by_customer = (
    df.groupBy('Customer ID')
    .agg(F.sum('Profit').alias('TotalProfit'))
)
sorted_total_profit_by_customer = total_profit_by_customer.orderBy(F.col('TotalProfit').desc())
top_5_profitable_customers = sorted_total_profit_by_customer.take(5)

print("Top 5 most profitable customers:")
for rank, customer in enumerate(top_5_profitable_customers, 1):
    print(f"{rank}. Customer ID: {customer['Customer ID']} - Total Profit: {customer['TotalProfit']}")


Top 5 most profitable customers:
1. Customer ID: SP-20920 - Total Profit: 4974.512999999999
2. Customer ID: PJ-18835 - Total Profit: 3986.0039999999995
3. Customer ID: PO-18865 - Total Profit: 3778.197
4. Customer ID: EB-13840 - Total Profit: 3459.66
5. Customer ID: MG-18145 - Total Profit: 3144.4439999999995


In [28]:
# Gather every order row placed by one of the top-5 customers (matched on
# Customer ID; the IDs come from the rows collected by the previous cell).
top_5_customer_ids = [customer['Customer ID'] for customer in top_5_profitable_customers]
top_5_customers_rows = df.where(F.col('Customer ID').isin(top_5_customer_ids))
count_top_5_customers_rows = top_5_customers_rows.count()
print(f"Number of rows belonging to the top 5 most profitable customers: {count_top_5_customers_rows}")

Number of rows belonging to the top 5 most profitable customers: 76


In [29]:
# Add the sale value as it would have been with no discount applied:
# original = sales / (1 - discount).
df_with_original_value = df.withColumn(
    'OriginalValue',
    F.col('Sales') / (F.lit(1) - F.col('Discount')),
)
df_with_original_value.show()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+------------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|     OriginalValue|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+------------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporate|        Leeds|             England|United Kingdom|  Nor

In [30]:
# Per-row discount amount: how much more the sale would have been worth
# without the discount (OriginalValue - Sales).
diff = df_with_original_value.withColumn(
    'difference',
    F.col('OriginalValue') - F.col('Sales'),
)
diff.show()

+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+------------------+------------------+
|Row ID|       Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|   Customer Name|    Segment|         City|               State|       Country| Region|     Product ID|       Category|Sub-Category|        Product Name|   Sales|Quantity|Discount| Profit|     OriginalValue|        difference|
+------+---------------+----------+----------+--------------+-----------+----------------+-----------+-------------+--------------------+--------------+-------+---------------+---------------+------------+--------------------+--------+--------+--------+-------+------------------+------------------+
|     1|ES-2017-1311038|07/02/2017|11/02/2017|Standard Class|   AS-10045|  Aaron Smayling|  Corporat

In [31]:
# How much revenue did the discounts cost us, per discount bracket?
# Discount is a string column, so order the brackets by numeric value rather
# than lexicographically.
total_difference_value_by_discount = diff.groupBy('Discount').agg(F.sum('difference').alias('TotalDifferenceValue'))
sorted_total_difference_value_by_discount = total_difference_value_by_discount.sort(F.col('Discount').cast('double'))
for row in sorted_total_difference_value_by_discount.collect():
    print(f"Discount: {row['Discount']} - Total Difference Value: {row['TotalDifferenceValue']}")


Discount: 0 - Total Difference Value: 0.0
Discount: 0.1 - Total Difference Value: 84712.44899999995
Discount: 0.15 - Total Difference Value: 45233.17650000002
Discount: 0.2 - Total Difference Value: 10653.119999999997
Discount: 0.3 - Total Difference Value: 2630.2410000000004
Discount: 0.35 - Total Difference Value: 29163.099000000002
Discount: 0.4 - Total Difference Value: 46724.68800000002
Discount: 0.45 - Total Difference Value: 2083.4414999999995
Discount: 0.5 - Total Difference Value: 183734.26500000045
Discount: 0.6 - Total Difference Value: 39644.04599999997
Discount: 0.65 - Total Difference Value: 12219.655499999999
Discount: 0.7 - Total Difference Value: 8534.085
Discount: 0.8 - Total Difference Value: 635.6640000000002
Discount: 0.85 - Total Difference Value: 4515.438


In [32]:
# Pick the bracket with the largest forgone revenue from the data itself
# (highest TotalDifferenceValue first, then take the top row).
sorted_by_max_difference = total_difference_value_by_discount.orderBy(
    F.col('TotalDifferenceValue').desc()
)
highest_difference_row = sorted_by_max_difference.first()

worst_bracket = highest_difference_row['Discount']
worst_value = highest_difference_row['TotalDifferenceValue']
print(f"Discount bracket with the highest total difference value: {worst_bracket} - Total Difference Value: {worst_value}")



Discount bracket with the highest total difference value: 0.5 - Total Difference Value: 183734.26500000045


In [33]:
# What would the total profit have been without the orders from that bracket?
# Use a null-safe comparison: with a plain `!=`, Spark evaluates
# `null != x` to null and `filter` drops those rows, silently removing orders
# with a missing Discount even though they are not in the removed bracket.
filtered_df = df.filter(~F.col('Discount').eqNullSafe(highest_difference_row['Discount']))
# A global aggregation always yields exactly one row.
total_profit_after_removal = filtered_df.agg(F.sum('Profit')).first()[0]
print(f"Total profit after removing orders from the discount group with the highest total difference value: {total_profit_after_removal}")

Total profit after removing orders from the discount group with the highest total difference value: 469461.8565000003


In [34]:
# How much more (or less) profit is that compared with keeping every order?
# `total_profit_after_removal` was already computed from `filtered_df` in the
# previous cell; recomputing it here would only repeat the same Spark job.
current_total_profit = df.agg(F.sum('Profit')).first()[0]
profit_difference = total_profit_after_removal - current_total_profit

if profit_difference > 0:
    print(f"Profit increased by: {profit_difference}")
elif profit_difference < 0:
    print(f"Profit decreased by: {-profit_difference}")
else:
    print("There is no change in profit.")


Profit increased by: 96632.11499999976


In [35]:
# Register the orders DataFrame as a session-scoped SQL view for the queries below.
df.createOrReplaceTempView(name="superstore_temp_table")


In [36]:
# Count the rows through Spark SQL against the superstore_temp_table view.
count_result = spark.sql("SELECT COUNT(*) as total_rows FROM superstore_temp_table").first()
row_count = count_result["total_rows"]
print(f"Number of rows in the superstore_temp_table: {row_count}")

Number of rows in the superstore_temp_table: 10000


In [37]:
# Profit ratio per country, SUM(Profit) / SUM(Sales), listed highest first.
profit_ratio_by_country = spark.sql("""
    SELECT
        Country,
        SUM(Profit) / SUM(Sales) as ProfitRatio
    FROM
        superstore_temp_table
    GROUP BY
        Country
    ORDER BY
        ProfitRatio DESC
""")
for country_row in profit_ratio_by_country.collect():
    country, ratio = country_row['Country'], country_row['ProfitRatio']
    print(f"Country: {country} - Profit Ratio: {ratio}")


Country: Switzerland - Profit Ratio: 0.2909201193350232
Country: Austria - Profit Ratio: 0.2641908775042505
Country: Norway - Profit Ratio: 0.2517747548521659
Country: Belgium - Profit Ratio: 0.23508766583987942
Country: United Kingdom - Profit Ratio: 0.21170103540397134
Country: Spain - Profit Ratio: 0.18941580658358978
Country: Finland - Profit Ratio: 0.18864296633316185
Country: Germany - Profit Ratio: 0.17066792076621765
Country: France - Profit Ratio: 0.12693568221933804
Country: Italy - Profit Ratio: 0.06844355185424991
Country: Ireland - Profit Ratio: -0.44426677493909256
Country: Denmark - Profit Ratio: -0.4957190005664471
Country: Netherlands - Profit Ratio: -0.5298342790541865
Country: Sweden - Profit Ratio: -0.5745674280714466
Country: Portugal - Profit Ratio: -0.5761662270806188


In [38]:
# Is the country with the largest profit ratio also the one with the largest profit?
# `profit_ratio_by_country` (previous cell) is already ordered by
# ProfitRatio DESC, so its first row is the top country; re-issuing the same
# aggregation query with LIMIT 1 would just duplicate that work.
country_largest_profit_ratio = profit_ratio_by_country.first()["Country"]


In [39]:
# Country with the highest absolute total profit, then compare it with the
# country that has the highest profit ratio.
top_profit_rows = spark.sql("""
    SELECT
        Country,
        SUM(Profit) as TotalProfit
    FROM
        superstore_temp_table
    GROUP BY
        Country
    ORDER BY
        TotalProfit DESC
    LIMIT 1
""").collect()
country_largest_profit = top_profit_rows[0]["Country"]

same_country = country_largest_profit_ratio == country_largest_profit
if not same_country:
    print(f"The country with the largest profit ratio ({country_largest_profit_ratio}) is NOT the country with the largest profit ({country_largest_profit}).")
else:
    print(f"The country with the largest profit ratio ({country_largest_profit_ratio}) is also the country with the largest profit.")


The country with the largest profit ratio (Switzerland) is NOT the country with the largest profit (United Kingdom).
