# ALS Model: Project for Mining Large Volumes of Information
## Team Members
* Daniel Enrique Pinto Restrepo
* Santiago Mejía Chitiva
* Carlos Andres Cuartas Murillo
* Daniel Roman Ramirez

## Summary
The goal of this project is to build a recommender system for the beauty business using the review data from Amazon AWS. The notebook is straightforward: we first load the dataset, take a first look at its structure, and keep only the relevant variables `customer_id`, `product_id`, and `star_rating`. We then re-encode the product IDs, because the library we use to run the ALS algorithm requires all fields to be numeric. The next step is to normalize the ratings by each product's mean rating. After that we split the data into training and test sets, train the model, and evaluate it on the test set, which yields an RMSE of **1.11**. Based on this result we try a second normalization, this time by each customer's mean rating (applied to the original ratings, not on top of the product normalization). This normalization improves the model considerably: the RMSE drops to **0.95**.

## First steps with the Notebook
In this first part of the notebook we look at general information about the session configuration.

In [1]:
%%info

## Loading and describing the data
As mentioned above, we use the Amazon AWS data. The data was described in the notebook that implements the baseline KNN model, and it was stored in the project's S3 bucket in Parquet format. The next step loads the file from the bucket.

In [2]:
df  = spark.read.parquet("s3://proyecto.mineria/revies_beauty.parquet.gzip")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1600115920100_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [3]:
# Here we take a quick look at the dataset
df.show(n=5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    1797882|R3I2DHQBR577SS|B001ANOOOE|       2102612|The Naked Bee Vit...|          Beauty|          5|            0|          0|   N|                Y|          Five Stars|Love this, excell...|2015-08-31 00:00:00|
|         US|   18381298|R1QNE9NQFJC2Y4|B0016J22EQ|     106393691|Alba Botanica Sun...| 

---

In the following steps we keep only the columns we need, in our case **product_id**, **customer_id**, and **star_rating**.

In [4]:
df_mod=df.drop("marketplace","product_title","product_category","review_id","product_parent",
               "helpful_votes","total_votes","vine","verified_purchase","review_headline",
               "review_body", "review_date")

In [5]:
df_mod.show(5)

+-----------+----------+-----------+
|customer_id|product_id|star_rating|
+-----------+----------+-----------+
|    1797882|B001ANOOOE|          5|
|   18381298|B0016J22EQ|          5|
|   19242472|B00HU6UQAG|          5|
|   19551372|B002HWS7RM|          5|
|   14802407|B00SM99KWU|          5|
+-----------+----------+-----------+
only showing top 5 rows

We now prepare the dataset for training the model. When loading the dataset we noticed that the `star_rating` variable was stored as a string, so it must be cast to integer.

In [6]:
from pyspark.sql.types import IntegerType
# Cast "star_rating" from string to integer
df_mod = df_mod.withColumn("star_rating", df_mod["star_rating"].cast(IntegerType()))

In [7]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="product_id", outputCol="product_id_index")
df_mod_indexed = indexer.fit(df_mod).transform(df_mod)
df_mod_indexed = df_mod_indexed.withColumn("product_id_index",df_mod_indexed["product_id_index"].cast(IntegerType()))
df_mod_indexed.show()

+-----------+----------+-----------+----------------+
|customer_id|product_id|star_rating|product_id_index|
+-----------+----------+-----------+----------------+
|    1797882|B001ANOOOE|          5|            6405|
|   18381298|B0016J22EQ|          5|           15168|
|   19242472|B00HU6UQAG|          5|          359617|
|   19551372|B002HWS7RM|          5|             976|
|   14802407|B00SM99KWU|          5|            3665|
|    2909389|B000NYL1Z6|          4|           77601|
|   19397215|B001SYWTFG|          5|            3058|
|    3195210|B005F2EVMQ|          5|           44245|
|   52216383|B00M1SUW7K|          5|            4058|
|   10278216|B001KYQA1S|          1|             899|
|   24655453|B00SAQ9DZY|          4|           33156|
|   30788223|B00HFQQ0VU|          5|              36|
|   11257536|B00PYL8MAA|          4|          179388|
|   29605778|B00D9NV2D4|          5|              23|
|   41238422|B008Y9M412|          5|           20100|
|   23620123|B00FWXBLHG|    

In [8]:
df_mod_indexed=df_mod_indexed.drop("product_id")

In [9]:
df_mod_indexed.select("product_id_index").dtypes

[('product_id_index', 'int')]
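
To make the re-encoding step concrete, the following plain-Python sketch mimics what `StringIndexer` does with its default ordering: labels are ranked by descending frequency, so the most frequently reviewed product receives index 0. The helper name `build_index`, the sample IDs, and the alphabetical tie-break are assumptions for illustration; this is not the Spark implementation.

```python
from collections import Counter

def build_index(labels):
    """Map each distinct label to an integer, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency; ties are broken alphabetically (assumed here).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: position for position, label in enumerate(ordered)}

# Hypothetical product IDs, not taken from the real dataset.
product_ids = ["B0A", "B0B", "B0A", "B0C", "B0A", "B0B"]
index = build_index(product_ids)
encoded = [index[p] for p in product_ids]
print(index)
print(encoded)
```

Under this ordering, low indices such as 12 in the outputs would correspond to heavily reviewed products.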

In [10]:
# Now we normalize the ratings per item to check whether we can obtain a better RMSE
from pyspark.sql.functions import col, mean, stddev
from pyspark.sql.window import Window

def z_score(c, w):
    # Full z-score over window w; kept for reference, the cells below only subtract the mean
    return (col(c) - mean(c).over(w)) / stddev(c).over(w)

In [11]:
w = Window.partitionBy("product_id_index")

In [12]:
from pyspark.sql import functions as F

In [13]:
df_norm = (df_mod_indexed.withColumn('mean', F.mean("star_rating").over(w))
           .withColumn("star_rating", ((F.col("star_rating") - F.col('mean'))))
        .drop('mean'))

In [14]:
df_norm.show(5)

+-----------+--------------------+----------------+
|customer_id|         star_rating|product_id_index|
+-----------+--------------------+----------------+
|   23109705|  0.7945286946179007|              12|
|     489211|  0.7945286946179007|              12|
|   12641855|  0.7945286946179007|              12|
|   32685817|-0.20547130538209935|              12|
|   17012967| -3.2054713053820993|              12|
+-----------+--------------------+----------------+
only showing top 5 rows
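
The window expression above subtracts each product's mean rating from every rating of that product. The same arithmetic can be sketched in plain Python on a handful of made-up rows; the helper name `center_by_key` and the sample values are hypothetical and serve only to show the computation.

```python
from collections import defaultdict

def center_by_key(rows, key):
    """Subtract the per-group mean of star_rating from every row in the group."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        totals[row[key]] += row["star_rating"]
        counts[row[key]] += 1
    means = {group: totals[group] / counts[group] for group in totals}
    # Return new rows so the input list is left untouched.
    return [{**row, "star_rating": row["star_rating"] - means[row[key]]} for row in rows]

# Made-up ratings, not taken from the real dataset.
rows = [
    {"customer_id": 1, "product_id_index": 12, "star_rating": 5},
    {"customer_id": 2, "product_id_index": 12, "star_rating": 4},
    {"customer_id": 3, "product_id_index": 12, "star_rating": 3},
    {"customer_id": 1, "product_id_index": 7, "star_rating": 2},
]
centered = center_by_key(rows, "product_id_index")
print([row["star_rating"] for row in centered])
```

Passing `"customer_id"` as the key instead gives the per-user variant of the same centering.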

### Normalizing by product ID only

In [15]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [16]:
(training,test)=df_norm.randomSplit([0.8, 0.2])
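
Because `randomSplit` is called without a seed, each run can produce a different partition and therefore slightly different RMSE values; `DataFrame.randomSplit` accepts an optional `seed` argument for this purpose. The plain-Python sketch below illustrates the idea of a seeded row-wise split. The helper name `random_split` and the seed value 42 are hypothetical, and the sampling only approximates what Spark does internally.

```python
import random

def random_split(rows, train_fraction=0.8, seed=42):
    """Assign each row to training with probability train_fraction."""
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    training, test = [], []
    for row in rows:
        (training if rng.random() < train_fraction else test).append(row)
    return training, test

rows = list(range(1000))
train_a, test_a = random_split(rows, seed=42)
train_b, test_b = random_split(rows, seed=42)
print(train_a == train_b, len(train_a) + len(test_a))
```

As with `randomSplit`, the 80/20 proportion is only approximate, since every row is sampled independently.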

In [17]:
als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="customer_id",itemCol="product_id_index",ratingCol="star_rating",coldStartStrategy="drop",nonnegative=True)
model=als.fit(training)

In [18]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="star_rating",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
predictions.show()

RMSE=1.1147975693605026
+-----------+--------------------+----------------+----------+
|customer_id|         star_rating|product_id_index|prediction|
+-----------+--------------------+----------------+----------+
|    9711777|  0.7945286946179007|              12|0.69269514|
|   10259710| -2.2054713053820993|              12|       0.0|
|   11069344|  0.7945286946179007|              12|  0.606427|
|   11215854|-0.20547130538209935|              12|0.09630573|
|   12990320|  0.7945286946179007|              12|0.69269514|
|   15842544|  0.7945286946179007|              12|0.59026515|
|   16413469|-0.20547130538209935|              12|       0.0|
|   16857311| -2.2054713053820993|              12|       0.0|
|   17153070|  0.7945286946179007|              12| 0.3626032|
|   17217809|-0.20547130538209935|              12|       0.0|
|   17528743|  0.7945286946179007|              12| 0.6312808|
|   18533425|-0.20547130538209935|              12|0.12021449|
|   18538027|  0.79452869461790
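
The RMSE reported by `RegressionEvaluator` is the square root of the mean squared difference between the label column and the prediction column. A minimal plain-Python sketch of that formula follows; the helper name `rmse` and the toy values are assumptions, and the metric is computed on the centered rating scale, as in the notebook.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Toy centered ratings and predictions (hypothetical values).
labels = [0.5, -2.0, 1.0]
predictions = [0.5, 0.0, 1.0]
value = rmse(labels, predictions)
print(value)
```

A single large miss, such as predicting 0.0 for a centered rating of -2.0, dominates the metric because the errors are squared before averaging.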

## Normalizing the ratings by user

In [19]:
w = Window.partitionBy("customer_id")
df_norm = (df_mod_indexed.withColumn('mean', F.mean("star_rating").over(w))
           .withColumn("star_rating", ((F.col("star_rating") - F.col('mean'))))
        .drop('mean'))

In [20]:
df_norm.show(5)

+-----------+-------------------+----------------+
|customer_id|        star_rating|product_id_index|
+-----------+-------------------+----------------+
|      11090|                0.0|           30930|
|      11124|                0.0|          432669|
|      11177|                0.0|          307683|
|      11257|0.33333333333333304|            1838|
|      11257|0.33333333333333304|            3882|
+-----------+-------------------+----------------+
only showing top 5 rows

In [21]:
(training,test)=df_norm.randomSplit([0.8, 0.2])

In [22]:
als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="customer_id",itemCol="product_id_index",ratingCol="star_rating",coldStartStrategy="drop",nonnegative=True)
model=als.fit(training)

In [23]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="star_rating",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
predictions.show()

RMSE=0.9493695006048416
+-----------+-------------------+----------------+-----------+
|customer_id|        star_rating|product_id_index| prediction|
+-----------+-------------------+----------------+-----------+
|     197339|                0.0|              12|        0.0|
|     523057|                0.0|              12|        0.0|
|    1773521|               -1.5|              12|        0.0|
|    6531285|                0.0|              12|        0.0|
|    6647905|               0.25|              12|        0.0|
|   14082085|                0.0|              12|        0.0|
|   15869883|                1.0|              12|  0.6005992|
|   22134342|                0.5|              12| 0.35039532|
|   25283108|                0.0|              12|        0.0|
|   25702176|               -0.5|              12|0.026170174|
|   26539155|               -1.5|              12|        0.0|
|   26577467|-1.9166666666666665|              12| 0.29019225|
|   27729734|                0.
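
Many predictions in the table above are exactly 0.0, even where the centered rating is negative. A likely explanation is the `nonnegative=True` option: when user and item factors are constrained to be nonnegative, their dot product cannot fall below zero, so the model cannot reproduce negative centered ratings. The sketch below illustrates that floor in plain Python with made-up factor vectors; it is not the notebook's trained model.

```python
def predict(user_factors, item_factors):
    """ALS-style prediction: dot product of the two latent vectors."""
    return sum(u * v for u, v in zip(user_factors, item_factors))

# Made-up nonnegative latent factors (hypothetical values).
users = [[0.0, 0.0], [0.4, 0.1], [0.9, 0.3]]
items = [[0.5, 0.2], [0.0, 1.0]]

predictions = [predict(u, i) for u in users for i in items]
print(all(p >= 0 for p in predictions))  # nonnegative factors give nonnegative predictions

# A centered rating of -1.5 therefore keeps an absolute error of at least 1.5.
target = -1.5
smallest_error = min(abs(target - p) for p in predictions)
print(smallest_error >= 1.5)
```

If negative predictions are wanted on centered data, one option to evaluate would be training with `nonnegative=False`; whether that improves the RMSE on this dataset is untested here.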

## Hyperparameter tuning

In [24]:
als=ALS(maxIter=10,regParam=0.05,rank=10,userCol="customer_id",itemCol="product_id_index",ratingCol="star_rating",coldStartStrategy="drop",nonnegative=True)
model=als.fit(training)

In [25]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="star_rating",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
predictions.show()

RMSE=0.9690639518570164
+-----------+-------------------+----------------+----------+
|customer_id|        star_rating|product_id_index|prediction|
+-----------+-------------------+----------------+----------+
|     197339|                0.0|              12|       0.0|
|     523057|                0.0|              12|       0.0|
|    1773521|               -1.5|              12|       0.0|
|    6531285|                0.0|              12|       0.0|
|    6647905|               0.25|              12| 0.9248162|
|   14082085|                0.0|              12|       0.0|
|   15869883|                1.0|              12| 0.7780253|
|   22134342|                0.5|              12|0.42315927|
|   25283108|                0.0|              12|       0.0|
|   25702176|               -0.5|              12|       0.0|
|   26539155|               -1.5|              12|       0.0|
|   26577467|-1.9166666666666665|              12|0.12991709|
|   27729734|                0.0|             

### Iteration 2

In [26]:
als=ALS(maxIter=5,regParam=0.1,rank=15,userCol="customer_id",itemCol="product_id_index",ratingCol="star_rating",coldStartStrategy="drop",nonnegative=True)
model=als.fit(training)

In [27]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="star_rating",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
predictions.show()

RMSE=0.9499733081067021
+-----------+-------------------+----------------+-----------+
|customer_id|        star_rating|product_id_index| prediction|
+-----------+-------------------+----------------+-----------+
|     197339|                0.0|              12|        0.0|
|     523057|                0.0|              12|        0.0|
|    1773521|               -1.5|              12|        0.0|
|    6531285|                0.0|              12|        0.0|
|    6647905|               0.25|              12| 0.11362219|
|   14082085|                0.0|              12|        0.0|
|   15869883|                1.0|              12|  0.5727278|
|   22134342|                0.5|              12| 0.15253676|
|   25283108|                0.0|              12|        0.0|
|   25702176|               -0.5|              12|        0.0|
|   26539155|               -1.5|              12|        0.0|
|   26577467|-1.9166666666666665|              12|0.054613266|
|   27729734|                0.

### Iteration 3

In [28]:
als=ALS(maxIter=5,regParam=0.01,rank=30,userCol="customer_id",itemCol="product_id_index",ratingCol="star_rating",coldStartStrategy="drop",nonnegative=True)
model=als.fit(training)

In [29]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="star_rating",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)
print("RMSE="+str(rmse))
predictions.show()

RMSE=0.984361406353269
+-----------+------------------+----------------+----------+
|customer_id|       star_rating|product_id_index|prediction|
+-----------+------------------+----------------+----------+
|    4059899|               0.0|              12|       0.0|
|    6165336|               0.0|              12|       0.0|
|    9700175|0.5999999999999996|              12| 0.5670325|
|   10536753|               0.0|              12|       0.0|
|   11243840|               0.0|              12|       0.0|
|   12990320|               0.0|              12|       0.0|
|   16082975|0.2142857142857144|              12| 0.1994639|
|   17217809|               0.0|              12|       0.0|
|   17528743|               0.0|              12|       0.0|
|   17778498|               0.5|              12|0.59400934|
|   18209681|               0.0|              12|       0.0|
|   19861993|               0.0|              12|       0.0|
|   23204599|1.1666666666666665|              12| 1.0342299|
|
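
The manual tuning runs on the user-normalized ratings can be collected and compared side by side. The sketch below uses only the hyperparameters and RMSE values printed in the cells above; the `runs` structure is a hypothetical convenience. Note that the test rows shown for the last run differ from those of the earlier runs, so the runs may not share the same split and the comparison is only indicative.

```python
# Hyperparameters and RMSE values as printed in the notebook outputs.
runs = [
    {"maxIter": 5, "regParam": 0.09, "rank": 25, "rmse": 0.9493695006048416},
    {"maxIter": 10, "regParam": 0.05, "rank": 10, "rmse": 0.9690639518570164},
    {"maxIter": 5, "regParam": 0.1, "rank": 15, "rmse": 0.9499733081067021},
    {"maxIter": 5, "regParam": 0.01, "rank": 30, "rmse": 0.984361406353269},
]

# Print the runs from lowest to highest RMSE.
for run in sorted(runs, key=lambda r: r["rmse"]):
    print(f"maxIter={run['maxIter']:>2} regParam={run['regParam']:<4} "
          f"rank={run['rank']:>2} RMSE={run['rmse']:.4f}")

best = min(runs, key=lambda r: r["rmse"])
```

Among these four runs, the initial configuration (`rank=25`, `regParam=0.09`) has the lowest reported RMSE, with the `rank=15`, `regParam=0.1` run very close behind.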