In [17]:
!pip install pyspark



In [1]:
import pyspark
pyspark.__version__

'3.5.1'

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master('local') \
        .appName('c4') \
        .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/06/03 07:49:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [20]:
!head data/bakery.csv

ProductName,OriginalPrice,ApplicablePrice,Type,PercentageDiscount,Category
Millbakers English Muffins 300g,132,132.00,FOOD,0,bakery
Millbakers Queen Cupcakes 260g,99,99.00,FOOD,0,bakery
Sweet Moment Lemon &amp; Poppy Muffin 6&#39;s 300g,180,180.00,FOOD,0,bakery
Sweet Moment Chocchip Muffin 6&#39;s300g,180,180.00,FOOD,0,bakery
Millbakers Queen Cupcakes 200g,84,84.00,FOOD,0,bakery
Millbakers Family Madeira Cake 750g,236,236.00,FOOD,0,bakery
Festive Milky White Bread 800G,127,127.00,FOOD,0,bakery
Millbakers Standard Madeira Cake 500g,165,165.00,FOOD,0,bakery
Joy Super Bakers Queen Cake 350g (12 Pieces),154,154.00,FOOD,0,bakery


In [21]:
!wc -l data/bakery.csv

253 data/bakery.csv


In [22]:
# create a spark dataframe by reading the same file
df = spark.read.csv('data/bakery.csv')

In [23]:
df.show()

+--------------------+-------------+---------------+----+------------------+--------+
|                 _c0|          _c1|            _c2| _c3|               _c4|     _c5|
+--------------------+-------------+---------------+----+------------------+--------+
|         ProductName|OriginalPrice|ApplicablePrice|Type|PercentageDiscount|Category|
|Millbakers Englis...|          132|         132.00|FOOD|                 0|  bakery|
|Millbakers Queen ...|           99|          99.00|FOOD|                 0|  bakery|
|Sweet Moment Lemo...|          180|         180.00|FOOD|                 0|  bakery|
|Sweet Moment Choc...|          180|         180.00|FOOD|                 0|  bakery|
|Millbakers Queen ...|           84|          84.00|FOOD|                 0|  bakery|
|Millbakers Family...|          236|         236.00|FOOD|                 0|  bakery|
|Festive Milky Whi...|          127|         127.00|FOOD|                 0|  bakery|
|Millbakers Standa...|          165|         165.00|FO

In [7]:
# We forgot the header
df = spark.read \
     .option('header', 'true') \
     .csv('data/beverages.csv')

df.show()

+--------------------+-------------+---------------+----+------------------+---------+
|         ProductName|OriginalPrice|ApplicablePrice|Type|PercentageDiscount| Category|
+--------------------+-------------+---------------+----+------------------+---------+
|Red Bull Energy D...|          920|         782.00|FOOD|                15|beverages|
|Organic India Ori...|          799|         679.00|FOOD|                15|beverages|
|Quencher Life Pre...|          299|         299.00|FOOD|                 0|beverages|
|Mayers Natural Sp...|          103|         103.00|FOOD|                 0|beverages|
|Carrefour Mineral...|          495|         495.00|FOOD|                 0|beverages|
|Pick N Peel Orang...|          292|         254.00|FOOD|                13|beverages|
|Kericho Gold Pure...|          335|         335.00|FOOD|                 0|beverages|
|Quencher Life Pre...|          514|         514.00|FOOD|                 0|beverages|
|   Coca Cola Soda 2L|          190|       

In [25]:
# Check if the schema is correct (Spark always infers a csv file's schema as StringType for all the columns)
df.schema

StructType([StructField('ProductName', StringType(), True), StructField('OriginalPrice', StringType(), True), StructField('ApplicablePrice', StringType(), True), StructField('Type', StringType(), True), StructField('PercentageDiscount', StringType(), True), StructField('Category', StringType(), True)])

In [26]:
# Define our own schema using pandas
!pip install pandas

import pandas as pd

df_pandas = pd.read_csv('data/bakery.csv')



In [28]:
df_pandas.dtypes

ProductName            object
OriginalPrice           int64
ApplicablePrice       float64
Type                   object
PercentageDiscount      int64
Category               object
dtype: object

In [29]:
# use the pandas data frame to create a spark schema
spark.createDataFrame(df_pandas).schema

StructType([StructField('ProductName', StringType(), True), StructField('OriginalPrice', LongType(), True), StructField('ApplicablePrice', DoubleType(), True), StructField('Type', StringType(), True), StructField('PercentageDiscount', LongType(), True), StructField('Category', StringType(), True)])

In [30]:
from pyspark.sql import types

schema = types.StructType([
            types.StructField('ProductName', types.StringType(), True), 
            types.StructField('OriginalPrice', types.LongType(), True), 
            types.StructField('ApplicablePrice', types.DoubleType(), True), 
            types.StructField('Type', types.StringType(), True), 
            types.StructField('PercentageDiscount', types.LongType(), True), 
            types.StructField('Category', types.StringType(), True)
        ])

In [32]:
# Reread the data with the schema
df = spark.read \
     .option('header', 'true') \
     .schema(schema) \
     .csv('data/bakery.csv')

df.schema

StructType([StructField('ProductName', StringType(), True), StructField('OriginalPrice', LongType(), True), StructField('ApplicablePrice', LongType(), True), StructField('Type', StringType(), True), StructField('PercentageDiscount', LongType(), True), StructField('Category', StringType(), True)])

In [35]:
# write to a parquet file mode append to include all
df.write.parquet('data/pq/all_foods')

In [11]:
df = spark.read.parquet('data/pq/all_foods')
df.show()

+--------------------+-------------+---------------+----+------------------+---------+
|         ProductName|OriginalPrice|ApplicablePrice|Type|PercentageDiscount| Category|
+--------------------+-------------+---------------+----+------------------+---------+
|Red Bull Energy D...|          920|           null|FOOD|                15|beverages|
|Organic India Ori...|          799|           null|FOOD|                15|beverages|
|Quencher Life Pre...|          299|           null|FOOD|                 0|beverages|
|Mayers Natural Sp...|          103|           null|FOOD|                 0|beverages|
|Carrefour Mineral...|          495|           null|FOOD|                 0|beverages|
|Pick N Peel Orang...|          292|           null|FOOD|                13|beverages|
|Kericho Gold Pure...|          335|           null|FOOD|                 0|beverages|
|Quencher Life Pre...|          514|           null|FOOD|                 0|beverages|
|   Coca Cola Soda 2L|          190|       

Now we perform the sql queries on the full data

In [14]:
df_pq = spark.read.parquet('data/pq/all_foods')
df_pq.show()

+--------------------+-------------+---------------+----+------------------+---------+
|         ProductName|OriginalPrice|ApplicablePrice|Type|PercentageDiscount| Category|
+--------------------+-------------+---------------+----+------------------+---------+
|Red Bull Energy D...|          920|          782.0|FOOD|                15|beverages|
|Organic India Ori...|          799|          679.0|FOOD|                15|beverages|
|Quencher Life Pre...|          299|          299.0|FOOD|                 0|beverages|
|Mayers Natural Sp...|          103|          103.0|FOOD|                 0|beverages|
|Carrefour Mineral...|          495|          495.0|FOOD|                 0|beverages|
|Pick N Peel Orang...|          292|          254.0|FOOD|                13|beverages|
|Kericho Gold Pure...|          335|          335.0|FOOD|                 0|beverages|
|Quencher Life Pre...|          514|          514.0|FOOD|                 0|beverages|
|   Coca Cola Soda 2L|          190|       

In [13]:
# What is the average discount for each product category?
avg_discount = df_pq.groupBy('Category').agg({"PercentageDiscount": "avg"}).withColumnRenamed("avg(PercentageDiscount)", "AverageDiscount")
avg_discount.show()

+--------------+------------------+
|      Category|   AverageDiscount|
+--------------+------------------+
|     beverages|1.3629032258064515|
|fruits_n_veges|1.4074074074074074|
|        frozen|2.2713178294573644|
|    fresh_food|             1.958|
|        bakery| 0.623015873015873|
| food_cupboard|0.9447779111644657|
| bio_n_organic|0.4816326530612245|
+--------------+------------------+



In [18]:
# What is the total value of discounts provided in each category?
from pyspark.sql.functions import col
sum_discount_value = df_pq.withColumn("DiscountValue", col('OriginalPrice') - col('ApplicablePrice')) \
                          .groupBy('Category') \
                          .sum("DiscountValue") \
                          .withColumnRenamed("sum(DiscountValue)", "TotalDiscountValue")
                          
sum_discount_value.show()

+--------------+------------------+
|      Category|TotalDiscountValue|
+--------------+------------------+
|     beverages|            6315.0|
|fruits_n_veges| 7000.500000000002|
|        frozen|            7547.0|
|    fresh_food|         397407.35|
|        bakery|2665.1000000000004|
| food_cupboard|           16394.0|
| bio_n_organic|            2180.0|
+--------------+------------------+



In [19]:
# Which products have the highest and lowest discounts?

highest_discounted_product = df_pq.orderBy(col("PercentageDiscount").desc()).limit(10)

lowest_discounted_product = df_pq.orderBy(col("PercentageDiscount").asc()).limit(10)

highest_discounted_product.show()
lowest_discounted_product.show()

+--------------------+-------------+---------------+----+------------------+----------+
|         ProductName|OriginalPrice|ApplicablePrice|Type|PercentageDiscount|  Category|
+--------------------+-------------+---------------+----+------------------+----------+
|Carrefour Frozen ...|          414|          100.0|FOOD|                76|    frozen|
|Carrefour Frozen ...|          414|          100.0|FOOD|                76|    frozen|
|Carrefour Frozen ...|         2499|         1000.0|FOOD|                60|    frozen|
|South African  Be...|         3199|          649.5|FOOD|                59|fresh_food|
|South African  Be...|         4699|          999.5|FOOD|                57|fresh_food|
|Carrefour Frozen ...|          635|          300.0|FOOD|                53|    frozen|
|Carrefour Frozen ...|         1219|          600.0|FOOD|                51|    frozen|
|QUORN SOUTHERN ST...|          595|          298.0|FOOD|                50|    frozen|
|Star Soda Soft Dr...|          

In [24]:
# What is the distribution of product prices across categories?
from pyspark.sql.functions import min, max, avg, stddev

price_distribution = df_pq.groupBy("Category").agg(
    min("ApplicablePrice").alias("MinPrice"),
    max("ApplicablePrice").alias("MaxPrice"),
    avg("ApplicablePrice").alias("AvgPrice"),
    stddev("ApplicablePrice").alias("StddevPrice")
)

price_distribution.show()

+--------------+--------+--------+------------------+------------------+
|      Category|MinPrice|MaxPrice|          AvgPrice|       StddevPrice|
+--------------+--------+--------+------------------+------------------+
|     beverages|    10.0|  2450.0| 356.3637992831541| 405.5670849027246|
|fruits_n_veges|    18.0|  1538.0|  259.425925925926| 289.8139449620126|
|        frozen|    29.0|  2580.0|467.28682170542635| 372.3043672487805|
|    fresh_food|    20.0|  8980.0|         533.55965| 843.2890272901251|
|        bakery|    25.0|  1899.0|211.64246031746035|295.26648882830983|
| food_cupboard|     6.0| 10132.0|479.05402160864344| 786.7660366061849|
| bio_n_organic|    25.0|  5000.0|  534.804081632653|  588.491718244436|
+--------------+--------+--------+------------------+------------------+



In [26]:
# How many products have a discount applied per category? 
discounted_products_count = df_pq.filter(col("PercentageDiscount") > 0) \
                                 .groupBy("Category") \
                                 .count()
discounted_products_count.show()

+--------------+-----+
|      Category|count|
+--------------+-----+
|     beverages|  103|
|fruits_n_veges|    3|
|        frozen|   27|
|    fresh_food|   79|
|        bakery|    5|
| food_cupboard|   46|
| bio_n_organic|    7|
+--------------+-----+



In [27]:
# Most expensive and cheapest product on Carrefour that day

top_expensive_products = df_pq.orderBy(col("ApplicablePrice").desc()).limit(10)

top_cheap_products = df_pq.orderBy(col("ApplicablePrice").asc()).limit(10)

top_expensive_products.show()
top_cheap_products.show()

+--------------------+-------------+---------------+----+------------------+-------------+
|         ProductName|OriginalPrice|ApplicablePrice|Type|PercentageDiscount|     Category|
+--------------------+-------------+---------------+----+------------------+-------------+
|Elianto Corn Oil 20L|        10132|        10132.0|FOOD|                 0|food_cupboard|
|       Salmon Steak |         4490|         8980.0|FOOD|                 0|   fresh_food|
|Fresh Goat Carcas...|        13999|         8888.0|FOOD|                37|   fresh_food|
|Fresh Lamb Carcas...|        13999|         8888.0|FOOD|                37|   fresh_food|
|  Fresh Salmon Whole|         4199|         8398.0|FOOD|                 0|   fresh_food|
| Halal Turkey Salami|         8050|         8050.0|FOOD|                 0|   fresh_food|
|  Fresh Goat Carcass|          999|         7480.0|FOOD|                25|   fresh_food|
|       Extra Chorizo|         7400|         7400.0|FOOD|                 0|   fresh_food|