In [0]:
import os
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_unixtime, to_date, col, lower, regexp_replace, dense_rank, when
from pyspark.conf import SparkConf
from pyspark.sql.window import Window
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
# Cargamos los datos de metadata que el grupo de analysts unio y realizo el etl correspondiente para analisis
df = spark.read.format("parquet").load("/mnt/backupaws/metadata/metada_newyork/part-00000-8b5573bf-55b1-4249-9e02-80a376685fc7-c000.snappy.parquet")

In [0]:
# Primero vamos a crear un ID unico por cada sitio que tengamos en la metadata para esto primero realizamos un drop por si se repite la misma ubicacion de un local

In [0]:
# en caso tengamos filas duplicadas en la metadata realizamos un drop con el gmap_id
df = df.dropDuplicates(['gmap_id'])

# Creamos una ventana por la columna name, ordenada por name
window2 = Window.orderBy("gmap_id")

# Agregamos una nueva columna id_name a la tabla, usando dense_rank() para asignar un número de identificación único a cada valor de name
df = df.withColumn("id_name_empresa", dense_rank().over(window2))

In [0]:
# Realizamos el filtrado por lugar, en este caso empezaremos por RESTAURANT y seleccionamos solo las columnas que vayamos a usar para el entrenamiento del modelo

In [0]:
# seleccionamos solo los datos a usar en el modelo
date_sitios_ml = df.select('id_name_empresa','name', 'gmap_id', 'category')

# filtramos solo el rubro de restaurantes que es en lo que se enfocara el sistema de recomendacion
date_sitios_ml = date_sitios_ml.filter(lower(df["category"]).like("%bar %"))

# Eliminamos la columna category
date_delivery = date_sitios_ml.drop('category')

In [0]:
# Cargamos ahora los reviews para poder hacer el ETL correspondiente para ML
df_1= spark.read.format("parquet").load("/mnt/backupaws/reviews/18/part-00000-f610cba4-3bf4-481a-a9b6-e7313f0c3e80-c000.snappy.parquet")

In [0]:
# Primero creamos una columna igual a la columna user_id pero lo convertimos a tipo de dato string
df_1 = df_1.withColumn("col_id", col("user_id").cast("string"))

In [0]:
# Ordenamos la columna col_id 
window = Window.orderBy("col_id")

# Agregamos una nueva columna id_name a la tabla, usando dense_rank() para asignar un número de identificación único a cada valor de name
data = df_1.withColumn("id_name", dense_rank().over(window))

In [0]:
# hacemos el respectivo join para poder tener los datos listos para hacer el entrenamiento, el objetivo es tener solo 3 columnas(id_name, id_name_empresa, rating) que son las columnas a usar para el modelo als de recomendacion

In [0]:
# filtramos los datos que vamos a usar del dataframe de nueva york
date_newyork_ml = data.select('id_name', 'rating', 'gmap_id')

# filtramos los datos que necesitamos para el join
date_sitios_ml = date_sitios_ml.select('id_name_empresa', 'gmap_id')

# hacemos un join de los datos que tenemos tanto de sitios como de nueva york usando como id en comun el gmap_id
df_join = date_sitios_ml.join(date_newyork_ml, "gmap_id").join(date_delivery, 'id_name_empresa')

# hacemos el drop de gmap_id que no lo necesitamos
df_test = df_join.drop('gmap_id')

# realizamos el segundo drop
df_test = df_test.drop('name')

In [0]:
# Una vez hecho estos procesos anteriores empezamos con la separacion del entrenamiento del modelo y del testeo con un 80% de datos de entrenamiento

In [0]:
# train test validation split

seed = 1234
(training_df, test_df) = df_test.randomSplit([.8, .2], seed=seed)

In [0]:
# Ejecutamos el codigo para asegurarnos que al momento de guardar los datos en formato parquet nos guarde en un solo archivo .parquet
training_df = training_df.coalesce(1)

In [0]:
# ejecutamos el guardado de los datos para su proceso de ML con ALS y nos de un poco de rapidez al momento de hacer el cross validator
training_df.write.mode("overwrite").parquet("/mnt/backupaws/Datos_ML_BAR/Datos_Bar_entrenamiento")

In [0]:
# Ejecutamos el codigo para asegurarnos que al momento de guardar los datos en formato parquet nos guarde en un solo archivo .parquet
test_df = test_df.coalesce(1)

In [0]:
# ejecutamos el guardado de los datos para su proceso de ML con ALS y nos de un poco de rapidez al momento de hacer el cross validator
test_df.write.mode("overwrite").parquet("/mnt/backupaws/Datos_ML_BAR/Datos_Bar_testeo")