In [1]:
# Encuentra la ubicacion de spark
import findspark
findspark.init()
import pyspark
findspark.find()

'C:\\Spark'

In [2]:
# Importamos las bibliotecas necesarias para Koalas y definir alias
import os
import pandas as pd
from functools import reduce
import pyspark
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Configuramos Spark para poder procesar de forma local archivos de gran tamaño
conf = SparkConf().setAppName('appName').setMaster('local') \
    .set("spark.network.timeout", "600s") \
    .set("spark.driver.memory", "12g") \
    .set("spark.executor.memory", "10g") \
    .set("spark.executor.cores", "4") \
    .set("spark.dynamicAllocation.maxExecutors", "2") \
    .set("spark.jars", r"C:\mysql-connector-j-8.1.0\mysql-connector-j-8.1.0.jar")

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [3]:
spark

In [4]:
import databricks.koalas as ks
import os



In [5]:
Reviews_Yelp = spark.read.load(r'D:\Proyecto Integrador Parquet\Data Warehouse\Reviews_Yelp.parquet', format='parquet', inferSchema=True)

In [6]:
Business_Yelp = spark.read.load(r'D:\Proyecto Integrador Parquet\Data Warehouse\ResultadoBUSINESS.parquet', format='parquet', inferSchema=True)

In [7]:
Reviews_Yelp.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- useful: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- text: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- word_count: integer (nullable = true)
 |-- sentiment_score: double (nullable = true)



In [8]:
Business_Yelp.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- categories: string (nullable = true)



In [9]:
Reviews_Yelp = Reviews_Yelp.drop("review_id")

In [10]:
Business_Yelp_name = Business_Yelp.select('business_id', 'name', 'state')
Business_Yelp_name = Business_Yelp_name.withColumnRenamed("name", "business_name")

In [11]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

# Crear la matriz de usuarios-negocios
user_business_matrix = Reviews_Yelp.groupBy("user_id", "business_id") \
    .agg({"stars": "avg", "word_count": "avg", "sentiment_score": "avg", "useful": "avg", "funny": "avg", "cool": "avg"})

user_business_matrix = user_business_matrix.join(Business_Yelp_name, "business_id", "inner")

In [12]:
# Definir los valores permitidos para la columna "state"
allowed_states = ["CA", "TX", "NY", "FL", "IL"]

# Filtrar las filas según los valores permitidos
user_business_matrix = user_business_matrix.filter(col("state").isin(allowed_states))
user_business_matrix = user_business_matrix.drop('state')

In [13]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, lit

# Crea una columna constante para usar como criterio de ordenación
user_business_matrix = user_business_matrix.withColumn("dummy", lit(1))

# Crea una ventana con un orden específico
window = Window.orderBy("dummy")

# Agrega una columna numerada al DataFrame
user_business_matrix = user_business_matrix.withColumn("business_id_int", row_number().over(window))

# Elimina la columna dummy si no es necesaria
user_business_matrix = user_business_matrix.drop("dummy")

In [14]:
user_business_matrix = user_business_matrix.drop('avg(funny)', 'avg(cool)', 'avg(word_count)', 'avg(useful)')

In [15]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Obtiene los valores únicos de la columna "user_id"
valores_unicos = user_business_matrix.select("user_id").distinct().rdd.flatMap(lambda x: x).collect()

# Crea un diccionario que mapea los valores únicos a números enteros
diccionario_mapping = {valor: idx for idx, valor in enumerate(valores_unicos)}

# Define una función UDF (User Defined Function) para aplicar el mapeo
def mapear_a_numero(valor):
    return diccionario_mapping[valor]

# Registra la función UDF
mapear_a_numero_udf = udf(mapear_a_numero, IntegerType())

# Aplica la función UDF para crear la nueva columna "user_id_int"
user_business_matrix = user_business_matrix.withColumn("user_id_int", mapear_a_numero_udf(user_business_matrix["user_id"]))

# Muestra el DataFrame resultante
user_business_matrix.show()

+--------------------+--------------------+----------+--------------------+--------------------+---------------+-----------+
|         business_id|             user_id|avg(stars)|avg(sentiment_score)|       business_name|business_id_int|user_id_int|
+--------------------+--------------------+----------+--------------------+--------------------+---------------+-----------+
|S2Ho8yLxhKAa26pBA...|29UB_wrnUIdsxV2Zm...|       3.0|             -0.5267|Creole House Rest...|              1|      51352|
|M0r9lUn2gLFYgIwIf...|n2pX5Ae8xCUi2_Wlw...|       5.0|              0.9578|      Baileys' Range|              2|     149150|
|vD2jzpPv4iyOLKzIT...|9yxxvX5ySJSylQk3L...|       5.0|              0.4753| Farmhaus Restaurant|              3|     199571|
|u7_3L1NBWgxhBM_B-...|JW5W4OZCohTvZlRF1...|       4.0|              0.9868|      Pizzeria Vetri|              4|     163829|
|mtGTna-hyFhErb4Zm...|m7l9hVa_dlcvTYjMU...|       2.0|              0.7504|   The Village Diner|              5|      14644|


In [16]:
business_names = user_business_matrix.select('business_id_int', 'business_name')
business_names_pandas = business_names.toPandas()
business_names_pandas.to_parquet(r'C:\Users\Matías Tejerina\Desktop\PG-Google-Yelp\PG_Google_Yelp\PG_Google_Yelp\business_names.parquet', index=False)

In [17]:
user_business_matrix = user_business_matrix.drop("user_id", "business_id", "business_name")

In [18]:
user_business_matrix.show()

+----------+--------------------+---------------+-----------+
|avg(stars)|avg(sentiment_score)|business_id_int|user_id_int|
+----------+--------------------+---------------+-----------+
|       3.0|             -0.5267|              1|      51352|
|       5.0|              0.9578|              2|     149150|
|       5.0|              0.4753|              3|     199571|
|       4.0|              0.9868|              4|     163829|
|       2.0|              0.7504|              5|      14644|
|       5.0|              0.9324|              6|     119981|
|       1.0|             -0.7544|              7|      81477|
|       4.0|              0.9239|              8|     154293|
|       2.0|              0.2263|              9|      26193|
|       5.0|              0.6486|             10|     180657|
|       5.0|              0.3987|             11|     141843|
|       3.0|             -0.3846|             12|     156453|
|       5.0|              0.9749|             13|      50325|
|       

In [19]:
from pyspark.sql.functions import col, lit, round
def calcular_calificacion_combinada(row):
    peso = 0.5  # Peso igual para ambas columnas
    stars = row["avg(stars)"] / 5  # Normaliza "stars" al rango de 0 a 1
    sentiment_score = (row["avg(sentiment_score)"] + 1) / 2  # Normaliza "sentiment_score" al rango de 0 a 1
    calificacion_combinada = (stars * peso) + (sentiment_score * peso)
    return calificacion_combinada

# Aplicar la función a todas las columnas y agregar la nueva columna "rating"
user_business_matrix = user_business_matrix.withColumn("rating", calcular_calificacion_combinada(user_business_matrix))
user_business_matrix = user_business_matrix.withColumn("rating", round(user_business_matrix["rating"], 4))

user_business_matrix.show()

+----------+--------------------+---------------+-----------+------+
|avg(stars)|avg(sentiment_score)|business_id_int|user_id_int|rating|
+----------+--------------------+---------------+-----------+------+
|       3.0|             -0.5267|              1|      51352|0.4183|
|       5.0|              0.9578|              2|     149150|0.9894|
|       5.0|              0.4753|              3|     199571|0.8688|
|       4.0|              0.9868|              4|     163829|0.8967|
|       2.0|              0.7504|              5|      14644|0.6376|
|       5.0|              0.9324|              6|     119981|0.9831|
|       1.0|             -0.7544|              7|      81477|0.1614|
|       4.0|              0.9239|              8|     154293| 0.881|
|       2.0|              0.2263|              9|      26193|0.5066|
|       5.0|              0.6486|             10|     180657|0.9122|
|       5.0|              0.3987|             11|     141843|0.8497|
|       3.0|             -0.3846| 

In [20]:
user_business_matrix = user_business_matrix.drop('avg(stars)', 'avg(sentiment_score)')

In [21]:
user_business_matrix.count()

1494418

In [22]:
user_business_matrix.printSchema()

root
 |-- business_id_int: integer (nullable = false)
 |-- user_id_int: integer (nullable = true)
 |-- rating: double (nullable = true)



In [23]:
from pyspark.sql.functions import col

user_business_matrix = user_business_matrix.filter(col("user_id_int") <= 6745760)

In [24]:
user_business_matrix = user_business_matrix.filter(col("business_id_int") <= 6745760)

In [25]:
# Crear un modelo ALS (Alternating Least Squares)
als_final = ALS(maxIter=10, regParam=0.01, userCol="user_id_int", itemCol="business_id_int", ratingCol="rating")

# Entrenar el modelo ALS
model_final = als_final.fit(user_business_matrix)

In [26]:
# Definir el ID del usuario que deseas utilizar
user_id = 456

# Filtrar el DataFrame para obtener las recomendaciones para ese usuario
user_df = user_business_matrix.filter(user_business_matrix["user_id_int"] == user_id).select("user_id_int")
user_recs = model_final.recommendForUserSubset(user_df, 10)  # Obtener 10 recomendaciones

# Mostrar las recomendaciones
user_recs.show()

+-----------+--------------------+
|user_id_int|     recommendations|
+-----------+--------------------+
|        456|[{659400, 0.99148...|
+-----------+--------------------+



In [27]:
a = user_recs.select("recommendations")

In [28]:
a.show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|recommendations                                                                                                                                                                                                           |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{659400, 0.9914801}, {966963, 0.9632091}, {255107, 0.9625022}, {1366884, 0.95846516}, {175600, 0.95614815}, {1473841, 0.94096786}, {480270, 0.93522567}, {449399, 0.9286774}, {319467, 0.92497885}, {1177494, 0.9206134}]|
+-------------------------------------------------------------------------------------------------------------------

In [32]:
# Recopila los datos en una lista de filas
rows = a.collect()

# Procesa la lista de filas para crear el diccionario de recomendaciones
recommendations_dict = {}
for row in rows:
    recommendations_dict.update(dict(row.recommendations))

# Recopila los keys (business_id_int) de las recomendaciones
keys = recommendations_dict.keys()

# Filtra el DataFrame business_names_pandas para obtener los nombres correspondientes a los keys
top10_recs = business_names_pandas[business_names_pandas['business_id_int'].isin(keys)]['business_name'].tolist()

# Tomar los primeros 10 elementos de la lista (top 10 recomendaciones)
names_list = top10_recs[:10]

# Muestra la lista resultante
print(names_list)

['Zlatno Zito', 'ZaZa Nails & Spa', 'Great Basin Data Recovery', 'Oasis Nails & Spa', 'Dairy Queen Grill & Chill', 'The Cheesecake Factory', "Positano's", "McGuire's Barber Shop", 'Café Amelie', "Vincent's Italian Cuisine"]


In [30]:
# Guardar el modelo entrenado en un archivo
model_final.save(r"C:\Users\Matías Tejerina\Desktop\PG-Google-Yelp\PG_Google_Yelp\PG_Google_Yelp\modelo_als")