# VIVINO wines recommendation
> ZHAW CAS Machine Intelligence - Big Data Module - Sansar Choinyambuu, Gustavo Martinez

In this notebook, a recommendation system based on collaborative filtering is built with the pyspark.ml library. A graph of users and their top-rated wines is then built with the help of the GraphFrames library.

https://www.vivino.com/

## Data scraping
The data was obtained from vivino.com using a self-written scraper available at:  
https://github.com/sansar-choinyambuu/vivino-users/blob/main/scrape_top_ranked_wines.py

For each of the top-ranked users obtained by the vivino-users scraper, the wines on the user's top-rated wines page are scraped:  
"https://www.vivino.com/users/{user_id}/top"

The top wines pages contain a maximum of 10 wines per user.

Attributes extracted:
- wine id
- wine name
- user rating (1 to 5, int)
- price
- rating (average of all user ratings)

The scraping was done with Selenium for Python and the Mozilla Firefox WebDriver.

### Read and prepare data

In [0]:
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = spark._jvm.org.apache.log4j
logging.getLogger("py4j").setLevel(logging.ERROR)

In [0]:
import pandas as pd
# Data is available at https://github.com/sansar-choinyambuu/vivino-users
# Adjust the path to wherever the pickle file has been uploaded
ratings_df = pd.read_pickle("/dbfs/FileStore/shared_uploads/gustavo.martinez@mirai-solutions.com/vivino_ratings.pkl")
ratings_df.wine_id = ratings_df.wine_id.astype("int")
ratings_df.user_id = ratings_df.user_id.astype("int")

ratings = spark.createDataFrame(ratings_df)
display(ratings.head(5))

wine_id,name,user_rating,price,rating,user_id
8366987,Blanco,5.0,0.0,4.1,30610918
6480726,Soleras de Almacenista Manzanilla,5.0,0.0,4.4,30610918
1157942,Riesling Trocken,5.0,22.49,3.8,30610918
2369993,Barolo Riserva San Bernardo,5.0,109.99,4.3,30610918
1100125,Barolo Bricco Delle Viole,5.0,84.99,4.2,30610918


In [0]:
print(f"There are {len(ratings_df)} wine ratings. {len(set(ratings_df.wine_id))} different wines have been rated by {len(set(ratings_df.user_id))} different users.")

## Collaborative Filtering Recommender

A recommender system using collaborative filtering is defined and trained with pyspark.ml.

The model is built around a **user-item matrix** (users × wines) whose entries are the ratings that users gave to wines. Most entries are unknown, and collaborative filtering aims to fill in these missing entries.

The model uses **explicit feedback** provided by users: the ratings assigned to the top rated wines per user.

The **alternating least squares (ALS)** algorithm is used to learn the latent factors that are used to predict the missing entries in the user-item matrix.
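
To make the alternating idea concrete, here is a minimal pure-Python sketch of ALS with a single latent factor per user and per wine. It is an illustration under simplifying assumptions (rank 1, the same plain L2 penalty for every user and wine, toy data), not the way pyspark.ml implements ALS. The function name `als_rank1` and the toy ratings are hypothetical.

```python
from collections import defaultdict


def als_rank1(ratings, n_iters=20, reg=0.01):
    """Rank-1 ALS sketch. `ratings` maps (user, item) -> rating.

    Returns two dicts with one latent factor per user and per item, so that
    the predicted rating is user_factor[u] * item_factor[i].
    """
    by_user = defaultdict(list)
    by_item = defaultdict(list)
    for (user, item), rating in ratings.items():
        by_user[user].append((item, rating))
        by_item[item].append((user, rating))

    user_factor = {u: 1.0 for u in by_user}
    item_factor = {i: 1.0 for i in by_item}

    for _ in range(n_iters):
        # Hold item factors fixed: each user factor has a closed-form
        # regularised least-squares solution.
        for u, rated in by_user.items():
            num = sum(r * item_factor[i] for i, r in rated)
            den = sum(item_factor[i] ** 2 for i, _ in rated) + reg
            user_factor[u] = num / den
        # Hold user factors fixed: solve for each item factor the same way.
        for i, raters in by_item.items():
            num = sum(r * user_factor[u] for u, r in raters)
            den = sum(user_factor[u] ** 2 for u, _ in raters) + reg
            item_factor[i] = num / den
    return user_factor, item_factor


# Toy data: bob has not rated wine "w2"; that is the missing matrix entry.
toy_ratings = {("alice", "w1"): 4.0, ("alice", "w2"): 2.0, ("bob", "w1"): 2.0}
users, items = als_rank1(toy_ratings)
predicted = users["bob"] * items["w2"]  # estimate for the missing entry
```

Each half-step is an ordinary least-squares problem because one side of the factorization is held fixed, which is what makes the method easy to parallelize over users and items.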

A "drop" **cold-start strategy** is used to avoid NaN values in the predictions on the test data. A user or wine that appears in the test set but not in the training set has no learned factors, so its prediction would be NaN. With this strategy, such rows are dropped from the predictions before evaluation.
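
The effect of the "drop" strategy can be sketched in plain Python as a filter over the test rows. This is an illustration of the idea only, not the pyspark.ml code path. The helper name `drop_cold_start` and the `(user_id, wine_id, rating)` tuple layout are assumptions made for the example.

```python
def drop_cold_start(train_rows, test_rows):
    """Keep only test rows whose user and wine both occur in the training rows.

    Rows are (user_id, wine_id, rating) tuples.
    """
    known_users = {user for user, _, _ in train_rows}
    known_wines = {wine for _, wine, _ in train_rows}
    return [
        row for row in test_rows
        if row[0] in known_users and row[1] in known_wines
    ]


train = [(1, 10, 5.0), (2, 11, 4.0)]
test = [
    (1, 11, 3.0),  # user and wine both seen in training: kept
    (3, 10, 5.0),  # user 3 never seen in training: dropped
    (2, 12, 4.0),  # wine 12 never seen in training: dropped
]
evaluable = drop_cold_start(train, test)
```

Because dropped rows never reach the evaluator, the reported error describes only users and wines the model has seen, which is worth keeping in mind when a random split leaves many users with few training ratings.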

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="user_id", itemCol="wine_id", ratingCol="user_rating",
          coldStartStrategy="drop")
model = als.fit(training)


### Model evaluation
The model is evaluated with the **root-mean-square error (RMSE)** between the predicted and the actual user ratings on the test data.

In [0]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="user_rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
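
For reference, the metric itself is small enough to write out by hand. The sketch below computes RMSE on plain Python lists. It illustrates the formula and is not a substitute for `RegressionEvaluator`; the function name `root_mean_square_error` is hypothetical.

```python
import math


def root_mean_square_error(labels, predictions):
    """Square root of the mean squared difference between labels and predictions."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(p - y) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# One prediction is off by 1 star, the other is exact.
error = root_mean_square_error([5.0, 4.0], [4.0, 4.0])
```

Since the errors are squared before averaging, a few large misses weigh more heavily than many small ones.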

### Generate recommendations

In [0]:
# Generate top 10 wine recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each wine
wineRecs = model.recommendForAllItems(10)

# Generate top 10 wine recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of wines
wines = ratings.select(als.getItemCol()).distinct().limit(3)
wineSubSetRecs = model.recommendForItemSubset(wines, 10)

In [0]:
display(userRecs.head(1))
display(wineRecs.head(1))
display(userSubsetRecs.head(1))
display(wineSubSetRecs.head(1))

user_id,recommendations
460575,"List(List(1415247, 8.06108283996582), List(1397132, 7.526838302612305), List(1301256, 7.405379295349121), List(1302651, 6.9759745597839355), List(1136158, 6.933765411376953), List(2401295, 6.787261962890625), List(1392761, 6.403262138366699), List(1135736, 6.378779411315918), List(2061837, 6.344078540802002), List(1105484, 6.221710681915283))"


wine_id,recommendations
950,"List(List(25618574, 7.956789016723633), List(24896385, 7.647671222686768), List(1936378, 7.612422466278076), List(13026958, 7.198644638061523), List(14960161, 7.032809257507324), List(15171896, 6.494895935058594), List(10913484, 6.2591023445129395), List(9090580, 6.215423583984375), List(7203325, 6.1355180740356445), List(39657748, 6.109884262084961))"


user_id,recommendations
9938486,"List(List(1688, 7.484900951385498), List(13406, 7.455594539642334), List(1084873, 7.276030540466309), List(50644, 6.778293132781982), List(1178663, 6.147983551025391), List(2155298, 6.091994762420654), List(2710137, 5.853062152862549), List(76372, 5.754621982574463), List(24139, 5.735905170440674), List(19919, 5.725275039672852))"


wine_id,recommendations
1157942,"List(List(6016898, 15.953051567077637), List(9790869, 15.464569091796875), List(18732688, 14.003107070922852), List(40142184, 13.226593971252441), List(1056266, 13.093711853027344), List(15593967, 12.89148235321045), List(28249150, 12.823064804077148), List(14566868, 12.805499076843262), List(5338800, 12.783918380737305), List(4313519, 12.110170364379883))"


### Graph of users and rated wines

In [0]:
from graphframes import GraphFrame
vertices = ratings.selectExpr("user_id as id").distinct().union(ratings.selectExpr("wine_id as id").distinct())
edges = ratings.selectExpr("user_id as src", "wine_id as dst", "user_rating", "rating", "name")
g = GraphFrame(vertices, edges)
print(f"Graph has {g.vertices.count()} vertices and {g.edges.count()} edges")

In [0]:
# Most popular wines
inDeg = g.inDegrees
wines = ratings.selectExpr("cast(wine_id as int) as id", "name").distinct()
display(inDeg.join(wines, inDeg.id == wines.id, "inner").sort("inDegree", ascending=False).limit(10))


id,inDegree,id.1,name
86684,153,86684,Brut Champagne
1153863,91,1153863,Sauternes
1175427,89,1175427,Valbuena 5º
5078,80,5078,Sassicaia
1652,73,1652,Tignanello
7122486,72,7122486,Grande Cuvée Brut Champagne
74304,71,74304,Cristal Brut Champagne (Millésimé)
1139434,56,1139434,Tinto
77137,51,77137,Unico
1684223,49,1684223,Pauillac (Premier Grand Cru Classé)
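
Since every edge points from a user to a wine, a wine's in-degree is simply the number of users who have it among their top-rated wines. The same count can be sketched without a graph library. The helper name `most_rated` and the toy edge list below are hypothetical.

```python
from collections import Counter


def most_rated(edges, n=10):
    """Return the n destination vertices with the highest in-degree.

    `edges` is an iterable of (src, dst) pairs, here (user_id, wine_id).
    """
    in_degree = Counter(dst for _, dst in edges)
    return in_degree.most_common(n)


toy_edges = [(1, "wine_a"), (2, "wine_a"), (3, "wine_a"), (1, "wine_b"), (2, "wine_b"), (3, "wine_c")]
top = most_rated(toy_edges, n=2)
```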


In [0]:
# Pairs of users that rated the same wine (the motif also matches u1 == u2)
co_rated = g.find("(u1)-[]->(w); (u2)-[]->(w)")
display(co_rated)

u1,w,u2
List(13164555),List(4934908),List(13164555)
List(13164555),List(1228624),List(13164555)
List(13164555),List(2109946),List(13164555)
List(13164555),List(1284235),List(13164555)
List(13164555),List(3204323),List(13164555)
List(13164555),List(17339),List(13164555)
List(13164555),List(4767354),List(13164555)
List(13164555),List(1942603),List(13164555)
List(13164555),List(1203903),List(28382676)
List(13164555),List(1203903),List(13164555)
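
Most rows in the output above pair a user with themselves, because nothing in the motif requires `u1` and `u2` to be different vertices. One possible remedy, assuming the usual GraphFrames behaviour of returning motif matches as a DataFrame, would be a filter such as `co_rated.filter("u1.id < u2.id")`. The pure-Python sketch below shows the same idea on a plain edge list: it emits each unordered pair of distinct users once per shared wine. The function name `co_rating_pairs` is hypothetical; the ids are taken from the output above.

```python
from collections import defaultdict
from itertools import combinations


def co_rating_pairs(edges):
    """Return sorted (user_a, user_b, wine) triples with user_a < user_b.

    `edges` is an iterable of (user_id, wine_id) pairs.
    """
    raters = defaultdict(set)
    for user, wine in edges:
        raters[wine].add(user)

    pairs = []
    for wine, users in raters.items():
        # combinations() over sorted users yields each distinct pair exactly once
        for user_a, user_b in combinations(sorted(users), 2):
            pairs.append((user_a, user_b, wine))
    return sorted(pairs)


sample_edges = [(13164555, 1203903), (28382676, 1203903), (13164555, 17339)]
pairs = co_rating_pairs(sample_edges)
```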


#### Connected components

In [0]:
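from pyspark.sql import functions as F

# connectedComponents() needs a checkpoint directory to be set
sc.setCheckpointDir("/FileStore/shared_uploads/gustavo.martinez@mirai-solutions.com/project/checkpoints")
connected = g.connectedComponents()
# Size of the five largest components
connected.select("id", "component").groupBy("component").count().orderBy(F.desc("count")).show(5)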
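
Two vertices belong to the same connected component when a path of ratings links them, regardless of edge direction. As a rough illustration of what the component labels mean, the sketch below labels a small graph with a union-find structure. This is not the distributed algorithm GraphFrames uses; the function name `label_components` and the toy graph are hypothetical.

```python
from collections import Counter


def label_components(vertices, edges):
    """Map every vertex to a representative vertex of its connected component."""
    parent = {v: v for v in vertices}

    def find(v):
        # Walk up to the root, shortening the path along the way
        while parent[v] != v:
            parent[v] = parent[parent[v]]
            v = parent[v]
        return v

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            parent[root_dst] = root_src  # merge the two components

    return {v: find(v) for v in vertices}


toy_vertices = ["u1", "u2", "u3", "w1", "w2", "w3"]
toy_edges = [("u1", "w1"), ("u2", "w1"), ("u2", "w2"), ("u3", "w3")]
labels = label_components(toy_vertices, toy_edges)
# Component sizes, largest first (mirrors the groupBy/count/orderBy above)
sizes = [count for _, count in Counter(labels.values()).most_common()]
```

Here `u1` and `u2` end up in the same component because both rated `w1`, while `u3` and `w3` form a separate component of their own.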