The objective of this project is to walk through an ETL process using PySpark and build a simple data warehouse (DW) for tracking food prices in 21st-century Poland. The dataset can be downloaded free of charge from the FAOSTAT website: https://www.fao.org/faostat/en/#data/PP.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FoodPrices").getOrCreate()

After creating a Spark session, let's take a look at the CSV file.

In [None]:
df = spark.read.csv('data/Prices_E_Europe_NOFLAG.csv', header=True, inferSchema=True)
df.show()

The goal of the ETL process is to obtain monthly food prices in Poland during the 21st century.
Let's start by filtering the DataFrame.

In [3]:
from pyspark.sql.functions import col

df = df.filter(col('Area') == 'Poland')
df = df.filter(col('Months') != 'Annual value')

df = df.filter(col('Unit') == 'LCU') # only records with prices in the local currency unit -> PLN

year_columns = [c for c in df.columns 
                 if c.startswith("Y20") or c.startswith("Y21")] # don't take years before 2000

df = df.select(*["Item Code", "Item", "Months"], *year_columns)

We keep only the columns needed for further analysis (for example, the Area column is dropped, since after filtering every row would have the same value).
Let's look at the first 5 rows of the result.

In [None]:
df.head(5)

With the filtered DataFrame ready, let's reduce the number of columns by unpivoting it: every year column becomes a separate row that holds the year and the price for the given month. This changes the table's orientation from wide to long.
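A plain-Python sketch of the same wide-to-long idea may help. The row below is hypothetical (made-up item and prices, not taken from the FAOSTAT file), and a missing price is modelled as None, the way empty cells come through as nulls in Spark.

```python
# Hypothetical wide row: one item/month with one price column per year
wide_row = {"Item Code": 1, "Item": "Example item", "Months": "January",
            "Y2000": 2.5, "Y2001": None, "Y2002": 3.0}

# named differently from the notebook's year_columns so it does not overwrite it
sample_year_columns = [c for c in wide_row if c.startswith("Y20")]

def melt(row, value_columns):
    """Turn one wide row into one long row per year column."""
    id_part = {k: v for k, v in row.items() if k not in value_columns}
    # drop the leading 'Y' so 'Y2000' becomes the year label '2000'
    return [{**id_part, "Year": c[1:], "Price": row[c]} for c in value_columns]

long_rows = melt(wide_row, sample_year_columns)
for r in long_rows:
    print(r["Year"], r["Price"])
```

One wide row with three year columns becomes three long rows, and the missing 2001 price survives as None, which is why the nulls have to be filtered out afterwards. On Spark 3.4 or newer, DataFrame.unpivot (also available as melt) performs the same reshaping without assembling a stack() string by hand.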

In [None]:
pivot_expression = "stack({0}, {1}) as (Year, Price)".format(
    len(year_columns),
    ", ".join([f"'{y[1:]}', `{y}`" for y in year_columns])  # strip the leading 'Y': 'Y2000' -> '2000'
)

df = df.selectExpr("*", pivot_expression).drop(*year_columns) 
df.head(5)
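To see what selectExpr actually receives, the string-building step can be run on its own. The three column names below are a hypothetical short list standing in for the real year_columns.

```python
# Hypothetical stand-in for the notebook's year_columns
sample_years = ["Y2000", "Y2001", "Y2002"]

sample_expression = "stack({0}, {1}) as (Year, Price)".format(
    len(sample_years),
    # each pair: the year label without the leading 'Y', then the backticked source column
    ", ".join([f"'{y[1:]}', `{y}`" for y in sample_years])
)
print(sample_expression)
```

stack(n, ...) splits its remaining arguments into n rows of two values each, so every source row yields one (Year, Price) row per year column. Because the labels are quoted literals, Year comes out as a string column.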

After the transformation, some rows have Price = None (no price was recorded for that month and year). Since the price is the key measure of our analysis, we drop these records: there is no point in keeping rows without a measure.

In [None]:
df = df.filter(col("Price").isNotNull())
df.head(5)

With the data prepared this way, we can model it as a simple data warehouse.
The fact table will hold the price, while the dimensions will be time (date) and item.
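As a minimal in-memory sketch of this model (the class names and sample values are hypothetical, chosen only for illustration), a fact row stores only keys and the measure, and the descriptive attributes are looked up in the dimensions.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class DimItem:
    id: int          # Item Code from FAOSTAT
    name: str

@dataclass(frozen=True)
class DimDate:
    id: int          # surrogate key
    year: str
    month_name: str
    month_num: int
    quarter: str

@dataclass(frozen=True)
class FactPrice:
    item_id: int     # refers to DimItem.id
    date_id: int     # refers to DimDate.id
    price_pln: float

# Hypothetical sample rows
items = {1: DimItem(1, "Example item")}
dates = {0: DimDate(0, "2023", "December", 12, "Q4")}
facts = [FactPrice(item_id=1, date_id=0, price_pln=9.99)]

def describe(fact: FactPrice) -> str:
    """Resolve a fact row's keys into readable dimension attributes."""
    item, date = items[fact.item_id], dates[fact.date_id]
    return f"{item.name}, {date.month_name} {date.year} ({date.quarter}): {fact.price_pln} PLN"

print(describe(facts[0]))
```

Keeping names and calendar attributes out of the fact table means each of them is stored once, in its dimension, no matter how many prices refer to it.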

Let's start with the dimension tables.

In [None]:
dim_item = df.select("Item Code", "Item").distinct() \
    .withColumnRenamed("Item Code", "id") \
    .withColumnRenamed("Item", "name")

dim_item.head(5)

In [None]:
# it is more convenient to have the month number in addition to its name
months_dict = {
    "January": 1, "February": 2, "March": 3, "April": 4,
    "May": 5, "June": 6, "July": 7, "August": 8,
    "September": 9, "October": 10, "November": 11, "December": 12
}

df_months = spark.createDataFrame(list(months_dict.items()), ["month_name", "month_num"])

dim_date = df.select("Year", "Months").distinct() \
    .join(df_months, col("Months") == col("month_name"), "left") \
    .withColumnRenamed("Year", "year") \
    .drop("Months")  # month_name holds the same value, so drop Months to avoid redundancy

dim_date.head(5)

Let's also add the corresponding quarter to each date.

In [None]:
from pyspark.sql import functions as F

dim_date = dim_date.withColumn(
    "quarter",
    F.when(F.col("month_num").between(1, 3), "Q1")
     .when(F.col("month_num").between(4, 6), "Q2")
     .when(F.col("month_num").between(7, 9), "Q3")
     .otherwise("Q4")
)

dim_date.head(5)
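Because each quarter is a block of three months, the label can also be computed arithmetically as (month_num - 1) // 3 + 1. A plain-Python check of that formula, using a hypothetical helper quarter_of:

```python
def quarter_of(month_num):
    """Map a month number (1-12) to its quarter label; None stays None."""
    if month_num is None:
        return None
    if not 1 <= month_num <= 12:
        raise ValueError(f"month out of range: {month_num}")
    return f"Q{(month_num - 1) // 3 + 1}"

print([quarter_of(m) for m in range(1, 13)])
```

In PySpark, one possible equivalent is F.concat(F.lit("Q"), F.ceil(F.col("month_num") / 3).cast("string")). Unlike the .otherwise("Q4") branch above, it leaves a row with an unmatched (null) month as null instead of labelling it Q4.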

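For efficient joins, every dimension table should have a unique key that consists of a single column.
The item dimension already satisfies this, since every food item has its own unique Item Code.
For the date dimension, we will create an artificial (surrogate) key with monotonically_increasing_id(). Its values are unique but not consecutive, and they depend on how the data is partitioned, so they can change if dim_date is recomputed; caching the table reduces that risk.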

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

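# unique (not consecutive) ids; cache the table so later queries normally reuse the same ids
dim_date = dim_date.withColumn("id", monotonically_increasing_id()).cache()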
dim_date.filter(col('year') == 2023).show()
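A common data-warehouse alternative, not the approach this notebook uses, is a deterministic "smart" key built from the date itself, such as YYYYMM. A plain-Python sketch with a hypothetical helper date_key:

```python
def date_key(year, month_num):
    """Deterministic YYYYMM key, e.g. ('2023', 4) -> 202304."""
    return int(year) * 100 + int(month_num)

# keys computed from the values themselves also sort chronologically
print(sorted([date_key("2023", 1), date_key("2022", 12), date_key("2023", 4)]))
```

In PySpark this could be written as F.col("year").cast("int") * 100 + F.col("month_num"). Since the key depends only on the row's own values, it stays the same however often dim_date is recomputed.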

Now we can create our fact table.
It is crucial to link it to the dimension tables through their ids.

In [11]:
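f = df.select(
    col("Item Code").alias("item_id"),
    col("Year").alias("year"),   # string produced by stack(), e.g. '2023'
    col("Months"),
    col("Price").alias("price_pln")
).alias("f")

d = dim_date.alias("d")
i = dim_item.alias("i")

fact_prices = (
    f.join(
        d,
        # look up the date surrogate key by (year, month name)
        (col("f.year") == col("d.year")) & (col("f.Months") == col("d.month_name")),
        "left"
    ).join(
        i,
        col("f.item_id") == col("i.id"),
        "inner"   # keep only items present in the item dimension
    ).select(
        col("f.item_id"),
        col("d.id").alias("date_id"),
        col("f.price_pln")
    )
)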

Now we can look at our DW tables:

In [None]:
print("FACT TABLE: food prices")
fact_prices.show()

print("DIMENSION TABLE: food items")
dim_item.show()

print("DIMENSION TABLE: dates")
dim_date.show()

With the DW in place, we can start writing analytical queries to gain insights into the food market.

Let's start by finding the most expensive product in the last year, measured by its average price.

In [None]:
# Extract ids of the dates in 2023 (the last year with available data)
dates2023 = dim_date.filter(col('year') == 2023).select('id')

avg_prices = (
    fact_prices.join(dates2023, col("date_id") == dates2023.id, "inner")
            .groupBy("item_id")
            .agg(F.avg("price_pln").alias("avg_price"))
)

# sort and take the highest one only
most_expensive_product = avg_prices.orderBy(F.desc("avg_price")).limit(1)

# get item name from dimension table
most_expensive_product = (
    most_expensive_product
        .join(i, col("item_id") == i.id, "left")
        .select(col("i.name"), col("avg_price"))
)

most_expensive_product.show()
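The query's logic (keep the 2023 dates, average per item, take the top one) can be traced on toy data in plain Python. The ids and prices below are made up for illustration.

```python
from collections import defaultdict

# Hypothetical toy tables: date id -> year, and fact rows (item_id, date_id, price)
date_year = {0: 2022, 1: 2023, 2: 2023}
fact_rows = [(10, 0, 50.0), (10, 1, 4.0), (10, 2, 6.0),
             (20, 1, 7.0), (20, 2, 8.0)]

# equivalent of the inner join with the 2023 date ids, grouped by item
prices_2023 = defaultdict(list)
for item_id, date_id, price in fact_rows:
    if date_year[date_id] == 2023:
        prices_2023[item_id].append(price)

avg_by_item = {item: sum(p) / len(p) for item, p in prices_2023.items()}

# equivalent of orderBy(desc("avg_price")).limit(1)
top_item = max(avg_by_item, key=avg_by_item.get)
print(top_item, avg_by_item[top_item])
```

Item 10's expensive 2022 price is excluded by the date filter, so item 20 wins with an average of 7.5. Like limit(1), this returns a single item even if several tie for the highest average.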


Now let's find the products whose prices grew fastest over the last year. To do this, we compare each item's average Q4 price in 2022 with its average Q4 price in 2023.
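For a single item, the comparison reduces to abs_change = avg_2023 - avg_2022 and a percentage change relative to the 2022 average. A quick plain-Python check with hypothetical averages (the helper name change_between is illustrative only):

```python
def change_between(avg_2022, avg_2023):
    """Return (absolute change, percentage change relative to 2022)."""
    abs_change = avg_2023 - avg_2022
    pct_change = abs_change / avg_2022 * 100
    return abs_change, pct_change

print(change_between(8.0, 10.0))
```

Because the percentage is relative to the 2022 level, a cheap item can top the ranking with a small absolute increase, which is why both columns are worth keeping.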

In [None]:
# average Q4 price of every item in every year
quarters = (fact_prices.alias("fp")
                .join(d, col("fp.date_id") == col("d.id"), "inner")
                .filter(col("d.quarter") == "Q4")
                .groupBy("fp.item_id", "d.year")
                .agg(F.avg("fp.price_pln").alias("avg_price"))
)

q4_2023 = quarters.filter(col("year") == 2023).select(
    "item_id", col("avg_price").alias("avg_2023"))

q4_2022 = quarters.filter(col("year") == 2022).select(
    "item_id", col("avg_price").alias("avg_2022"))

# calculate price difference (both absolute and percentage)
price_change = (q4_2022.join(q4_2023, "item_id", "inner")
           .withColumn("abs_change", col("avg_2023") - col("avg_2022"))
           .withColumn("%_change", (col("abs_change") / col("avg_2022")) * 100)
)

# replace the item id with the item name
price_change = (price_change
                .join(i, col("item_id") == col("i.id"), "left")
                .select(col("i.name"), "avg_2022", "avg_2023", "abs_change", "%_change")
)

price_change = price_change.orderBy(col("%_change").desc())
price_change.show()
