In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_unixtime, to_date
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import lower

In [2]:
# Get the SparkSession for this notebook (an already-running session is reused,
# otherwise a new one is created under this application name).
spark = (
    SparkSession.builder
    .appName("Sitios - restaurantes")
    .getOrCreate()
)

1. TRABAJAMOS DIRECTAMENTE CON LOS DATOS DEL ESTADO DE NUEVA YORK

In [3]:
# Explicit schema for the New York review files, so Spark does not have to
# infer it from the JSON.
schema1 = StructType([
    # Kept as a string: IntegerType did not work for this field (the ids
    # printed by show() below are longer than 20 characters).
    StructField("user_id", StringType(), True),
    StructField("name", StringType(), True),
    # Kept as a string: IntegerType did not work for this field (values such
    # as 1603494795361 do not fit in a 32-bit integer).
    StructField("time", StringType(), True),
    StructField("rating", IntegerType(), True),
    StructField("text", StringType(), True),
    StructField("pics", StringType(), True),
    StructField("resp", StringType(), True),
    StructField("gmap_id", StringType(), True)
])

folder_path_estados = './reviews-estados/review-New_York'

# Collect every JSON file in the folder. Sorting makes the read order
# independent of the arbitrary order returned by os.listdir.
review_paths = sorted(
    os.path.join(folder_path_estados, filename)
    for filename in os.listdir(folder_path_estados)
    if filename.endswith(".json")
)

# Read all files with a single reader call instead of union-ing one DataFrame
# per file: the reader accepts a list of paths, and this avoids building a
# query plan with one Union node per file.
if review_paths:
    data = spark.read.json(review_paths, schema=schema1)
else:
    # No JSON files found: fall back to an empty DataFrame with the schema.
    data = spark.createDataFrame([], schema1)

# Display the resulting DataFrame
data.show()

+--------------------+--------------------+-------------+------+--------------------+--------------------+--------------------+--------------------+
|             user_id|                name|         time|rating|                text|                pics|                resp|             gmap_id|
+--------------------+--------------------+-------------+------+--------------------+--------------------+--------------------+--------------------+
|11372210469230823...|      Alvin Martinez|1603494795361|     5|I'm late to posti...|[{"url":["https:/...|                null|0x89c25fc9494dce4...|
|10729344149210932...|     Johnnie Jackson|1620157037403|     1|Very dissatisfied...|                null|{"time":162026836...|0x89c25fc9494dce4...|
|10037858580181940...|        Manie Blazer|1597431662039|     5|Excellent very we...|                null|                null|0x89c25fc9494dce4...|
|11499816115301982...|      Fashion Fiinds|1543773862044|     5|Basing my review ...|                null|

In [4]:
# Convert the epoch timestamp in milliseconds ("time") into a "date" column:
# first scale it down to seconds, then turn the seconds into a calendar date.
data = (
    data
    .withColumn("timestamp_seconds", col("time") / 1000)
    .withColumn("date", to_date(from_unixtime(col("timestamp_seconds"))))
)

In [5]:
# Remove the columns that are no longer needed now that "date" exists.
data = data.drop(
    "time",               # raw epoch value in milliseconds
    "timestamp_seconds",  # intermediate value used to build "date"
)

In [6]:
# Guard against duplicated reviews: keep only one copy of each fully identical row.
data = data.distinct()

In [7]:
# Preview the cleaned reviews (first 20 rows, long values truncated).
data.show(n=20, truncate=True)

+--------------------+--------------------+------+--------------------+----+--------------------+--------------------+----------+
|             user_id|                name|rating|                text|pics|                resp|             gmap_id|      date|
+--------------------+--------------------+------+--------------------+----+--------------------+--------------------+----------+
|11690824775570679...|   Svetlana Poliakov|     1|                null|null|                null|0x89c2446962b02a1...|2019-03-24|
|10265187065709847...|    Maria defabritis|     5|                null|null|                null|0x89c25928463ca71...|2021-08-22|
|11203276021940441...|         Eddie Pipes|     4|    These guys rock✌|null|                null|0x89c262619fa8508...|2018-12-06|
|11223099235166105...|       Angel Sanchez|     5|                null|null|                null|0x89c25fa92c86f85...|2020-02-17|
|10068343302999201...|             Laura S|     5|Went to the farm ...|null|              

2. TRABAJAMOS CON LOS DATOS DE LOS SITIOS

In [9]:
# Explicit schema for the site metadata files, so Spark does not have to
# infer it from the JSON.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("gmap_id", StringType(), True),
    StructField("description", StringType(), True),
    # NOTE(review): FloatType is single precision; consider DoubleType if the
    # coordinates are used for distance calculations — confirm with downstream use.
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True),
    StructField("category", StringType(), True),
    StructField("avg_rating", FloatType(), True),
    StructField("num_of_reviews", IntegerType(), True),
    StructField("price", StringType(), True),
    StructField("hours", StringType(), True),
    StructField("MISC", StringType(), True),
    StructField("state", StringType(), True),
    StructField("relative_results", StringType(), True),
    StructField("url", StringType(), True)
])

folder_path = './metadata-sitios'

# Collect every JSON file in the folder. Sorting makes the read order
# independent of the arbitrary order returned by os.listdir.
site_paths = sorted(
    os.path.join(folder_path, filename)
    for filename in os.listdir(folder_path)
    if filename.endswith(".json")
)

# Read all files with a single reader call instead of union-ing one DataFrame
# per file: the reader accepts a list of paths, and this avoids building a
# query plan with one Union node per file.
if site_paths:
    df = spark.read.json(site_paths, schema=schema)
else:
    # No JSON files found: fall back to an empty DataFrame with the schema.
    df = spark.createDataFrame([], schema)

In [10]:
# Check what the sites DataFrame looks like (first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+--------------------+---------+----------+--------------------+----------+--------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|                name|             address|             gmap_id|         description| latitude| longitude|            category|avg_rating|num_of_reviews|price|               hours|                MISC|               state|    relative_results|                 url|
+--------------------+--------------------+--------------------+--------------------+---------+----------+--------------------+----------+--------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|     Porter Pharmacy|Porter Pharmacy, ...|0x88f16e41928ff68...|                null|  32.3883|  -83.3571|        ["Pharmacy"]|       4.9|            16| null|[["Friday","8AM–6...|{"Service options...|   O

In [11]:
# Guard against duplicated sites: keep only one copy of each fully identical row.
df = df.distinct()

3. MODELO ALS

In [12]:
# Keep only the site columns that the model stage uses.
date_sitios_ml = df.select(
    "name",
    "gmap_id",
    "avg_rating",
    "category",
)

In [14]:
# Inspect the selected site columns (first 20 rows, long values truncated).
date_sitios_ml.show(n=20, truncate=True)

+--------------------+--------------------+----------+--------------------+
|                name|             gmap_id|avg_rating|            category|
+--------------------+--------------------+----------+--------------------+
|        City Textile|0x80c2c98c0e3c16f...|       4.5|["Textile exporter"]|
|       DComputer Inc|0x8665a6e2bff9ec5...|       2.8|["Computer repair...|
|SERVPRO of Benton...|0x8811e37ceb7dcb7...|       4.8|["Water damage re...|
|Top Cat Seafood R...|0x864e9891e381f3d...|       3.9|["Seafood restaur...|
|  Bjorn-Lass Kennels|0x89c3071901bfe39...|       4.8|["Kennel","Pet gr...|
|Klarisana - Ketam...|0x876b89cee8016a6...|       4.9|["Mental health c...|
|Fabolous Nails & ...|0x86249c04fc8b0d5...|       4.3|      ["Nail salon"]|
|Stephen Johnson, ...|0x88470ba747027ad...|       4.6|       ["Urologist"]|
|Eastridge Dental ...|0x86e7453d2e30589...|       4.9|         ["Dentist"]|
|P.S. X114 - Luis ...|0x89c2f4300e929ba...|       4.5|["School","Public...|
|PALMS WEST 

In [15]:
# Keep only restaurant-related sites, which is what the recommender focuses on.
# The column is referenced by name with col() so the filter is resolved against
# the DataFrame being filtered, rather than against the parent `df` object.
# "category" is stored as a string, so this is a case-insensitive substring match.
# NOTE(review): the substring match keeps any row whose category text contains
# "restaurant"; the output below includes rows whose first category is
# "Manufacturer" or "Gas station" — confirm whether those should be excluded.
date_sitios_ml = date_sitios_ml.filter(
    lower(col("category")).like("%restaurant%")
)

In [16]:
# Check the result of the restaurant filter (first 20 rows, long values truncated).
date_sitios_ml.show(n=20, truncate=True)

+--------------------+--------------------+----------+--------------------+
|                name|             gmap_id|avg_rating|            category|
+--------------------+--------------------+----------+--------------------+
|Top Cat Seafood R...|0x864e9891e381f3d...|       3.9|["Seafood restaur...|
|Habibi Halal Gyro...|0x89c25b373b8ec23...|       5.0|["Fast food resta...|
|Admiral Craft Equ...|0x89c280501e760d5...|       3.0|["Manufacturer","...|
|      The Lob Father|0x8858310e235e921...|       3.3|      ["Restaurant"]|
|Lilikoi Asian Bistro|0x87badfb47acde91...|       4.7|      ["Restaurant"]|
|          San Julian|0x87b213efebe510f...|       4.1|      ["Restaurant"]|
|               Shell|0x8843382b9a776ae...|       4.9|["Gas station","A...|
|    Bocca Steakhouse|0x80c298259dc3e43...|       4.0|["Steak house","B...|
|               Eatly|0x883f5be0a76a1c9...|       5.0|      ["Restaurant"]|
|     Socorrito Cenar|0x808e331bc8f0000...|       4.5|      ["Restaurant"]|
|  Long John

In [17]:
# Keep only the New York review columns that will be used.
date_newyork_ml = data.select(
    "user_id",
    "name",
    "rating",
    "text",
    "gmap_id",
    "date",
)

In [18]:
# Inspect the selected review columns (first 20 rows, long values truncated).
date_newyork_ml.show(n=20, truncate=True)

+--------------------+--------------------+------+--------------------+--------------------+----------+
|             user_id|                name|rating|                text|             gmap_id|      date|
+--------------------+--------------------+------+--------------------+--------------------+----------+
|11690824775570679...|   Svetlana Poliakov|     1|                null|0x89c2446962b02a1...|2019-03-24|
|10265187065709847...|    Maria defabritis|     5|                null|0x89c25928463ca71...|2021-08-22|
|11203276021940441...|         Eddie Pipes|     4|    These guys rock✌|0x89c262619fa8508...|2018-12-06|
|11223099235166105...|       Angel Sanchez|     5|                null|0x89c25fa92c86f85...|2020-02-17|
|10068343302999201...|             Laura S|     5|Went to the farm ...|0x89c2b9d44e76948...|2019-03-23|
|11724734180229028...|           Jen Erwig|     5|Always a pleasant...|0x89e840f01ceabe4...|2019-01-29|
|10897090685492166...|       Joseph Istvan|     4|Is a great pla

In [19]:
# Combine restaurant sites with the New York reviews through their shared
# "gmap_id" key (inner join: only ids present on both sides are kept).
# NOTE(review): both inputs have a "name" column (site name on the left,
# reviewer name on the right), so the result contains two columns called
# "name"; consider renaming them before the join if "name" is selected later.
df_join = date_sitios_ml.join(date_newyork_ml, on="gmap_id", how="inner")

In [20]:
# Inspect the joined DataFrame (first 20 rows, long values truncated).
df_join.show(n=20, truncate=True)

+--------------------+-------+----------+--------------------+--------------------+--------------------+------+--------------------+----------+
|             gmap_id|   name|avg_rating|            category|             user_id|                name|rating|                text|      date|
+--------------------+-------+----------+--------------------+--------------------+--------------------+------+--------------------+----------+
|0x4cca29c04f91cd3...|Dunkin'|       3.5|["Coffee shop","B...|10136595532319861...|          Ian Wootan|     5|                null|2020-01-12|
|0x4cca29c04f91cd3...|Dunkin'|       3.5|["Coffee shop","B...|10073014936510875...|        Lynn Trombly|     5|Love to order ahe...|2021-02-28|
|0x4cca29c04f91cd3...|Dunkin'|       3.5|["Coffee shop","B...|10566878241022763...|       Naomi Chemtob|     5|This is a great b...|2019-08-18|
|0x4cca29c04f91cd3...|Dunkin'|       3.5|["Coffee shop","B...|11502203106029475...|                 V Q|     5|                null|2020