**Importación de librerias**

In [91]:
import pandas as pd
from sklearn.preprocessing import MultiLabelBinarizer, OneHotEncoder, StandardScaler
from sklearn.neighbors import NearestNeighbors
import numpy as np
import seaborn as sns

**Importacion de datos**

In [144]:
data = pd.read_csv("Datos\data.csv")
#Los datos cambian de tipo con la exportacion a CSV por lo que tenemos que corregirlo
data.listed_in = data.listed_in.apply(eval)
data.cast = data.cast.apply(eval)

### **Preprocesamiento de features**

No utilizaremos todas las columnas en nuestro modelo por lo que seleccionamos las que creemos necesarias

In [145]:
#Seleccionamos las columnas que nos interesan
data_f = data[['type', "title", 'cast', 'show_id', 'antiquity_category', 'duration_category', 'listed_in', 'Promedio_Valoraciones']]

Para combatir un poco los outliers filtramos los datos, ignoramos todos los actores que hayan participado en menos de 5 peliculas

In [146]:
freq = data_f['cast'].apply(pd.Series).stack().value_counts()

# Seleccionar aquellos actores cuya frecuencia es mayor que 5
filtro = freq[freq > 5].index

# Filtrar el DataFrame para mantener solo las filas que contienen actores que hayan participado en mas de 5 peliculas
data = data_f[data_f['cast'].apply(lambda x: any(item in filtro for item in x))]

# Reordenar los indices
data = data.reset_index()

#### ONE-HOT-ENCODING

MultiLabelBinarizer codifica en un array binario las columnas que contienen listas

In [147]:
# Crear una instancia del codificador one-hot
mlb = MultiLabelBinarizer()
genres_encoded = mlb.fit_transform(data['listed_in'])
actors_encoded = mlb.fit_transform(data['cast'])

OneHotEncoder codifica en un array binario las columnas que contienen listas

In [148]:
oh_encoder = OneHotEncoder(sparse=False)
antiquity_encoded = oh_encoder.fit_transform(data[['antiquity_category']])
duration_encoded = oh_encoder.fit_transform(data[['duration_category']])
id_encoded = oh_encoder.fit_transform(data[['show_id']])

duration_encoded

array([[1., 0.],
       [0., 1.],
       [1., 0.],
       ...,
       [1., 0.],
       [1., 0.],
       [1., 0.]])

Unimos todos nuestros arrays binarios en uno solo para poder entrenar el modelo

In [149]:
#concatenamos todos los array
data_m = np.concatenate((actors_encoded, genres_encoded, antiquity_encoded, duration_encoded,id_encoded), axis=1)

### **Modelo K-vecinos**

Como sabemos, el modelo k-vecinos predice sus datos en base a similtudes entre el input y el conjunto de datos que es justamente lo que buscamos en un sistema de recomendación de peliculas, es por ello que lo escogemos como nuestro modelo ideal.

In [150]:
# Crear un modelo de k-vecinos con 5 vecinos
model = NearestNeighbors(n_neighbors=5, metric='cosine')

# Entrenar el modelo con los datos codificados
model.fit(data_m)

NearestNeighbors(metric='cosine')

Procedemos a ejecutar una recomendacion

In [153]:
movie_index = data.loc[data['show_id'].apply(lambda x: "as7611" in x)].index
distances, indices = model.kneighbors(data_m[movie_index].reshape(1,-1))

MemoryError: Unable to allocate 4.51 GiB for an array with shape (13465, 44999) and data type float64

Hacemos un pequeño output para ver que tal funciona nuestro modelo

In [154]:
index_p = list(indices[0])

data.iloc[index_p]

NameError: name 'indices' is not defined

### **AT LEAST SQUARE**

#### **Spark**

Creamos una app spark para poder manipular grandes volumenes de datos

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
        .appName("my_app") \
        .config("spark.driver.memory", "10g") \
        .config("spark.executor.memory", "10g") \
        .getOrCreate()

spark.conf.set("spark.sql.pivotMaxValues", 30000)

Leemos todos los CSV con spark, no pudimos exportarlos en un csv con anterioridad por un error

In [2]:
#Leemos estas tablas con spark por su tamaño
d1 = spark.read.csv("MLOpsReviews/ratings/1.csv", header=True, inferSchema=True)
d2 = spark.read.csv("MLOpsReviews/ratings/2.csv", header=True, inferSchema=True)
d3 = spark.read.csv("MLOpsReviews/ratings/3.csv", header=True, inferSchema=True)
d4 = spark.read.csv("MLOpsReviews/ratings/4.csv", header=True, inferSchema=True)
d5 = spark.read.csv("MLOpsReviews/ratings/5.csv", header=True, inferSchema=True)
d6 = spark.read.csv("MLOpsReviews/ratings/6.csv", header=True, inferSchema=True)
d7 = spark.read.csv("MLOpsReviews/ratings/7.csv", header=True, inferSchema=True)
d8 = spark.read.csv("MLOpsReviews/ratings/8.csv", header=True, inferSchema=True)


Unimos todos los CSV en un solo dataframe de ratings

In [3]:
df_list = [d1, d2, d3, d4, d5, d6, d7, d8]

ratings = df_list[0]
for i in range(1, len(df_list)):
   ratings = ratings.union(df_list[i])

Creamos una columna movie_index porque el modelo solo recibe parametros numericos

In [4]:
from pyspark.sql.functions import dense_rank, col
from pyspark.sql.window import Window


# Crea un mapeo de los IDs de película únicos a enteros
movie_id_mapping = (ratings.select("movieId")
                   .distinct()
                   .orderBy("movieId")
                   .select(dense_rank().over(Window.orderBy("movieId")).alias("movie_index"), col("movieId"))
                   .cache())

# Crea una nueva columna "movie_index" en el dataframe "ratings" con los IDs de película mapeados a enteros
ratings_with_index = ratings.join(movie_id_mapping, on="movieId", how="inner")

Creamos un modelo ALS

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# Crear el modelo ALS
als = ALS(rank=10, maxIter=15, regParam=0.01, userCol="userId", itemCol="movie_index", ratingCol="rating")

# Dividir los datos en entrenamiento y prueba
(training, test) = ratings_with_index.randomSplit([0.8, 0.2])

# Entrenar el modelo con los datos de entrenamiento
model = als.fit(training)

# Realizar predicciones en los datos de prueba
predictions = model.transform(test)

# Calcular el error RMSE en las predicciones
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Obtener las recomendaciones para todos los usuarios
user_recs = model.recommendForAllUsers(5)

Root-mean-square error = nan


#### **Pandas**

El numero de filas en user_recs ya no supera los 200.000 registros por lo que nos conviene transformar el dataframe a pandas el dataframe con las predicciones para todos los usuarios

In [9]:
df = user_recs.toPandas()

Visualizamos algunas recomendaciones

In [89]:
user_id = 4

lista_id = []
for i in list(df.recommendations.loc[df.userId == user_id])[0]:
    lista_id.append(ratings_with_index.select("movieId").filter(col("movie_index") == i[0]).first()[0])
    
data.loc[data.show_id.isin(lista_id)]

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,antiquity_category,rating,duration_int,duration_type,duration_category,listed_in,description,Valoraciones,Promedio_Valoraciones
1308,as5819,Movie,Ed Hill: Candy and Smiley,Jeremy Podlog,[Ed Hill],,,2021,Nueva,13+,68,min,Corta,"[Arts, Entertainment, and Culture, Comedy]","Candy & Smiley is relevant, realistic, and at ...",479,3.6
8509,ns4429,Movie,Christmas With A View,Justin G. Dyck,"[Scott Cavalheiro, Kaitlyn Leeb, Vivica A. Fox...",Canada,2018-11-01 00:00:00,2018,Vieja,TV-PG,91,min,Corta,"[International Movies, Romantic Movies]","Still reeling from a business failure, the res...",497,3.5
13693,as5980,Movie,Ringolevio,Kristin Peterson Kaszubowski,"[Joshua Koopman, Meredith Johnston, Cory Hardin]",,,2020,Nueva,18+,81,min,Corta,"[Comedy, Drama, Romance]","Ada, a reserved young woman with a passion for...",485,3.4
19541,hs1819,TV Show,Basketball: A Love Story,,[None],,2020-02-05 00:00:00,2018,Vieja,TV-G,1,Season,Corta,"[Documentaries, Sports]",A series of short stories that feature intervi...,428,3.5
19656,hs2689,TV Show,XxxHolic,,[None],Japan,2017-09-05 00:00:00,2006,Vieja,TV-PG,1,Season,Corta,"[Anime, Drama, Horror]","There is no such thing as coincidence, there i...",474,3.5
