<center><img src='https://www.datahack.es/wp-content/uploads/2020/02/logo-home.png' width="500"></center>

# Ejercicio de algoritmo de filtro colaborativo 3 PUNTOS <font color="Grey"> Luis Blanco


El objetivo de este ejercicio es generar un algoritmo de filtro colaborativo con los datos del dataset beer_review. La información se corresponde con la reviews de la web https://www.beeradvocate.com/

<center><img src='https://www.pngitem.com/pimgs/m/39-394080_beer-advocate-hd-png-download.png' width=600 ></center>

## Información

Este dataset consiste en revisiones de cerveza de Beeradvocate. Los datos abarcan un período de más de 10 años, que incluye todas ~ 1.5 millones de revisiones hasta noviembre de 2011. Cada revisión incluye calificaciones en términos de cinco "aspectos": apariencia, aroma, paladar, sabor e impresión general. Las revisiones incluyen información del producto y del usuario, seguida de cada una de estas cinco clasificaciones, y una revisión en formato texto. 

Para nuestro ejercicio vamos a usar una versión reducida de este dataset, que contiene las reviews de aquellos usuarios que han valorado al menos 50 cervezas.

## Pasos a seguir

1. Descarga la información de beeradvocate en una carpeta que se llame `datos_ejercicios/beeradvocate`.

2. Lee el fichero `beer_reviews_50.csv` y crea dos datasets:
    - uno con las reviews de los usuarios.
    - otro con la información de la cerveza (brewery, beer_name, beerid...)
    

3. Aplica el algoritmo ALS, fijando manualmente los parámetros.

4. Realiza una tunning de los hiperparámetros y mide las mejoras, en caso de hacerlas. 

5. Crea una función que dado un usuario nos devuelva el top 10 de recomendaciones. Se valora la creatividad.

6. Descarga el ipynb, conviertelo a html y esos serán tus documentos de evaluación.

# <font color="grey">0. Instalaciones y librerías:

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


<font color="grey"> Instalación de Spark, Findspark y Pyspark.

In [0]:
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark



<font color="Grey">Creación de la Sesión Spark "ejercicio_filtro_colaborativo".

In [0]:
# Set up required environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ejercicio_filtro_colaborativo").master("local[*]").getOrCreate()

<font color="Grey">Instalación y carga de librerías:

In [0]:
#Carga de datos de la web, no lo usamos en esta práctica (ejemplo):
#!pip install wget 
#import wget

#Tratamiento de datos:
import pandas as pd
import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
from pyspark.ml.feature import StringIndexer, IndexToString

#Dibujo de tablas y gráficos:
pd.set_option('display.max_rows', 50) #max 50 rows out

#Modelo de Filtro Colaborativo:
from pyspark.ml.recommendation import ALS

#Evaluadores y métricas:
from pyspark.ml.evaluation import RegressionEvaluator

#Mejora de Entrenamiento y Modelos:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Variable con el valor de la semilla cuando sea necesaria:
random_state=23 

# <font color="grey">1. Descarga la información de beeradvocate en una carpeta que se llame `datos_ejercicios/beeradvocate`.

<font color="grey">Nosotros ya tenemos los datos de beer_reviews_50.csv descargados y subidos a drive para poder usarlo en Google Colab.  
Un ejemplo de descarga sería el siguiente:

In [0]:
#url= "path"
#wget.download(url, "/content/drive/My Drive/Colab Notebooks/Práctica Spark/datos_ejercicios/beeradvocate")

# <font color="grey">2. Lee el fichero beer_reviews_50.csv y crea dos datasets:

- <font color="grey">uno con las reviews de los usuarios.</font>

- <font color="grey">otro con la información de la cerveza (brewery, beer_name, beerid...)

In [0]:
df = spark.read.csv("/content/drive/My Drive/Colab Notebooks/Práctica Spark/datos_ejercicios/beeradvocate/beer_reviews_50.csv",
                    header=True, inferSchema = True).cache()

In [0]:
df.toPandas()

Unnamed: 0,brewery_id,brewery_name,review_time,review_overall,review_aroma,review_appearance,review_profilename,beer_style,review_palate,review_taste,beer_name,beer_abv,beer_beerid
0,10325,Vecchio Birraio,1234817823,1.5,2.0,2.5,stcules,Hefeweizen,1.5,1.5,Sausa Weizen,5.0,47986.0
1,10325,Vecchio Birraio,1235915097,3.0,2.5,3.0,stcules,English Strong Ale,3.0,3.0,Red Moon,6.2,48213.0
2,10325,Vecchio Birraio,1235916604,3.0,2.5,3.0,stcules,Foreign / Export Stout,3.0,3.0,Black Horse Black Beer,6.5,48215.0
3,10325,Vecchio Birraio,1234725145,3.0,3.0,3.5,stcules,German Pilsener,2.5,3.0,Sausa Pils,5.0,47969.0
4,1075,Caldera Brewing Company,1293735206,4.0,4.5,4.0,johnmichaelsen,American Double / Imperial IPA,4.0,4.5,Cauldron DIPA,7.7,64883.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
256591,870,Moylan's Brewery,1231487618,4.5,4.0,3.5,Jmoore50,American IPA,4.0,4.5,India Pale Ale,6.5,3064.0
256592,870,Moylan's Brewery,1231184213,4.0,3.5,4.0,kmeves,American IPA,3.0,3.5,India Pale Ale,6.5,3064.0
256593,870,Moylan's Brewery,1230947306,4.0,4.5,4.5,tavernjef,American IPA,4.5,4.0,India Pale Ale,6.5,3064.0
256594,870,Moylan's Brewery,1230365245,3.5,4.0,3.5,biboergosum,American IPA,4.0,4.0,India Pale Ale,6.5,3064.0


In [0]:
df.printSchema()

root
 |-- brewery_id: integer (nullable = true)
 |-- brewery_name: string (nullable = true)
 |-- review_time: integer (nullable = true)
 |-- review_overall: double (nullable = true)
 |-- review_aroma: double (nullable = true)
 |-- review_appearance: double (nullable = true)
 |-- review_profilename: string (nullable = true)
 |-- beer_style: string (nullable = true)
 |-- review_palate: double (nullable = true)
 |-- review_taste: double (nullable = true)
 |-- beer_name: string (nullable = true)
 |-- beer_abv: double (nullable = true)
 |-- beer_beerid: double (nullable = true)



<font color="Grey">Se ha inferido correctamente el esquema.

In [0]:
#Trasponemos los datos para ver más claramente los datos de cada columna (ahora filas)
pd.DataFrame(df.take(5),columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
brewery_id,10325,10325,10325,10325,1075
brewery_name,Vecchio Birraio,Vecchio Birraio,Vecchio Birraio,Vecchio Birraio,Caldera Brewing Company
review_time,1234817823,1235915097,1235916604,1234725145,1293735206
review_overall,1.5,3,3,3,4
review_aroma,2,2.5,2.5,3,4.5
review_appearance,2.5,3,3,3.5,4
review_profilename,stcules,stcules,stcules,stcules,johnmichaelsen
beer_style,Hefeweizen,English Strong Ale,Foreign / Export Stout,German Pilsener,American Double / Imperial IPA
review_palate,1.5,3,3,2.5,4
review_taste,1.5,3,3,3,4.5


<font color="Grey"> **VARIABLES**  
<font color="Grey">- brewery_id: identificador de la cervecera.  
<font color="Grey">- brewery_name: nombre de la cervecera.  
<font color="Grey">- review_time: hora a la que se hizo la review.   
<font color="Grey">- review_overall: calificación impresión general.  
<font color="Grey">- review_aroma: calificación olor.  
<font color="Grey">- review_appearance: calificación apariencia.  
<font color="Grey">- review_profilename: nick usuario que ha hecho la reseña.  
<font color="Grey">- beer_style: estilo de cerveza.
<font color="Grey">- review_palate: calificación al paladar.
<font color="Grey">- review_taste: calificación sabor.  
<font color="Grey">- beer_name: nombre de la cerveza.  
<font color="Grey">- beer_abv: volumen de alcohol de la cerveza.  
<font color="Grey">- beer_beerid: identificador de la cereza.

<font color="Grey">Vamos a analizar el dataframe, en busca de duplicados y nulos. Convertiremos el dataframe a Pandas:

In [0]:
df_pandas=df.toPandas()

In [0]:
df_pandas.duplicated().sum()

0

In [0]:
df.toPandas().isnull().sum()

brewery_id                0
brewery_name              0
review_time               0
review_overall            0
review_aroma              0
review_appearance         0
review_profilename        0
beer_style                0
review_palate             0
review_taste              0
beer_name                 0
beer_abv              10594
beer_beerid               0
dtype: int64

<font color="Grey">Nos encontramos que no existen duplicados, sin embargo, tenemos nulos en "beer_abv", variable que representa el grado de alcohol de la cerveza. Vamos a analizar los resultados por si podemos averiguar por qué:

In [0]:
#row con nulos
nan_rows = df_pandas[df_pandas.isnull().any(1)]
nan_rows

Unnamed: 0,brewery_id,brewery_name,review_time,review_overall,review_aroma,review_appearance,review_profilename,beer_style,review_palate,review_taste,beer_name,beer_abv,beer_beerid
222,1075,Caldera Brewing Company,1103668195,3.0,3.0,3.0,RedDiamond,American Stout,4.0,3.0,Cauldron Espresso Stout,,21241.0
452,850,Moon River Brewing Company,1100038819,4.0,3.5,4.0,aracauna,Scotch Ale / Wee Heavy,3.5,3.5,The Highland Stagger,,20689.0
537,1075,Caldera Brewing Company,1260673921,4.0,4.0,4.0,plaid75,American IPA,4.0,4.0,Alpha Beta,,54723.0
687,2724,Pacific Coast Brewing Company,1293559076,1.0,1.5,3.0,womencantsail,American Strong Ale,2.5,1.5,Megalodon,,64803.0
700,2724,Pacific Coast Brewing Company,1205614154,1.5,2.0,1.5,JDV,Belgian Strong Pale Ale,1.0,3.0,Holiday Trappist Style Ale,,41584.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
256255,870,Moylan's Brewery,1308147368,4.5,4.5,4.0,johnmichaelsen,American Barleywine,4.0,4.5,Old Blarney Barleywine - Heaven Hills Barrel Aged,,69050.0
256256,870,Moylan's Brewery,1305392761,4.5,4.5,4.0,kaseydad,American Barleywine,5.0,5.0,Old Blarney Barleywine - Heaven Hills Barrel Aged,,69050.0
256272,870,Moylan's Brewery,1223743206,4.0,3.5,4.5,MisterClean,American Double / Imperial IPA,3.5,3.5,Uberhoppy IPA,,45313.0
256508,22512,Trapp Family Lodge Brewery,1306798968,4.5,3.5,4.0,rtepiak,Maibock / Helles Bock,4.0,4.0,Trapp Maibock,,69483.0


In [0]:
df.toPandas()["beer_abv"].describe()

count    246002.000000
mean          7.135336
std           2.274293
min           0.010000
25%           5.400000
50%           6.600000
75%           9.000000
max          43.000000
Name: beer_abv, dtype: float64

<font color="Grey">Hemos buscado en la web https://www.beeradvocate.com/ las cervezas que aparecen entre los nulos:

- <font color="Grey">https://www.beeradvocate.com/beer/profile/870/69050/ La cerveza está retirada.</font>

- <font color="Grey">https://www.beeradvocate.com/beer/profile/22564/302781/ La cerveza se vende en rotación, es decir, no está disponible siempre.

<font color="Grey">Hemos comprobado más cervezas entre los nulos y en todas coincide que se han retirado, son estacionales o están en rotación, por lo que, para no generar recomendaciones de productos que ya no se comercializan o que no esten disponibles en ese momento, vamos a proceder a eliminarlos de nuestro dataset.

In [0]:
df_pandas.dropna(inplace=True)

In [0]:
df_pandas

Unnamed: 0,brewery_id,brewery_name,review_time,review_overall,review_aroma,review_appearance,review_profilename,beer_style,review_palate,review_taste,beer_name,beer_abv,beer_beerid
0,10325,Vecchio Birraio,1234817823,1.5,2.0,2.5,stcules,Hefeweizen,1.5,1.5,Sausa Weizen,5.0,47986.0
1,10325,Vecchio Birraio,1235915097,3.0,2.5,3.0,stcules,English Strong Ale,3.0,3.0,Red Moon,6.2,48213.0
2,10325,Vecchio Birraio,1235916604,3.0,2.5,3.0,stcules,Foreign / Export Stout,3.0,3.0,Black Horse Black Beer,6.5,48215.0
3,10325,Vecchio Birraio,1234725145,3.0,3.0,3.5,stcules,German Pilsener,2.5,3.0,Sausa Pils,5.0,47969.0
4,1075,Caldera Brewing Company,1293735206,4.0,4.5,4.0,johnmichaelsen,American Double / Imperial IPA,4.0,4.5,Cauldron DIPA,7.7,64883.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
256591,870,Moylan's Brewery,1231487618,4.5,4.0,3.5,Jmoore50,American IPA,4.0,4.5,India Pale Ale,6.5,3064.0
256592,870,Moylan's Brewery,1231184213,4.0,3.5,4.0,kmeves,American IPA,3.0,3.5,India Pale Ale,6.5,3064.0
256593,870,Moylan's Brewery,1230947306,4.0,4.5,4.5,tavernjef,American IPA,4.5,4.0,India Pale Ale,6.5,3064.0
256594,870,Moylan's Brewery,1230365245,3.5,4.0,3.5,biboergosum,American IPA,4.0,4.0,India Pale Ale,6.5,3064.0


In [0]:
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 246002 entries, 0 to 256595
Data columns (total 13 columns):
 #   Column              Non-Null Count   Dtype  
---  ------              --------------   -----  
 0   brewery_id          246002 non-null  int32  
 1   brewery_name        246002 non-null  object 
 2   review_time         246002 non-null  int32  
 3   review_overall      246002 non-null  float64
 4   review_aroma        246002 non-null  float64
 5   review_appearance   246002 non-null  float64
 6   review_profilename  246002 non-null  object 
 7   beer_style          246002 non-null  object 
 8   review_palate       246002 non-null  float64
 9   review_taste        246002 non-null  float64
 10  beer_name           246002 non-null  object 
 11  beer_abv            246002 non-null  float64
 12  beer_beerid         246002 non-null  float64
dtypes: float64(7), int32(2), object(4)
memory usage: 24.4+ MB


In [0]:
df_clean=spark.createDataFrame(df_pandas).cache()

<font color="Grey">Creamos el dataframe reviews, que contiene las reviews de los usuarios:

In [0]:
columnas=df_clean.columns

In [0]:
list_reviews=[i for i in columnas if re.match(r"review",i)] #creamos una lista con las columnas que contienen "review" mediante regex.
list_reviews+=["beer_beerid"] #incluimos el id de la cerveza

In [0]:
reviews=df_clean.select(list_reviews).cache()
reviews.show()

+-----------+--------------+------------+-----------------+------------------+-------------+------------+-----------+
|review_time|review_overall|review_aroma|review_appearance|review_profilename|review_palate|review_taste|beer_beerid|
+-----------+--------------+------------+-----------------+------------------+-------------+------------+-----------+
| 1234817823|           1.5|         2.0|              2.5|           stcules|          1.5|         1.5|    47986.0|
| 1235915097|           3.0|         2.5|              3.0|           stcules|          3.0|         3.0|    48213.0|
| 1235916604|           3.0|         2.5|              3.0|           stcules|          3.0|         3.0|    48215.0|
| 1234725145|           3.0|         3.0|              3.5|           stcules|          2.5|         3.0|    47969.0|
| 1293735206|           4.0|         4.5|              4.0|    johnmichaelsen|          4.0|         4.5|    64883.0|
| 1325524659|           3.0|         3.5|              3

<font color="Grey">Creamos el dataframe beer_info, que contiene toda la información referente a la cerveza y los identificadores.

In [0]:
#Creamos una lista con las columnas que contienen "review" mediante regex, que eliminaremos
list_drop=[i for i in columnas if re.match(r"review",i)]
list_drop

['review_time',
 'review_overall',
 'review_aroma',
 'review_appearance',
 'review_profilename',
 'review_palate',
 'review_taste']

In [0]:
beer_info=df_clean
for i in list_drop:
  beer_info=beer_info.drop(i)

In [0]:
beer_info.toPandas()

Unnamed: 0,brewery_id,brewery_name,beer_style,beer_name,beer_abv,beer_beerid
0,10325,Vecchio Birraio,Hefeweizen,Sausa Weizen,5.0,47986.0
1,10325,Vecchio Birraio,English Strong Ale,Red Moon,6.2,48213.0
2,10325,Vecchio Birraio,Foreign / Export Stout,Black Horse Black Beer,6.5,48215.0
3,10325,Vecchio Birraio,German Pilsener,Sausa Pils,5.0,47969.0
4,1075,Caldera Brewing Company,American Double / Imperial IPA,Cauldron DIPA,7.7,64883.0
...,...,...,...,...,...,...
245997,870,Moylan's Brewery,American IPA,India Pale Ale,6.5,3064.0
245998,870,Moylan's Brewery,American IPA,India Pale Ale,6.5,3064.0
245999,870,Moylan's Brewery,American IPA,India Pale Ale,6.5,3064.0
246000,870,Moylan's Brewery,American IPA,India Pale Ale,6.5,3064.0


<font color="Grey">Vamos a eliminar los duplicados, así tendremos sólo una fila de información por cerveza.

In [0]:
beer_info=beer_info.distinct()

In [0]:
beer_info.orderBy("beer_beerid").toPandas()

Unnamed: 0,brewery_id,brewery_name,beer_style,beer_name,beer_abv,beer_beerid
0,5,Yakima Brewing Co. / Bert Grant's Ales,Scottish Ale,Bert Grant's Scottish Ale,4.7,11.0
1,5,Yakima Brewing Co. / Bert Grant's Ales,Russian Imperial Stout,Bert Grant's Imperial Stout,6.0,12.0
2,5,Yakima Brewing Co. / Bert Grant's Ales,English India Pale Ale (IPA),Bert Grant's IPA,4.2,13.0
3,22,Unibroue,Belgian Strong Dark Ale,Trois Pistoles,9.0,30.0
4,22,Unibroue,Witbier,Blanche De Chambly,5.0,31.0
...,...,...,...,...,...,...
10262,19145,Brasserie Larché,American Amber / Red Ale,Ambrée De Bourgogne,6.5,77302.0
10263,24411,The Bier Brewery,English Stout,Fuggit Stout,5.8,77303.0
10264,26936,Fountain Square Brewing Co.,American IPA,Batch 13,6.6,77305.0
10265,26936,Fountain Square Brewing Co.,Oatmeal Stout,White Out,5.5,77307.0


In [0]:
#Incluimos el dataframe en cache
beer_info.cache()

DataFrame[brewery_id: bigint, brewery_name: string, beer_style: string, beer_name: string, beer_abv: double, beer_beerid: double]

# <font color="Grey">3. Aplica el algoritmo ALS, fijando manualmente los parámetros.

<font color="Grey">En primer lugar, vamos a realizar un conteo para ver el tamaño de nuestra matriz:

In [0]:
conteos = (
    reviews
    .select(
        F.count("*").alias("count"),
        F.countDistinct('review_profilename').alias('userId'),
        F.countDistinct('beer_beerid').alias('beer_beerid')
    )
).first()

In [0]:
conteos

Row(count=246002, userId=1831, beer_beerid=10267)

<font color="Grey">Esto significa que nuestra matriz es:

* $n= 1831$
* $p= 10267$

Así que la matriz $M$ tiene un tamaño de $1831\cdot10267=18798877$ .

In [0]:
reviews.toPandas()

Unnamed: 0,review_time,review_overall,review_aroma,review_appearance,review_profilename,review_palate,review_taste,beer_beerid
0,1234817823,1.5,2.0,2.5,stcules,1.5,1.5,47986.0
1,1235915097,3.0,2.5,3.0,stcules,3.0,3.0,48213.0
2,1235916604,3.0,2.5,3.0,stcules,3.0,3.0,48215.0
3,1234725145,3.0,3.0,3.5,stcules,2.5,3.0,47969.0
4,1293735206,4.0,4.5,4.0,johnmichaelsen,4.0,4.5,64883.0
...,...,...,...,...,...,...,...,...
245997,1231487618,4.5,4.0,3.5,Jmoore50,4.0,4.5,3064.0
245998,1231184213,4.0,3.5,4.0,kmeves,3.0,3.5,3064.0
245999,1230947306,4.0,4.5,4.5,tavernjef,4.5,4.0,3064.0
246000,1230365245,3.5,4.0,3.5,biboergosum,4.0,4.0,3064.0


<font color="Grey">El modelo ALS necesita que le incluyamos una única columna de rating, por lo que vamos a crear una función que nos permita generar una review global para cada cerveza ("rating"), utilizando los pesos según nos marca la web https://www.beeradvocate.com/community/threads/how-to-review-a-beer.241156/

- <font color="Grey">Appearance (Look) = 6%</font>
- <font color="Grey">Smell (Aroma) = 24%</font>
- <font color="Grey">Taste = 40%</font>
- <font color="Grey">Palate (Mouthfeel) = 10%</font>
- <font color="Grey">Overall = 20%

In [0]:
def rating (a,s,t,p,o):
  """
  Función para calcular el rating global
  de las cervezas, dadas sus 5 características:
    - a= Appearance (Look) = 6%
    - s= Smell (Aroma) = 24%
    - t= Taste = 40%
    - p= Palate (Mouthfeel) = 10%
    - o= Overall = 20%
  """
  return round((a*0.06)+(s*0.24)+(t*0.4)+(p*0.1)+(o*0.2),1)


In [0]:
rating_udf=F.udf(rating, T.DoubleType())

In [0]:
reviews=reviews.withColumn("rating", rating_udf("review_appearance","review_aroma",
                                           "review_taste","review_palate",
                                           "review_overall"))

In [0]:
reviews.show()

+-----------+--------------+------------+-----------------+------------------+-------------+------------+-----------+------+
|review_time|review_overall|review_aroma|review_appearance|review_profilename|review_palate|review_taste|beer_beerid|rating|
+-----------+--------------+------------+-----------------+------------------+-------------+------------+-----------+------+
| 1234817823|           1.5|         2.0|              2.5|           stcules|          1.5|         1.5|    47986.0|   1.7|
| 1235915097|           3.0|         2.5|              3.0|           stcules|          3.0|         3.0|    48213.0|   2.9|
| 1235916604|           3.0|         2.5|              3.0|           stcules|          3.0|         3.0|    48215.0|   2.9|
| 1234725145|           3.0|         3.0|              3.5|           stcules|          2.5|         3.0|    47969.0|   3.0|
| 1293735206|           4.0|         4.5|              4.0|    johnmichaelsen|          4.0|         4.5|    64883.0|   4.3|


<font color="Grey">El modelo ALS también necesita que los datos que se introduzcan sean numéricos. En nuestro dataframe, el id del usuario es su nick, es decir, es de tipo string, por lo que vamos a pasarlo a numérico mediante StringIndexer:

In [0]:
indexer= StringIndexer(inputCol="review_profilename", outputCol="user_id")
reviews = indexer.fit(reviews).transform(reviews)
reviews.show()

+-----------+--------------+------------+-----------------+------------------+-------------+------------+-----------+------+-------+
|review_time|review_overall|review_aroma|review_appearance|review_profilename|review_palate|review_taste|beer_beerid|rating|user_id|
+-----------+--------------+------------+-----------------+------------------+-------------+------------+-----------+------+-------+
| 1234817823|           1.5|         2.0|              2.5|           stcules|          1.5|         1.5|    47986.0|   1.7|  102.0|
| 1235915097|           3.0|         2.5|              3.0|           stcules|          3.0|         3.0|    48213.0|   2.9|  102.0|
| 1235916604|           3.0|         2.5|              3.0|           stcules|          3.0|         3.0|    48215.0|   2.9|  102.0|
| 1234725145|           3.0|         3.0|              3.5|           stcules|          2.5|         3.0|    47969.0|   3.0|  102.0|
| 1293735206|           4.0|         4.5|              4.0|    johnmi

In [0]:
reviews.printSchema()

root
 |-- review_time: long (nullable = true)
 |-- review_overall: double (nullable = true)
 |-- review_aroma: double (nullable = true)
 |-- review_appearance: double (nullable = true)
 |-- review_profilename: string (nullable = true)
 |-- review_palate: double (nullable = true)
 |-- review_taste: double (nullable = true)
 |-- beer_beerid: double (nullable = true)
 |-- rating: double (nullable = true)
 |-- user_id: double (nullable = false)



<font color="Grey">Vamos a usar los hiperparámetros por defecto, excepto "coldStartStrategy", que por defecto Spark asigna NaN cuando un usuario y / o factor de elemento no está presente en el modelo. Esto puede ser útil en un sistema de producción, ya que indica un nuevo usuario o elemento, por lo que el sistema puede tomar una decisión sobre alguna alternativa para usar como predicción.  
Sin embargo, esto no es deseable durante la validación cruzada, ya que cualquier NaN predicho dará como resultado NaN resultados para la métrica de evaluación (por ejemplo, cuando se usa RegressionEvaluator). Esto hace que la selección del modelo sea imposible, por lo que usaremos "drop", con el fin de eliminar cualquier fila en las predicciones que contienen valores NaN.

In [0]:
als = ALS(
    userCol="user_id", 
    itemCol="beer_beerid", 
    ratingCol=("rating"),
    coldStartStrategy="drop",
    seed=random_state
)

In [0]:
modelo = als.fit(reviews)

In [0]:
modelo

ALS_d272e362fc1f

In [0]:
predictions = modelo.transform(reviews)

In [0]:
predictions.show()

+-----------+--------------+------------+-----------------+------------------+-------------+------------+-----------+------+-------+----------+
|review_time|review_overall|review_aroma|review_appearance|review_profilename|review_palate|review_taste|beer_beerid|rating|user_id|prediction|
+-----------+--------------+------------+-----------------+------------------+-------------+------------+-----------+------+-------+----------+
| 1001337433|           3.0|         3.0|              3.0|              John|          3.0|         3.0|      833.0|   3.0|  471.0| 3.5172522|
| 1246901622|           4.0|         3.0|              3.0|  superdedooperboy|          3.0|         3.5|      833.0|   3.4|  496.0| 3.4462495|
| 1038108814|           4.5|         4.0|              3.5|             hyuga|          3.5|         3.5|      833.0|   3.8| 1591.0| 3.6479034|
| 1044768828|           3.5|         3.5|              3.0|          deckjohn|          4.0|         3.5|      833.0|   3.5| 1829.0| 3.1

In [0]:
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating",
    predictionCol="prediction"
)

In [0]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.412278511639312


# <font color="Grey">4. Realiza una tunning de los hiperparámetros y mide las mejoras, en caso de hacerlas.

<font color="Grey">Para realizar el tunning de hiperparámetros y decidir cuáles son los mejores, primero divideramos el dataset en train y test, 75% y 25% respectivamente.

In [0]:
(train, test) = reviews.randomSplit([0.75, 0.25],seed=random_state)

In [0]:
print("Los registros en train son:", train.count())
print("Los registros en test son:", test.count())
print("Comprobamos que la suma de train y test sea igual que el de reviews:",train.count()+test.count()==reviews.count())

Los registros en train son: 184609
Los registros en test son: 61393
Comprobamos que la suma de train y test sea igual que el de reviews: True


<font color="Grey">Modelo estandar al que añadiremos grid de hiperparámetros:

In [0]:
als = ALS(
    userCol="user_id", 
    itemCol="beer_beerid", 
    ratingCol=("rating"),
    coldStartStrategy="drop",
    seed=random_state,
)

<font color="Grey">Selección de hiperparámetros:

In [0]:
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 100])
    .addGrid(als.regParam, [0.01, 0.1, 0.05])
    .addGrid(als.maxIter, [5, 10])
    .addGrid(als.numUserBlocks, [1, 30])
    .build()
)

<font color="Grey">Creación de Validación Cruzada (nºfolds 2):

In [0]:
crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=2,
    seed=random_state
) 

In [0]:
#20 minutos de ejecución aprox.
modelo_cv = crossval.fit(train)

<font color="Grey">Suma del RMSE de los folds:

In [0]:
modelo_cv.avgMetrics

[0.5541425922055149,
 0.5514018634171872,
 0.5640152792046816,
 0.5621802671175605,
 0.448341986578751,
 0.4506836532746823,
 0.44682558715623766,
 0.44813711893378605,
 0.45618144600291843,
 0.4551262379749046,
 0.4579157454650933,
 0.4578042303848058,
 0.9421813673086284,
 0.9208121454503528,
 0.8482836355323307,
 0.8262057493476811,
 0.46295868923149863,
 0.46398321849466684,
 0.45314047218708736,
 0.4534901260682064,
 0.46587925252613205,
 0.4640637370506003,
 0.4566801022826252,
 0.45598907191117166]

<font color="grey">Vamos a realizar un dataframe, con los diferentes hiperparámetros y su métrica (media):

In [0]:
mallado = pd.DataFrame(paramGrid)

In [0]:
mallado

Unnamed: 0,ALS_07e02b7ead7e__rank,ALS_07e02b7ead7e__regParam,ALS_07e02b7ead7e__maxIter,ALS_07e02b7ead7e__numUserBlocks
0,10,0.01,5,1
1,10,0.01,5,30
2,10,0.01,10,1
3,10,0.01,10,30
4,10,0.1,5,1
5,10,0.1,5,30
6,10,0.1,10,1
7,10,0.1,10,30
8,10,0.05,5,1
9,10,0.05,5,30


In [0]:
mallado.columns=[i.name for i in mallado.columns]

In [0]:
mallado['avgMetrics'] = modelo_cv.avgMetrics

In [0]:
mallado.sort_values('avgMetrics', inplace=True)
mallado

Unnamed: 0,rank,regParam,maxIter,numUserBlocks,avgMetrics
6,10,0.1,10,1,0.446826
7,10,0.1,10,30,0.448137
4,10,0.1,5,1,0.448342
5,10,0.1,5,30,0.450684
18,100,0.1,10,1,0.45314
19,100,0.1,10,30,0.45349
9,10,0.05,5,30,0.455126
23,100,0.05,10,30,0.455989
8,10,0.05,5,1,0.456181
22,100,0.05,10,1,0.45668


In [0]:
best_modelo= modelo_cv.bestModel

<font color="Grey">Terminamos entrenando el modelo con los mejores hiperparámetros que hemos conseguido con toda nuestra data:

In [0]:
modelo_definitivo = ALS(
    regParam=mallado.iloc[0]['regParam'],
    maxIter=mallado.iloc[0]['maxIter'],
    rank=mallado.iloc[0]['rank'],
    numUserBlocks=mallado.iloc[0]['numUserBlocks'],  
    userCol="user_id", 
    itemCol="beer_beerid", 
    ratingCol=("rating"),
    coldStartStrategy="drop",
    seed=random_state,
).fit(reviews)

In [0]:
rmse_cv = evaluator.evaluate(modelo_definitivo.transform(reviews))
print("Root-mean-square error CV = " + str(rmse_cv))

Root-mean-square error CV = 0.40892149965416014


In [0]:
print("El RMSE sin tunear hiperparámetros es:",round(rmse,4))
print("El RMSE con el mejor modelo después de tunear los hiperparámetros es:", round(rmse_cv,4))
print("Al tunear hiperparámetros hemos reducido el error (RMSE) en un:",round((rmse_cv/rmse)-1,4),"%")

El RMSE sin tunear hiperparámetros es: 0.4123
El RMSE con el mejor modelo después de tunear los hiperparámetros es: 0.4089
Al tunear hiperparámetros hemos reducido el error (RMSE) en un: -0.0081 %


# <font color="Grey">5. Crea una función que dado un usuario nos devuelva el top 10 de recomendaciones. Se valora la creatividad.

In [0]:
userRecs = modelo_definitivo.recommendForAllUsers(10)

In [0]:
userRecs.printSchema()

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- beer_beerid: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [0]:
userRecs.show(truncate=False)

+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                                      |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|148    |[[16312, 4.9125543], [59584, 4.8295097], [70054, 4.7879667], [68665, 4.7127376], [58921, 4.6750283], [38766, 4.661421], [63352, 4.655837], [53717, 4.644256], [28217, 4.625271], [65686, 4.6204743]] |
|463    |[[16312, 5.006316], [59584, 4.8360705], [70054, 4.8358335], [68665, 4.809285], [38766, 4.7681723], [58921, 4.721408], [53717, 4.6982946], [64487, 4.694891], [1

<font color="Grey">Vamos a crear una función que nos ofreza las 10 recomendaciones para un user id:

In [0]:
def top10_recomendaciones (user_id):
    top10=(userRecs
    .filter(""" user_id = %s """ %user_id)
    .withColumn("recommendations",F.explode("recommendations"))
    .withColumn("beer_beerid",F.col('recommendations')['beer_beerid'])
    .withColumn("rating",F.col('recommendations')['rating'])
    .drop("recommendations")
    .join(beer_info, 'beer_beerid')
    .orderBy(F.desc('rating'))
    .select("beer_beerid","user_id","rating","beer_name","brewery_name","beer_style","beer_abv")).toPandas()
    return top10

In [0]:
top10_recomendaciones(148)

Unnamed: 0,beer_beerid,user_id,rating,beer_name,brewery_name,beer_style,beer_abv
0,16312,148,4.912554,Monster Mash,Rocky River Brewing,Fruit / Vegetable Beer,4.4
1,59584,148,4.82951,Double Kilt-Sickle Reserve,Moylan's Brewery,Scotch Ale / Wee Heavy,12.3
2,70054,148,4.787967,Alesmith Speedway Stout - Vanilla And Coconut,AleSmith Brewing Company,American Double / Imperial Stout,12.0
3,68665,148,4.712738,Lips Of Faith - Eric's Ale (Bourbon Barrel Aged),New Belgium Brewing,American Wild Ale,9.0
4,58921,148,4.675028,Bourbon Barrel Barleywine,Great Basin Brewing Co.,American Barleywine,13.0
5,38766,148,4.661421,Oak Aged Los Diablos Del Paso,Cornelius Pass Roadhouse & Imbrie Hall (McMena...,American IPA,8.0
6,63352,148,4.655837,La Vente D'Ange,Brasserie Des Vignes,Gueuze,6.0
7,53717,148,4.644256,Thanks Mr. Noonan IPA,John Harvard's Brewery & Ale House,American IPA,6.1
8,28217,148,4.625271,Darken Boar Wheat Special,Hideji Beer,Dunkelweizen,4.5
9,65686,148,4.620474,Coffee Infused Imperial Stout Trooper,New England Brewing Co.,American Double / Imperial Stout,8.5
