# Project data model
This section describes the data preparation, model training, and model evaluation steps of the project.

## Data Preparation
The goal of the data preparation phase is to construct a utility matrix that serves as the foundation of the recommendation system. Because the dataset contains no user-supplied ratings, we derive rating values from an implicit signal: the amount of time a user spends on a particular item. This approach assumes that the duration of an interaction correlates with user preference, so a longer viewing time is read as stronger interest. The utility matrix is first built in long form, with one row per user-item pair, as in the following illustrative example:




| UserID    | ItemID    | Session_Duration |
|-----------|-----------|------------------|
| User_1    | Item_A    | 120              |
| User_1    | Item_B    | 60               |
| User_2    | Item_A    | 45               |
| User_2    | Item_C    | 30               |
| User_3    | Item_B    | 85               |
| User_3    | Item_C    | 90               |
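
The long-form table above can be pivoted into a user-by-item grid. The following is a minimal plain-Python sketch of that idea, using the illustrative rows from the table; the helper name `to_utility_matrix` is hypothetical, and missing user-item pairs are represented as `None`.

```python
# Rows copied from the illustrative table: (UserID, ItemID, Session_Duration).
interactions = [
    ("User_1", "Item_A", 120),
    ("User_1", "Item_B", 60),
    ("User_2", "Item_A", 45),
    ("User_2", "Item_C", 30),
    ("User_3", "Item_B", 85),
    ("User_3", "Item_C", 90),
]


def to_utility_matrix(rows):
    """Pivot (user, item, value) triplets into a dense user x item grid.

    Pairs with no observed interaction are filled with None.
    """
    users = sorted({user for user, _, _ in rows})
    items = sorted({item for _, item, _ in rows})
    lookup = {(user, item): value for user, item, value in rows}
    matrix = [[lookup.get((user, item)) for item in items] for user in users]
    return users, items, matrix


users, items, matrix = to_utility_matrix(interactions)
print("items:", items)
for user, row in zip(users, matrix):
    print(user, row)
```

Most cells of a real utility matrix are empty, which is why the long form is usually the more practical storage format.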

### Step 1: Importing the data set

#### Feature Selection Details

In this step, we selected specific features from the BigQuery dataset by running an SQL query. These features drive both the analysis and the model, capturing user behavior, session details, and product interactions. The selected features and their meaning are:

1. **`fullVisitorId` | User ID**: A unique identifier for each user visiting the website. 

2. **`visitNumber` | Session/Visit Number**: Indicates the ordinal number of the user's visit. For example, the first visit is 1, the second visit is 2, and so on.

3. **`hits.eCommerceAction.action_type` | Ecommerce Action Type**: Categorizes the type of interaction a user had, such as clicking through a product list (1) or viewing a product detail page (2).

4. **`hits.time` | Action Time**: Number of milliseconds between the start of the visit and the moment the hit was registered; the first hit of a visit has a time of 0.

5. **`hits.hitNumber` | Event Number Within a Session**: Sequential number of the event/action within a session, starting from 1.

6. **`prod.productSKU` | Product ID**: Unique identifier for each product that was interacted with.
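
For reference, the `action_type` codes can be mapped to readable labels. The sketch below follows the codes documented for the Google Analytics BigQuery export; the helper `describe_action` is a hypothetical convenience function, and the keys are strings because the column is later loaded as a string.

```python
# Action-type codes as documented for the Google Analytics BigQuery export schema.
ECOMMERCE_ACTION_TYPES = {
    "0": "Unknown",
    "1": "Click through of product lists",
    "2": "Product detail views",
    "3": "Add product(s) to cart",
    "4": "Remove product(s) from cart",
    "5": "Check out",
    "6": "Completed purchase",
    "7": "Refund of purchase",
    "8": "Checkout options",
}


def describe_action(action_type):
    """Return a readable label for an action_type code (hypothetical helper)."""
    return ECOMMERCE_ACTION_TYPES.get(str(action_type), "Unrecognized code")


print(describe_action("2"))
```

Only code `2` (product detail views) is used later when computing durations.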



#### Data Retrieval Process


In this phase, we retrieved the raw data from the BigQuery public dataset. For a preliminary evaluation of the model, we extracted the entries for a single day (August 1, 2017).

```sql
SELECT fullVisitorId, visitNumber, h.eCommerceAction.action_type, prod.productSKU, h.time, h.hitNumber
FROM `bigquery-public-data.google_analytics_sample.ga_sessions_20170801`,
     UNNEST(hits) AS h,
     UNNEST(h.product) AS prod
ORDER BY fullVisitorId ASC, visitNumber ASC, h.time ASC
```
The query results were saved to `data/ga_sessions_20170801.csv`.
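
The two `UNNEST` joins flatten each nested session into one row per (hit, product) pair. The following plain-Python sketch mimics that flattening on a made-up session record; the record and the `flatten` helper are hypothetical and only illustrate the shape of the result.

```python
# Hypothetical nested record mimicking the shape of one ga_sessions row.
session = {
    "fullVisitorId": "0001",
    "visitNumber": 1,
    "hits": [
        {"hitNumber": 1, "time": 0, "action_type": "0",
         "product": [{"productSKU": "SKU_A"}, {"productSKU": "SKU_B"}]},
        {"hitNumber": 2, "time": 5000, "action_type": "2",
         "product": [{"productSKU": "SKU_A"}]},
        {"hitNumber": 3, "time": 9000, "action_type": "0", "product": []},
    ],
}


def flatten(record):
    """Emit one flat row per (hit, product) pair, like the two UNNEST joins."""
    rows = []
    for hit in record["hits"]:
        # A hit with an empty product array yields no rows (cross-join semantics).
        for prod in hit["product"]:
            rows.append((record["fullVisitorId"], record["visitNumber"],
                         hit["action_type"], prod["productSKU"],
                         hit["time"], hit["hitNumber"]))
    return rows


rows = flatten(session)
for row in rows:
    print(row)
```

Note that a hit listing several products produces several rows with the same `time` and `hitNumber`, which is visible in the first rows of the loaded DataFrame.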

### Step 2: Data Preprocessing

#### Initialize Spark and necessary imports

In [2]:
from pyspark.sql import SparkSession, functions as F, Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session
def init_spark():
    return SparkSession \
        .builder \
        .appName("GA360RECOMMENDER") \
        .getOrCreate()

spark = init_spark()

#### Defining the Schema and loading the data into a Spark DataFrame

In [3]:
schema = StructType([
    StructField("fullVisitorId", StringType(), True),
    StructField("visitNumber", IntegerType(), True),
    StructField("action_type", StringType(), True),
    StructField("productSKU", StringType(), True),
    StructField("time", IntegerType(), True),
    StructField("hitNumber", IntegerType(), True)
])

df = spark.read.csv("../data/ga_sessions_20170801.csv", header=True, schema=schema)

df.show(5)
print("Total number of rows in the dataframe: ", df.count(), "rows")

+-------------------+-----------+-----------+--------------+----+---------+
|      fullVisitorId|visitNumber|action_type|    productSKU|time|hitNumber|
+-------------------+-----------+-----------+--------------+----+---------+
|0004915997121163857|          1|          0|GGOEYFKQ020699|   0|        1|
|0004915997121163857|          1|          0|GGOEYDHJ056099|   0|        1|
|0004915997121163857|          1|          0|GGOEYHPB072210|   0|        1|
|0004915997121163857|          1|          0|GGOEYOCR077799|   0|        1|
|0004915997121163857|          1|          0|  GGOEGAAX0351|   0|        1|
+-------------------+-----------+-----------+--------------+----+---------+
only showing top 5 rows

Total number of rows in the dataframe:  47723 rows


#### Calculate the Session Duration 

In [4]:
# Window over each session (one visit of one user), ordered by hit time.
# `time` is the offset in milliseconds from the start of the visit.
windowSpec = Window.partitionBy("fullVisitorId", "visitNumber").orderBy("time")

# Pageview duration = time of the next row in the session minus the current time.
# The last row of a session has no successor, so it gets a nominal duration of 1 ms.
df_with_durations = df.withColumn("next_time", F.lead("time", 1).over(windowSpec)) \
                      .withColumn("pageview_duration", F.when(F.isnull(F.col("next_time") - F.col("time")), 1)
                                                          .otherwise(F.col("next_time") - F.col("time")))

# Keep product detail views only (action_type == '2')
prodview_durations = df_with_durations.filter(df_with_durations.action_type == '2') \
                                      .select("fullVisitorId", "visitNumber", "productSKU", "pageview_duration")

# Aggregate pageview durations by fullVisitorId and productSKU
aggregate_web_stats = prodview_durations.groupBy("fullVisitorId", "productSKU") \
                                        .agg(F.sum("pageview_duration").alias("session_duration"))

# Display the aggregated results
aggregate_web_stats.orderBy(aggregate_web_stats.fullVisitorId.asc()).show(10)

+-------------------+--------------+----------------+
|      fullVisitorId|    productSKU|session_duration|
+-------------------+--------------+----------------+
|0049931492016965831|GGOEGEVA022399|            9821|
|0052381813974609729|GGOEAOCB077499|           14292|
|0052381813974609729|GGOEGOCB017499|            6931|
|0052381813974609729|GGOEGOCC077299|            4745|
| 008016723867009901|GGOEGESB015099|            1488|
| 008016723867009901|GGOEGBJL013999|            1419|
| 008016723867009901|GGOEGDHC074099|            1394|
| 008016723867009901|GGOEGESC014099|               0|
| 008016723867009901|GGOEGCKQ013199|               1|
| 008016723867009901|GGOEACCQ017299|               0|
+-------------------+--------------+----------------+
only showing top 10 rows
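
The duration logic of the cell above can be traced by hand on a few rows. This is a plain-Python sketch of the same steps (order by time within a session, take the difference to the next row, keep product detail views, sum per user and product); the sample hits and the helper name are hypothetical.

```python
from collections import defaultdict

# Hypothetical flattened hits:
# (fullVisitorId, visitNumber, action_type, productSKU, time in ms).
hits = [
    ("u1", 1, "0", "SKU_A", 0),
    ("u1", 1, "2", "SKU_A", 4000),
    ("u1", 1, "2", "SKU_B", 10000),
    ("u1", 2, "2", "SKU_A", 0),
    ("u1", 2, "0", "SKU_C", 2500),
]


def product_view_durations(rows):
    """Sum, per (user, product), the time until the next row of the same session."""
    sessions = defaultdict(list)
    for row in rows:
        sessions[(row[0], row[1])].append(row)

    totals = defaultdict(int)
    for session_rows in sessions.values():
        ordered = sorted(session_rows, key=lambda r: r[4])
        for current, following in zip(ordered, ordered[1:] + [None]):
            # The last row of a session has no successor; use 1 ms, as in the Spark cell.
            duration = 1 if following is None else following[4] - current[4]
            if current[2] == "2":  # keep product detail views only
                totals[(current[0], current[3])] += duration
    return dict(totals)


durations = product_view_durations(hits)
print(durations)
```

Rows that share a timestamp (for example, several products unnested from the same hit) contribute a difference of 0 under this scheme, which may explain some of the zero durations in the output above.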



#### Normalization with Z-Score
In this phase, we apply Z-score normalization to the session durations so that the values have a mean of 0 and a standard deviation of 1. The mean and standard deviation are computed globally over all user-product pairs, which puts the durations on a common scale for the collaborative filtering algorithm. Note that Z-scoring rescales the values but does not remove outliers: as the output below shows, very long durations still map to large scores.

In [5]:
from pyspark.sql.functions import mean, stddev

# Calculate the mean and (sample) standard deviation of session_duration in one pass
stats = aggregate_web_stats.select(mean("session_duration").alias("mean_val"),
                                   stddev("session_duration").alias("stddev_val")).first()
mean_val, stddev_val = stats["mean_val"], stats["stddev_val"]

# Apply Z-score normalization
normalized_df = aggregate_web_stats.withColumn('normalized_duration', 
                   (aggregate_web_stats['session_duration'] - mean_val) / stddev_val)

normalized_df.orderBy(normalized_df.normalized_duration.desc()).show(10)

+-------------------+--------------+----------------+-------------------+
|      fullVisitorId|    productSKU|session_duration|normalized_duration|
+-------------------+--------------+----------------+-------------------+
|0834628261584717467|  GGOEGAAX0325|         1527925| 14.694681589535893|
|0834628261584717467|  GGOEGAAX0686|         1470110| 14.131252184050535|
|0485797735449723544|GGOEGESB015199|         1172597| 11.231873602122572|
| 431781159932899381|GGOEGBRJ037299|          759499| 7.2060747515268755|
|7484497031611210287|GGOEYHPB072210|          748694|  7.100775871888905|
|5873059317509196502|  GGOEGAAX0104|          594894| 5.6019357341452745|
|2863022817351466072|GGOEYFKQ020699|          536689|  5.034705628700749|
|7641607978785523241|GGOEGGCX056199|          443459|  4.126143430769419|
|2827498353821012092|  GGOEGAAX0680|          427854| 3.9740667054801513|
|1933634293342529288|GGOEGDHQ015399|          370320|  3.413375753042297|
+-------------------+--------------+----------------+-------------------+
only showing top 10 rows
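
As a small worked example of the same normalization, the sketch below applies a Z-score to a handful of made-up durations using only the standard library. It uses the sample standard deviation, which is what Spark's `stddev` computes; the input values are hypothetical.

```python
from statistics import mean, stdev

# Hypothetical aggregated durations for a few user-product pairs.
durations = [120, 60, 45, 30, 85, 90]

mu = mean(durations)
sigma = stdev(durations)  # sample standard deviation

# Z-score: subtract the mean, then divide by the standard deviation.
z_scores = [(d - mu) / sigma for d in durations]

for d, z in zip(durations, z_scores):
    print(d, round(z, 3))
```

After the transformation the values have mean 0 and standard deviation 1, but their relative ordering and the distance of extreme values from the rest are preserved.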

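#### Build the Utility Matrix

In [6]:
# Pivot the long-form ratings into a user x product matrix (one column per productSKU)
utility_matrix = normalized_df.groupBy("fullVisitorId").pivot("productSKU").agg(F.first("normalized_duration"))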

# Show the result
utility_matrix.show(5)

[Output truncated: a wide table with one row per `fullVisitorId` and one column per `productSKU`.]

## Building The Model

### Latent Factor Model

In this step, we build a latent factor model that approximates the utility matrix as the product of two low-rank factor matrices, P (users) and Q (items). We use alternating least squares (ALS) to learn these matrices.
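
For orientation, the textbook form of the regularized objective behind such a model is sketched below. The notation is an assumption of this sketch and not taken from the notebook: $r_{ui}$ is the normalized duration of user $u$ on item $i$, $\mathcal{K}$ is the set of observed user-item pairs, $p_u$ and $q_i$ are the rows of P and Q (vectors of length equal to the rank), and $\lambda$ plays the role of `regParam`. Spark's implementation may weight the regularization term differently.

$$
\min_{P,\,Q} \sum_{(u,i) \in \mathcal{K}} \left( r_{ui} - p_u^{\top} q_i \right)^2 + \lambda \left( \sum_u \lVert p_u \rVert^2 + \sum_i \lVert q_i \rVert^2 \right)
$$

ALS minimizes this by alternating: with Q fixed, each $p_u$ is the solution of a small least-squares problem, and with P fixed the same holds for each $q_i$.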

#### Ids To Numeric
The ALS model requires numeric user and item IDs, so we convert the string IDs to numeric indices.

In [39]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

def string_to_Numeric(df):
    userIndexer = StringIndexer(inputCol="fullVisitorId", outputCol="userId").setHandleInvalid("skip")
    itemIndexer = StringIndexer(inputCol="productSKU", outputCol="itemId").setHandleInvalid("skip")

    # Pipeline to apply the transformations
    pipeline = Pipeline(stages=[userIndexer, itemIndexer])

    # Fit and transform
    transformed_ratings_dataFrame = pipeline.fit(df).transform(df)

    transformed_ratings_dataFrame.orderBy(transformed_ratings_dataFrame.fullVisitorId).show()
    
    return transformed_ratings_dataFrame


ratings_dataframe = string_to_Numeric(normalized_df)


+-------------------+--------------+----------------+--------------------+------+------+
|      fullVisitorId|    productSKU|session_duration| normalized_duration|userId|itemId|
+-------------------+--------------+----------------+--------------------+------+------+
|0049931492016965831|GGOEGEVA022399|            9821|-0.09982561767578439| 165.0|   8.0|
|0052381813974609729|GGOEAOCB077499|           14292| -0.0562540035285037|  49.0|  20.0|
|0052381813974609729|GGOEGOCB017499|            6931|-0.12798977891166924|  49.0|  21.0|
|0052381813974609729|GGOEGOCC077299|            4745| -0.1492931894520306|  49.0|  22.0|
| 008016723867009901|GGOEGESB015099|            1488| -0.1810339068033375|  12.0|  42.0|
| 008016723867009901|GGOEGESC014099|               0|-0.19553503895523947|  12.0|  74.0|
| 008016723867009901|GGOEGDHC074099|            1394|-0.18194997294734208|  12.0|  13.0|
| 008016723867009901|GGOEGBJL013999|            1419| -0.1817063383345749|  12.0|   9.0|
[output truncated]
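
To make the index assignment less opaque: by default, `StringIndexer` orders labels by descending frequency, so the most frequent label gets index 0. The plain-Python sketch below imitates that ordering on made-up visitor IDs; the helper name is hypothetical, and the alphabetical tie-break is an assumption of this sketch.

```python
from collections import Counter


def frequency_index(labels):
    """Map each distinct label to an index, most frequent label first.

    Ties are broken alphabetically here; that tie-break is an assumption.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    # Indices are floats to mirror the double-typed output column.
    return {label: float(position) for position, label in enumerate(ordered)}


visitor_ids = ["v3", "v1", "v3", "v2", "v3", "v1"]  # hypothetical IDs
index = frequency_index(visitor_ids)
print(index)
```

This explains why, in the table above, the numeric `userId` values bear no relation to the original `fullVisitorId` strings.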

#### Basic ALS Recommender System

In [47]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS



def basic_als_recommender(ratings_dataframe, seed):
    '''
    Train an ALS collaborative filtering model on an 80/20 random split
    and return the RMSE of its predictions on the test set.

    The following parameters are used in the ALS optimizer:
    - maxIter: 5
    - rank: 70
    - regParam: 0.01
    - coldStartStrategy: 'drop'
    '''

    (trainingSet, testSet) = ratings_dataframe.randomSplit([0.8, 0.2], seed)

    # Build the recommendation model
    als = ALS(maxIter=5, rank=70, regParam=0.01, coldStartStrategy="drop",
              userCol="userId", itemCol="itemId", ratingCol="normalized_duration", seed=seed)
    model = als.fit(trainingSet)

    # Evaluate the model on the held-out test set
    predictions = model.transform(testSet)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="normalized_duration", predictionCol="prediction")

    rmse = evaluator.evaluate(predictions)

    return rmse


basic_als_recommender(ratings_dataframe, 123)

1.1818651178071158
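
To make the metric concrete, the sketch below computes an RMSE by hand and compares a model against a constant prediction of the training mean. All numbers are made up for illustration and are unrelated to the result above.

```python
from math import sqrt


def rmse(actual, predicted):
    """Root mean squared error between two equally long sequences."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical test ratings and model predictions (Z-score scale).
actual = [0.5, -0.2, 1.0, -0.1]
model_predictions = [0.4, 0.0, 0.7, -0.3]

# Baseline: always predict the (hypothetical) training mean.
train_mean = 0.0
baseline_predictions = [train_mean] * len(actual)

model_rmse = rmse(actual, model_predictions)
baseline_rmse = rmse(actual, baseline_predictions)
print("model:", model_rmse, "baseline:", baseline_rmse)
```

Comparing against such a constant baseline is one way to judge whether an RMSE on normalized ratings is good or poor.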

#### ALS With Bias Recommender

In [40]:
from pyspark.sql.functions import mean

def global_average(ratings_df, seed):
    '''
    Return the global average rating (normalized duration) over all users
    and all products in the training set. Training and test sets are
    determined as before (as in function basic_als_recommender).
    Test file: tests/test_global_average.py
    '''
    splits = ratings_df.randomSplit([0.8, 0.2], seed)
    training_set = splits[0]

    # Average of the ratings in the training set
    rating_average = training_set.agg({"normalized_duration": "avg"}).collect()[0][0]

    return rating_average



def als_with_bias_recommender(ratings_df, seed):
    '''
    Return the RMSE of recommendations obtained using ALS + biases.
    The ALS model is trained on *i*, the user-item interaction, which is
    the rating minus the baseline *user_mean+item_mean-m* (*m* is the
    global mean rating). The predicted rating is then recomputed with the
    formula *i+user_mean+item_mean-m*, and the RMSE compares the original
    rating column with this predicted rating column. Training and test
    sets are determined as before, and the ALS model uses the same
    parameters as before and the random seed passed as parameter.
    Test file: tests/test_als_with_bias_recommender.py
    '''

    splits = ratings_df.randomSplit([0.8, 0.2], seed)

    training_set = splits[0]
    test_set = splits[1]

    # Per-user and per-item mean ratings, computed on the training set only
    user_mean = training_set.groupBy("userId").agg(mean("normalized_duration").alias("user_mean"))
    item_mean = training_set.groupBy("itemId").agg(mean("normalized_duration").alias("item_mean"))

    # Inner joins: test rows whose user or item is absent from the training set are dropped
    training_set_with_means = training_set.join(user_mean, "userId").join(item_mean, "itemId")
    test_set_with_means = test_set.join(user_mean, "userId").join(item_mean, "itemId")

    global_mean = global_average(ratings_df, seed)

    # Residual rating after removing the baseline (user_mean + item_mean - global_mean)
    final_training_set = training_set_with_means.withColumn(
        "user_item_interaction",
        training_set_with_means.normalized_duration - (
            training_set_with_means.user_mean + training_set_with_means.item_mean - global_mean))

    # Build the recommendation model on the residuals
    als = ALS(maxIter=5, rank=70, regParam=0.01, coldStartStrategy="drop",
              userCol="userId", itemCol="itemId", ratingCol="user_item_interaction", seed=seed)
    model = als.fit(final_training_set)

    # Evaluate the model by computing the RMSE on the test data
    predictions = model.transform(test_set_with_means)
    predictions = predictions.withColumn("prediction_eval", predictions['prediction']+predictions['user_mean']+predictions['item_mean']-global_mean)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="normalized_duration", predictionCol="prediction_eval")

    rmse = evaluator.evaluate(predictions)

    return rmse

als_with_bias_recommender(ratings_dataframe, 123)

1.2834586861809087
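
The bias adjustment can be followed on a tiny example. The sketch below computes the baseline *user_mean+item_mean-m* and the residual interaction *i* for a few made-up ratings, then shows that adding the baseline back recovers the original rating. The data and helper names are hypothetical; in the function above, the residual is what ALS is trained on.

```python
from collections import defaultdict

# Hypothetical training ratings: (user, item, rating).
train = [("u1", "a", 2.0), ("u1", "b", 0.0), ("u2", "a", 1.0), ("u2", "b", 1.0)]


def group_means(rows, key_index):
    """Mean rating per user (key_index=0) or per item (key_index=1)."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key_index]].append(row[2])
    return {key: sum(values) / len(values) for key, values in groups.items()}


global_mean = sum(rating for _, _, rating in train) / len(train)
user_mean = group_means(train, 0)
item_mean = group_means(train, 1)


def baseline(user, item):
    """Bias term: user_mean + item_mean - global mean."""
    return user_mean[user] + item_mean[item] - global_mean


# Residual interaction i = rating - baseline (the ALS training target).
residuals = {(u, i): r - baseline(u, i) for u, i, r in train}

# Adding the baseline back to the residual recovers the rating.
reconstructed = {(u, i): residuals[(u, i)] + baseline(u, i) for u, i, _ in train}
print(residuals)
```

At prediction time the residual is replaced by the ALS prediction, so the quality of the final rating depends on how well ALS predicts the residual and on how reliable the user and item means are.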

#### Intermediate Results Discussion

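| Model        | RMSE               |
|--------------|--------------------|
| ALS          | 1.1818651178071158 |
| ALS + biases | 1.2834586861809087 |

On this split (seed 123), adding user and item biases increased the RMSE from about 1.18 to about 1.28, so the bias terms did not improve the predictions. Because the ratings are Z-scores with a standard deviation of 1 over the full dataset, an RMSE above 1 suggests that neither model yet improves much on simply predicting the global mean; this should be confirmed against an explicit baseline.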