# 4. Seasonal Recommendation Model
Authors: Anish Puthuraya, Sindhura Uppalapati, Srujana Gali, Anusha Ronaki

<hr>

## Set Parameters

In [None]:
# Base location that every other folder is derived from (empty = current working directory)
root_folder = ""

# Folder holding the processed transaction data read by this notebook
input_data = f"{root_folder}processed_data/"

# Folder where intermediate datasets are cached so they are not recomputed on every run
middleware_data = f"{root_folder}Middleware/"

# Folder the final recommendations are written to
output_data = f"{root_folder}recommendations/"

In [None]:
# Date the recommendations are generated for (ISO formatted string)
current_date = '2023-10-31'

# Inclusive range of season offsets relative to the current season:
# [-3, 0] covers the three preceding seasons plus the current one
window = [-3, 0]

# Season granularity: index 0 selects "week", index 1 selects "month"
season = ["week", "month"][0]

## Import Packages

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType
import pyspark.sql.functions as F
import pandas as pd
import os.path

In [None]:
# Create (or reuse) the Spark session for this notebook, running in local mode with one worker thread
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName('seasonal_recommendation_model')
    .getOrCreate()
)

23/07/31 13:40:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# Parse the configured date, then reset its time component to midnight
current_date = pd.to_datetime(current_date)
current_date = current_date.normalize()

<hr>

## Read and Process Data

### Add Season Column

In [None]:
# Location of the cached transaction data enriched with a `season` column.
# NOTE(review): the file name does not encode `season`, so switching between
# "week" and "month" silently reuses the previously cached values. Delete the
# cached file whenever `season` changes.
season_data_path = middleware_data + "tran_data_season.parquet"

if not os.path.exists(season_data_path):

    # Spark function that derives the season number from the transaction date
    season_extractors = {"week": F.weekofyear, "month": F.month}

    # Fail before anything is written: an unsupported value would otherwise
    # cache a dataset without a `season` column and break every later run.
    if season not in season_extractors:
        raise ValueError(
            f"Unsupported season {season!r}; expected one of {sorted(season_extractors)}"
        )

    data = spark.read.parquet(input_data).repartition(200)
    data = data.withColumn("season", season_extractors[season](F.col('tran_date')))

    data.write.parquet(season_data_path)

# Always continue from the persisted copy, whether it was just written or already existed
data = spark.read.parquet(season_data_path)
data.show(5)

+----------+----------+---------+-----+------+
| tran_date|product_id|banner_id|Count|season|
+----------+----------+---------+-----+------+
|2023-03-29|    244894|     1001|  328|    13|
|2021-10-03|    164489|     1001|   18|    39|
|2022-02-06|    225287|     1001|   77|     5|
|2022-12-29|    297688|     1001|    0|    52|
|2021-04-22|    256079|     1001|    3|    16|
+----------+----------+---------+-----+------+
only showing top 5 rows



### Compute Product-Season Mean

In [None]:
# Location of the cached per-product, per-season averages
product_season_mean_path = middleware_data + "product_season_mean.parquet"

# Average `Count` for every (product, season) pair; computed only when no cached copy exists
if not os.path.exists(product_season_mean_path):
    product_season_mean = (
        data
        .groupBy('product_id', 'season')
        .agg({'Count': 'mean'})
        .withColumnRenamed('avg(Count)', 'product_season_mean')
    )
    product_season_mean.write.parquet(product_season_mean_path)

# Continue from the persisted copy
product_season_mean = spark.read.parquet(product_season_mean_path)
product_season_mean.show(5)

+----------+------+-------------------+
|product_id|season|product_season_mean|
+----------+------+-------------------+
|      3930|    29|  398.7857142857143|
|    251881|    46|                0.0|
|    284340|    52|0.14285714285714285|
|    325215|    14|                0.0|
|    177095|    43|0.07142857142857142|
+----------+------+-------------------+
only showing top 5 rows



### Compute Product Mean

In [None]:
# Location of the cached per-product averages
product_mean_path = middleware_data + "product_mean.parquet"

# Average `Count` per product across all seasons; computed only when no cached copy exists
if not os.path.exists(product_mean_path):
    product_mean = (
        data
        .groupby('product_id')
        .agg({'Count': 'mean'})
        .withColumnRenamed('avg(Count)', 'product_mean')
    )
    product_mean.write.parquet(product_mean_path)

# Continue from the persisted copy
product_mean = spark.read.parquet(product_mean_path)
product_mean.show(5)

+----------+-------------------+
|product_id|       product_mean|
+----------+-------------------+
|    311373|  1.583050847457627|
|    255347|0.45084745762711864|
|    321525| 30.010169491525424|
|    340808|  3.222598870056497|
|    237752|                0.0|
+----------+-------------------+
only showing top 5 rows



### Join Product Mean and Product-Season Mean

In [None]:
# Attach each product's overall mean to every one of its per-season rows
product_season_mean = product_season_mean.join(
    product_mean,
    on='product_id',
    how='left',
)

# Preview the joined table ordered by product and season (ascending is the default)
joined_preview = product_season_mean.orderBy("product_id", "season")
joined_preview.show(5)



+----------+------+-------------------+------------------+
|product_id|season|product_season_mean|      product_mean|
+----------+------+-------------------+------------------+
|       206|     1| 13.714285714285714|14.618079096045198|
|       206|     2|  13.80952380952381|14.618079096045198|
|       206|     3| 15.952380952380953|14.618079096045198|
|       206|     4|  16.19047619047619|14.618079096045198|
|       206|     5|  16.38095238095238|14.618079096045198|
+----------+------+-------------------+------------------+
only showing top 5 rows



                                                                                

### Extract Current Season and Define Window List

In [None]:
# Season number of `current_date` at the configured granularity.
if season == "week":
    # ISO week number, matching the `weekofyear` values stored in the `season` column
    current_season = current_date.isocalendar().week
elif season == "month":
    # `current_date` is a scalar Timestamp, which exposes `.month` directly;
    # the `.dt` accessor only exists on Series and raised AttributeError here.
    current_season = current_date.month
else:
    raise ValueError(f"Unsupported season {season!r}; expected 'week' or 'month'")

current_season

44

In [None]:
# Number of distinct season values for the configured granularity
# (weeks are numbered 1..53, months 1..12).
if season == "week":
    seasons_per_year = 53
elif season == "month":
    seasons_per_year = 12
else:
    raise ValueError(f"Unsupported season {season!r}; expected 'week' or 'month'")

# Season numbers are 1-based, so shift to 0-based before wrapping and back
# afterwards. A plain `% seasons_per_year` produced the invalid season 0 and
# skipped the last season (e.g. month 12 -> 0, week 1 with offset -1 -> 0).
window_list = [
    (current_season + offset - 1) % seasons_per_year + 1
    for offset in range(window[0], window[1] + 1)
]

window_list

[41, 42, 43, 44]

### Generate Recommendations for the Current Date

In [None]:
# Build the seasonality factor per product in a single pipeline:
#   1. keep only the seasons that fall inside the window
#   2. average the seasonal mean and the overall mean per product
#   3. divide the windowed seasonal mean by the overall mean
#   4. keep only the factor, replacing nulls with 0
product_season_mean_filtered = (
    product_season_mean
    .filter(product_season_mean['season'].isin(window_list))
    .groupBy('product_id')
    .agg(
        F.mean('product_season_mean').alias('product_season_mean'),
        F.mean('product_mean').alias('product_mean'),
    )
    .withColumn(
        'seasonality_factor',
        F.col('product_season_mean') / F.col('product_mean'),
    )
    .drop('product_season_mean', 'product_mean')
    .fillna(0, subset=["seasonality_factor"])
)

# Products with the highest seasonality factor come first
recommendations = product_season_mean_filtered.sort(F.desc('seasonality_factor'))

In [None]:
# Preview the five products with the highest seasonality factor
recommendations.show(n=5)



+----------+------------------+
|product_id|seasonality_factor|
+----------+------------------+
|    332477| 15.80357142857143|
|    335739| 15.80357142857143|
|    330646| 15.80357142857143|
|    332968|15.803571428571429|
|    335730|15.803571428571429|
+----------+------------------+
only showing top 5 rows



                                                                                

<hr>

## Export Data

In [None]:
# One output folder per run date, named YYYY-MM-DD; an existing folder for the same date is replaced
export_path = output_data + current_date.strftime('%Y-%m-%d')

(
    recommendations.write
    .format('parquet')
    .mode('overwrite')
    .save(export_path)
)

                                                                                