In [1]:
import pyspark
import pandas as pd

from datetime import datetime, date
from pyspark.sql import SparkSession, Row, Column
from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg, count, explode, split, upper, expr, collect_list, size, split, year, row_number

In [2]:
# spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder \
    .appName("ConsultaPySpark") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

business_path = "data/yelp_academic_dataset_business.json"
review_path = "data/yelp_academic_dataset_review.json"

business_df = spark.read.json(business_path)
review_df = spark.read.json(review_path)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/29 13:18:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/29 13:18:11 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [30]:
# business.json
{"business_id":"Pns2l4eNsfO8kk83dixA6A",
 "name":"Abby Rappoport, LAC, CMQ",
 "address":"1616 Chapala St, Ste 2",
 "city":"Santa Barbara",
 "state":"CA",
 "postal_code":"93101",
 "latitude":34.4266787,
 "longitude":-119.7111968,
 "stars":5.0,
 "review_count":7,
 "is_open":0,
 "attributes":{"ByAppointmentOnly":"True"},
 "categories":"Doctors, Traditional Chinese Medicine, Naturopathic\/Holistic, Acupuncture, Health & Medical, Nutritionists",
 "hours":null}

# review.json
{"review_id":"KU_O5udG6zpxOg-VcAEodg",
 "user_id":"mh_-eMZ6K5RLWhZyISBhwA",
 "business_id":"XQfwVwDr-v0ZS3_CbbE5Xw",
 "stars":3.0,
 "useful":0,
 "funny":0,
 "cool":0,
 "text":"If you decide to eat here, just be aware it is going to take about 2 hours from beginning to end. We have tried it multiple times, because I want to like it! I have been to it's other locations in NJ and never had a bad experience. \n\nThe food is good, but it takes a very long time to come out. The waitstaff is very young, but usually pleasant. We have just had too many experiences where we spent way too long waiting. We usually opt for another diner or restaurant on the weekends, in order to be done quicker.",
 "date":"2018-07-07 22:09:11"}


NameError: name 'null' is not defined

##### 1. Obtener los 10 negocios con mayor número de revisiones (2%)


In [3]:
top_businesses = review_df.groupBy("business_id").agg(count("review_id").alias("num_reviews")) \
    .orderBy(col("num_reviews").desc()).limit(10)

result1 = top_businesses.join(business_df, "business_id").select("name", "num_reviews").orderBy(col("num_reviews").desc())
result1.show()


                                                                                

+--------------------+-----------+
|                name|num_reviews|
+--------------------+-----------+
|   Acme Oyster House|       7673|
|        Oceana Grill|       7516|
|Hattie B’s Hot Ch...|       6160|
|Reading Terminal ...|       5778|
|Ruby Slipper - Ne...|       5264|
| Mother's Restaurant|       5254|
|         Royal House|       5146|
|  Commander's Palace|       4969|
|                Luke|       4661|
|              Cochon|       4480|
+--------------------+-----------+



                                                                                

##### 2. Obtener las 10 categorías con la mayor puntuación media (2%)


In [13]:
avg_stars_by_category = business_df.select("business_id", "stars", "categories") \
    .withColumn("category", explode(split(col("categories"), ", "))) \
    .groupBy("category").agg(avg("stars").alias("avg_stars")) \
    .orderBy(col("avg_stars").desc()).limit(10) \
    .orderBy(col("category"))

result2 = avg_stars_by_category.show()


+--------------------+---------+
|            category|avg_stars|
+--------------------+---------+
|     Art Consultants|      5.0|
|         Calligraphy|      5.0|
|Cheese Tasting Cl...|      5.0|
|       Childproofing|      5.0|
|         Experiences|      5.0|
|          Patent Law|      5.0|
|        Silent Disco|      5.0|
|              Somali|      5.0|
|Sport Equipment Hire|      5.0|
|     Water Suppliers|      5.0|
+--------------------+---------+



##### 3. Obtener las 10 ciudades con la mayor puntuación media (2%)


In [14]:
avg_stars_by_city = business_df.select("city", "stars") \
    .groupBy("city").agg(avg("stars").alias("avg_stars")) \
    .orderBy(col("avg_stars").desc()).limit(10) \
    .orderBy(col("city"))

result3 = avg_stars_by_city.show()


+----------------+---------+
|            city|avg_stars|
+----------------+---------+
|         Arizona|      5.0|
| Delaware County|      5.0|
|         Fernley|      5.0|
|        Norriton|      5.0|
|   Pass-a-Grille|      5.0|
|           Reno |      5.0|
|Rosewood Heights|      5.0|
|          SPARKS|      5.0|
|    Webster Grvs|      5.0|
|    phoenixville|      5.0|
+----------------+---------+



##### 4. Calcular la media de palabras para las reseñas de cada puntuación (1-5 estrellas) (2%)


In [23]:
# Calcular la media de palabras para las reseñas de cada puntuación (1-5 estrellas)
avg_words_by_stars = review_df.groupBy("stars").agg(avg(size(split(col("text"), " "))).alias("avg_words"))

result4 = avg_words_by_stars.show()




+-----+------------------+
|stars|         avg_words|
+-----+------------------+
|  1.0| 135.3761664832581|
|  4.0|109.01844357355336|
|  3.0|125.33368066896554|
|  2.0|135.76835403498455|
|  5.0| 85.32756936366728|
+-----+------------------+



                                                                                

##### 5. Obtener las 10 categorías que más se repiten para cada puntuación (1-5 estrellas) (4%)


In [29]:
# Filtrar las reseñas por puntuaciónes de estrellas
one_star_reviews    = review_df.filter(col("stars") == 1)
two_star_reviews    = review_df.filter(col("stars") == 2)
three_star_reviews  = review_df.filter(col("stars") == 3)
four_star_reviews   = review_df.filter(col("stars") == 4)
five_star_reviews   = review_df.filter(col("stars") == 5)

# Obtener las categorías que más se repiten para la puntuación de 1 estrella
top_categories_for_one_star = (
    one_star_reviews
    .join(business_df, "business_id")
    .select(explode(split("categories", ", ")).alias("category"))
    .groupBy("category")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
    .limit(10)
)

top_categories_for_two_star = (
    two_star_reviews
    .join(business_df, "business_id")
    .select(explode(split("categories", ", ")).alias("category"))
    .groupBy("category")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
    .limit(10)
)

top_categories_for_three_star = (
    three_star_reviews
    .join(business_df, "business_id")
    .select(explode(split("categories", ", ")).alias("category"))
    .groupBy("category")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
    .limit(10)
)

top_categories_for_four_star = (
    four_star_reviews
    .join(business_df, "business_id")
    .select(explode(split("categories", ", ")).alias("category"))
    .groupBy("category")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
    .limit(10)
)

top_categories_for_five_star = (
    five_star_reviews
    .join(business_df, "business_id")
    .select(explode(split("categories", ", ")).alias("category"))
    .groupBy("category")
    .agg(count("*").alias("count"))
    .orderBy(col("count").desc())
    .limit(10)
)

top_categories_for_one_star.show()
top_categories_for_two_star.show()
top_categories_for_three_star.show()
top_categories_for_four_star.show()
top_categories_for_five_star.show()


                                                                                

+--------------------+------+
|            category| count|
+--------------------+------+
|         Restaurants|567185|
|                Food|200062|
|           Nightlife|171310|
|                Bars|160744|
|American (Traditi...|135761|
|            Shopping|110161|
|      American (New)| 98740|
|Event Planning & ...| 91701|
|  Breakfast & Brunch| 89419|
|             Burgers| 81299|
+--------------------+------+



                                                                                

+--------------------+------+
|            category| count|
+--------------------+------+
|         Restaurants|404486|
|                Food|135015|
|           Nightlife|134947|
|                Bars|128301|
|American (Traditi...| 98551|
|      American (New)| 85894|
|  Breakfast & Brunch| 69261|
|          Sandwiches| 55881|
|Event Planning & ...| 54730|
|             Seafood| 53573|
+--------------------+------+



                                                                                

+--------------------+------+
|            category| count|
+--------------------+------+
|         Restaurants|543108|
|           Nightlife|190844|
|                Food|189080|
|                Bars|181185|
|American (Traditi...|130179|
|      American (New)|120373|
|  Breakfast & Brunch| 98102|
|          Sandwiches| 76597|
|             Seafood| 72147|
|Event Planning & ...| 67623|
+--------------------+------+



                                                                                

+--------------------+-------+
|            category|  count|
+--------------------+-------+
|         Restaurants|1130251|
|                Food| 421154|
|           Nightlife| 388516|
|                Bars| 368122|
|American (Traditi...| 248110|
|      American (New)| 247218|
|  Breakfast & Brunch| 210449|
|          Sandwiches| 164547|
|             Seafood| 147859|
|Event Planning & ...| 131959|
+--------------------+-------+





+--------------------+-------+
|            category|  count|
+--------------------+-------+
|         Restaurants|2079441|
|                Food| 868282|
|           Nightlife| 654140|
|                Bars| 617201|
|      American (New)| 432315|
|  Breakfast & Brunch| 400199|
|American (Traditi...| 399045|
|          Sandwiches| 317516|
|             Seafood| 281802|
|Event Planning & ...| 263540|
+--------------------+-------+



                                                                                

##### 6. Analizar cómo un atributo determinado afecta a la puntuación del negocio (4%)


In [13]:
#  Se refiere al campo "atributo" o a uyna columna cualqueira WTF EMALDI

attribute_effect = business_df.select("stars", "attributes.ByAppointmentOnly") \
    .groupBy("ByAppointmentOnly").agg(avg("stars").alias("avg_stars"))

result6 = attribute_effect.show()


[Stage 17:====>                                                   (1 + 11) / 12]

+-----------------+------------------+
|ByAppointmentOnly|         avg_stars|
+-----------------+------------------+
|             None|              3.45|
|            False|3.7063694267515923|
|             null|3.5178923588285946|
|             True|3.9550900121724646|
+-----------------+------------------+



                                                                                

##### 7. Obtener la media anual de puntuación para las 10 categorías con mayor número de reseñas (4%)

In [15]:
# Obtener las 10 categorías con mayor número de reseñas
top_categories_by_reviews = (
    review_df
    .join(business_df, "business_id")
    .select(explode(split("categories", ", ")).alias("category"))
    .groupBy("category")
    .agg(count("*").alias("num_reviews"))
    .orderBy(col("num_reviews").desc())
    .limit(10)
)

# Renombrar la columna "stars" de review_df a "review_stars"
review_df = review_df.withColumnRenamed("stars", "review_stars")

# Filtrar las reseñas para las categorías seleccionadas
filtered_reviews = (
    review_df
    .join(business_df, "business_id")
    .select("review_stars", "date", explode(split("categories", ", ")).alias("category"))
    .join(top_categories_by_reviews, "category")
)

# Agregar una columna de año y calcular la media anual de puntuación para las 10 categorías con mayor número de reseñas
windowSpec = Window.partitionBy("category", "year").orderBy("year")

result = (
    filtered_reviews
    .withColumn("year", year("date"))
    .groupBy("category", "year")
    .agg(avg(col("review_stars")).alias("avg_stars"))
    .withColumn("row_number", row_number().over(windowSpec))
    .filter(col("row_number") == 1)
    .drop("row_number")
    .orderBy("category", "year")
)

result.show()




+--------------------+----+------------------+
|            category|year|         avg_stars|
+--------------------+----+------------------+
|      American (New)|2005| 4.086956521739131|
|      American (New)|2006|  4.05607476635514|
|      American (New)|2007| 3.885222381635581|
|      American (New)|2008| 3.790946896992962|
|      American (New)|2009| 3.693454925429847|
|      American (New)|2010| 3.714959175738725|
|      American (New)|2011|3.7084460042575227|
|      American (New)|2012| 3.705262675766042|
|      American (New)|2013| 3.708643879037094|
|      American (New)|2014|3.7905753274017795|
|      American (New)|2015|3.8261306721544086|
|      American (New)|2016| 3.854635336531896|
|      American (New)|2017|3.8659174072138005|
|      American (New)|2018| 3.887391530984577|
|      American (New)|2019|3.8899515208575934|
|      American (New)|2020| 3.967351756925017|
|      American (New)|2021| 3.894207115460918|
|      American (New)|2022|3.9226490066225166|
|American (Tr

                                                                                

In [18]:
result.show(400)




+--------------------+----+------------------+
|            category|year|         avg_stars|
+--------------------+----+------------------+
|      American (New)|2005| 4.086956521739131|
|      American (New)|2006|  4.05607476635514|
|      American (New)|2007| 3.885222381635581|
|      American (New)|2008| 3.790946896992962|
|      American (New)|2009| 3.693454925429847|
|      American (New)|2010| 3.714959175738725|
|      American (New)|2011|3.7084460042575227|
|      American (New)|2012| 3.705262675766042|
|      American (New)|2013| 3.708643879037094|
|      American (New)|2014|3.7905753274017795|
|      American (New)|2015|3.8261306721544086|
|      American (New)|2016| 3.854635336531896|
|      American (New)|2017|3.8659174072138005|
|      American (New)|2018| 3.887391530984577|
|      American (New)|2019|3.8899515208575934|
|      American (New)|2020| 3.967351756925017|
|      American (New)|2021| 3.894207115460918|
|      American (New)|2022|3.9226490066225166|
|American (Tr

                                                                                

##### EXTRA

In [23]:
# NO es el bueno pero es importtante (creo)
# Obtener las 10 categorías con mayor número de reseñas
top_categories_by_reviews = (
    review_df
    .join(business_df, "business_id")
    .select(explode(split("categories", ", ")).alias("category"))
    .groupBy("category")
    .agg(count("*").alias("num_reviews"))
    .orderBy(col("num_reviews").desc())
    .limit(10)
)

# Renombrar la columna "stars" de review_df a "review_stars"
review_df = review_df.withColumnRenamed("stars", "review_stars")

# Filtrar las reseñas para las categorías seleccionadas
filtered_reviews = (
    review_df
    .join(business_df, "business_id")
    .select("review_stars", explode(split("categories", ", ")).alias("category"))
    .join(top_categories_by_reviews, "category")
)

# Calcular la media y el número de reseñas para las 10 categorías con mayor número de reseñas
avg_stars_and_reviews_by_category = (
    filtered_reviews
    .groupBy("category")
    .agg(avg(col("review_stars")).alias("avg_stars"), count("*").alias("num_reviews"))
    .orderBy("category")
)

avg_stars_and_reviews_by_category.show()


                                                                                

+--------------------+------------------+-----------+
|            category|         avg_stars|num_reviews|
+--------------------+------------------+-----------+
|      American (New)|3.8414833323176305|     984540|
|American (Traditi...|3.6683434719259504|    1011646|
|                Bars| 3.791956733969838|    1455553|
|  Breakfast & Brunch|3.8793193687098673|     867430|
|Event Planning & ...|3.6905174775614262|     609553|
|                Food|  3.89467647923211|    1813593|
|           Nightlife| 3.791832087790476|    1539757|
|         Restaurants|3.7937982897979476|    4724471|
|          Sandwiches| 3.851398540753674|     691864|
|             Seafood| 3.851528503966968|     620247|
+--------------------+------------------+-----------+

