**S41** Utilizando los textos de las reviews y las técnicas de NLP(TF-IDF), de modo que la query sea ‘high quality’, devolver el nombre de la pizzería, la review y la ciudad, que haya sido reconocida por la calidad de sus productos.

In [1]:
!pip install pyspark==3.2.1



In [2]:
from pyspark.sql import *
import datetime

In [3]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)



CREO LOS RDD

In [4]:
# Change the file directories to yours, the files are in a public folder in google drive:
# https://drive.google.com/drive/folders/1CsvJs0xZ9SCLtG4ci2dNGCkhqnI-1DLS?usp=drive_link
df_review = sqlContext.read.csv('/content/drive/MyDrive/2022/Orga_de_Datos/Datasets/review.csv', header=True, inferSchema=True)
df_review = df_review.dropna(subset=("business_id", "text"))
rdd_review = df_review.rdd

In [5]:
df_business = sqlContext.read.csv('/content/drive/MyDrive/2022/Orga_de_Datos/Datasets/business.csv', header=True, inferSchema=True)
df_business = df_business.dropna(subset=("business_id", "name", "city", "categories"))
rdd_business = df_business.rdd

1. Dejar sólo lo que me interesa del rdd_business ("business_id", "name", "city" y "categories")
2. Filtrar las pizzerías del rdd_business
3. Quedarme con lo que me interesa del rdd_review ("business_id" y "text")
4. Hacer join de los "busienss_id" que son pizzerías con el rdd_review
6. Debería tener: ("business_id", "name", "city", "text")
7. Buscar la query "high quality"




In [6]:
# Este rdd tiene las pizzerías
rdd_pizzas = rdd_business.map(lambda x: (x.business_id, (x.name, x.city, x.categories)))\
                         .filter(lambda x: "Pizza" in x[1][2].split(", ") or "Pizza" in x[1][0])\
                         .map(lambda x: (x[0], (x[1][0], x[1][1])))

In [7]:
# Este rdd tiene las reviews de las pizzerías, el nombre y ciudad
rdd_useful_reviews = rdd_review.map(lambda x: (x.business_id, x.text))\
                               .join(rdd_pizzas)\
                               .map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1])))\
                               .cache()

In [8]:
# Este rdd guarda sólo los textos de las reviews, se usará para el NLP
rdd_texts = rdd_useful_reviews.map(lambda x: x[1][0])\
                              .filter(lambda x: len(x) > 0)\
                              .cache()

In [9]:
# Importamos librerías necesarias para procesar los textos de las reviews
import re
pattern = '[a-z]+'

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
en_stops = set(stopwords.words('english'))

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [10]:
# Filtra las stopwords, y algunos caracteres extra que encontré haceindo pruebas y no agregan valor
def filter_stopwords(words_list):
  words_no_stop = []

  for word in words_list:
    if word not in en_stops and word not in {"e", "w"}:
      words_no_stop.append(word)

  return words_no_stop

In [11]:
# Este rdd guarda las palabras filtradas con su respectiva frecuencia
word_frequency_rdd = rdd_texts.map(lambda x: re.findall(pattern, x))\
                              .map(lambda x: filter_stopwords(x))\
                              .flatMap(lambda x: x)\
                              .map(lambda x: (x, 1))\
                              .reduceByKey(lambda x,y: x+y)\
                              .map(lambda x: (x[1], x[0]))\
                              .sortByKey(False)\
                              .map(lambda x: (x[1], x[0]))\
                              .cache()

In [12]:
# Busco las 5000 palabras más comunes
common_words = word_frequency_rdd.keys().take(5000)

In [13]:
text_count = rdd_texts.count()

In [14]:
# Diccionario para acceder a las frecuencias de cada palabra fácilmente (Para calcular IDF)
frequency_dict = word_frequency_rdd.collectAsMap()

In [15]:
import math

In [16]:
# Funciones que permiten calcular el TF_IDF de un texto
def idf (word):
  return math.log10(text_count + 1 / frequency_dict[word])

def tf_idf (words, common_words):
  tf_idf = []
  for word in common_words:
    tf_idf.append(words.count(word) * idf(word))
  return tf_idf

In [17]:
# Funciones para calcular norma y producto escalar de vectores
def norm (vect):
  return math.sqrt(sum([x**2 for x in vect]))

def scalar_product(vect_1, vect_2):
  if len(vect_1) != len(vect_2):
      return 0

  return sum(i[0] * i[1] for i in zip(vect_1, vect_2))

In [18]:
query = "high quality"
q = tf_idf(query.split(" "), common_words)

In [19]:
#Se realizan los cálculos del método TFxIDF para cada texto, y se reduce para obtener el de mayor puntaje
rdd_texts.map(lambda x: (x, re.findall(pattern, x)))\
         .map(lambda x: (x[0], filter_stopwords(x[1])))\
         .map(lambda x: (tf_idf(x[1], common_words), x[0]))\
         .filter(lambda x: norm(x[0]) > 0)\
         .map(lambda x: ((scalar_product(x[0], q)) / (norm(x[0]) * norm(q)), x[1]))\
         .reduce(lambda x,y: x if x[0] > y[0] else y)

(0.6324555317559342, 'Very high-quality NY Style pizza here!  ')

In [20]:
# Una vez obtenida la review, consigo los datos de la pizzería del rdd anterior
data = rdd_useful_reviews.filter(lambda x: x[1][0] == 'Very high-quality NY Style pizza here!  ').collect()[0][1]

In [21]:
print(f"La pizzería que mejor responde a la consulta: '{query}' es '{data[1]}', y se ubica en {data[2]}. La review encontrada fue: '{data[0]}'")

La pizzería que mejor responde a la consulta: 'high quality' es 'Eddie & Sam's NY Pizza', y se ubica en Tampa. La review encontrada fue: 'Very high-quality NY Style pizza here!  '
