In [43]:
import sys, time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, rand
from pyspark.sql.types import IntegerType, StringType, FloatType, DateType

In [2]:
spark = SparkSession.builder \
    .appName("Yelp Review Analysis") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "16g") \
    .getOrCreate()

In [3]:
# Descomprimir archivo
# !tar -xvf /home/jovyan/work/yelp_dataset.tar -C /home/jovyan/work

1. Toma el archivo review.json JSON y cuantífica cuánto pesa el archivo en disco.

In [4]:
# cuantificar cuánto pesa en disco
!ls -lh /home/jovyan/work/yelp_academic_dataset_review.json

-rwxrwxrwx 1 jovyan 1000 5.0G Jan 19  2022 /home/jovyan/work/yelp_academic_dataset_review.json


2. Carga el JSON en Spark y cuantífica cuánto pesa el DataFramen en memoria RAM.

In [5]:
# Cargar el archivo JSON en un DataFrame
review_df = spark.read.json("/home/jovyan/work/yelp_academic_dataset_review.json")

# Mostrar el esquema para verificar que se haya cargado correctamente
review_df.printSchema()

# Mostrar las primeras filas para verificar los datos
review_df.show()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPp

In [6]:
# Definir función para estimar tamaño
def estimate_dataframe_size(spark, df):
    # Sample a small percentage of the DataFrame and calculate the size
    sample_df = df.sample(False, 0.1).cache()  # Using a 10% sample
    sample_df.count()  # Trigger computation and caching of the sampled data

    # Estimate size using sample size
    size_estimate = (sample_df.rdd.map(lambda x: sys.getsizeof(x)).reduce(lambda x, y: x + y)
                     * (1 / 0.1))  # Adjust based on the sample percentage
    
    sample_df.unpersist()  # Clean up cache
    return size_estimate

In [7]:
# Estimar tamaño
size_in_bytes = estimate_dataframe_size(spark, review_df)

In [8]:
# Imprimir el estimado
print(f"Estimated size in bytes: {size_in_bytes}")

# Estimado con formato usando format()
print("Estimated size in bytes: {0:,}".format(size_in_bytes))

# Estimado con formato usando f-string
print(f"Estimated size in bytes: {size_in_bytes:,}")

Estimated size in bytes: 838564800.0
Estimated size in bytes: 838,564,800.0
Estimated size in bytes: 838,564,800.0


In [9]:
# Convertir a gigabytes
size_in_gigabytes = size_in_bytes / (1024 ** 3)

# Imprimir con 2 decimales usando format()
print("Estimated size in gigabytes: {:.2f} GB".format(size_in_gigabytes))

# Imprimir usando f-string
print(f"Estimated size in gigabytes: {size_in_gigabytes:.2f} GB")

Estimated size in gigabytes: 0.78 GB
Estimated size in gigabytes: 0.78 GB


3. Guarda el DataFrame como parquet en disco y muestra cuánto pesa el archivo. Cómo se compara con el JSON crudo.

In [10]:
# Guardar el DataFrame en formato Parquet
review_df.write.mode('overwrite').parquet("/home/jovyan/work/yelp_review_parquet")

In [11]:
# Guardar el DataFrame en formato Parquet a 10 particiones
# review_df = review_df.repartition(10)
# review_df.write.mode('overwrite').parquet("/home/jovyan/work/yelp_review_parquet")

In [12]:
# Ver tamaño del archivo parquet
!du -sh /home/jovyan/work/yelp_review_parquet

2.9G	/home/jovyan/work/yelp_review_parquet


4. Utiliza el DataFrame, optimiza el tipo de dato que hay en cada columna (i.e. Int32, Int64, Float32, Float64, String, Categorical) y guarda el nuevo DataFrame como parquet. Cuántifica cuánto pesa el DataFrame en memoria RAM y cuánto pesa en disco. Cómo se compara con el parquet crudo.

In [13]:
# Optimizar DF
optimized_df = review_df \
    .withColumn("cool", col("cool").cast(IntegerType())) \
    .withColumn("funny", col("funny").cast(IntegerType())) \
    .withColumn("useful", col("useful").cast(IntegerType())) \
    .withColumn("stars", col("stars").cast(FloatType())) \
    .withColumn("date", col("date").cast(DateType()))  # Convierte la fecha si es necesario

# Mostrar el nuevo esquema para verificar los cambios
optimized_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- funny: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: float (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- user_id: string (nullable = true)



In [14]:
# Guardar el DF optimizado
optimized_df.write.mode('overwrite').parquet("/home/jovyan/work/yelp_review_optimized_parquet")

In [15]:
# Cuantificar el tamaño del archivo parquet nuevo
!du -sh /home/jovyan/work/yelp_review_optimized_parquet

2.8G	/home/jovyan/work/yelp_review_optimized_parquet


In [16]:
yelp = optimized_df.cache().select(optimized_df.columns)
yelp_size_in_bytes = yelp._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()

In [17]:
# Convertir a gigabytes
yelp_size_in_gigabytes = yelp_size_in_bytes / (1024 ** 3)

# Imprimir con 2 decimales usando format()
print("Estimated size in gigabytes: {:.2f} GB".format(yelp_size_in_gigabytes))

# Imprimir usando f-string
print(f"Estimated size in gigabytes: {yelp_size_in_gigabytes:.2f} GB")

Estimated size in gigabytes: 3.84 GB
Estimated size in gigabytes: 3.84 GB


5. Utiliza el DataFrame optimizado, guarda en parquet una nueva versión del DataFrame y particionalo por fecha (date). Otra versión por ciudad. Otra por ciudad y fecha.

- Nota: Para particionar por ciudad (city) será necesario que hagas un join con la tabla business.json.
- Nota: Para particionar por fecha hazlo por año y mes. Para hacer esto necesitas extraer el año y mes como columnas.

In [18]:
# Importar Business
business_df = spark.read.json("/home/jovyan/work/yelp_academic_dataset_business.json")
business_df.printSchema()  # Verifica la estructura para localizar el campo de la ciudad

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [37]:
# Realiza un join para incluir la ciudad en el DataFrame de reviews
review_with_business_df = review_df.join(business_df, "business_id", "left").select(review_df["*"], business_df["city"], business_df["state"])

In [38]:
# Agregar columnas de año y mes
review_with_business_df = review_with_business_df.withColumn("year", year("date")).withColumn("month", month("date"))

In [39]:
# Particionar por Fecha (Año y Mes)
review_with_business_df.write.partitionBy("year", "month").mode("overwrite").parquet("/home/jovyan/work/review_partitioned_by_date")

In [40]:
# Particionar por Estado
review_with_business_df.write.partitionBy("state").mode("overwrite").parquet("/home/jovyan/work/review_partitioned_by_state")

In [41]:
# Particionar por Estado y Fecha
review_with_business_df.write.partitionBy("state", "year", "month").mode("overwrite").parquet("/home/jovyan/work/review_partitioned_by_state_date")

In [42]:
# Verificar
!ls -lh /home/jovyan/work/review_partitioned_by_date
!ls -lh /home/jovyan/work/review_partitioned_by_state
!ls -lh /home/jovyan/work/review_partitioned_by_state_date

total 0
-rwxrwxrwx 1 jovyan 1000    0 Apr 15 01:23  _SUCCESS
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:19 'year=2005'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2006'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2007'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2008'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2009'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2010'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2011'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2012'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2013'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2014'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2015'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2016'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2017'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:15 'year=2018'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2019'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:13 'year=2020'
drwxrwxrwx 1 jovyan 1000 4.0K Apr 15 01:15 'year=2021'
drwx

6. Ejecuta un query utilizando sobre la tabla filtrando por una de las ciudades y un años en particular. Registra el tiempo de ejecución.

Aplica el query sobre:

- la versión en JSON
- la versión particionada por fecha
- la versión particionada por fecha y estado

De todos los queries que ejecutaste cuáles fueron más rápidos.

In [56]:
# Define queries
queries = {
    "Count Reviews": "SELECT COUNT(*) FROM reviews WHERE state = 'AZ' AND year = 2019",
    "Average Stars": "SELECT AVG(stars) FROM reviews WHERE state = 'AZ' AND year = 2019",
    "Top 10 Active Users": """
        SELECT user_id, COUNT(*) AS reviews_count
        FROM reviews
        WHERE state = 'AZ' AND year = 2019
        GROUP BY user_id
        ORDER BY reviews_count DESC
        LIMIT 10
    """,
    "Average Useful Votes": "SELECT AVG(useful) FROM reviews WHERE year = 2019"
}

In [57]:
# JSON
df_reviews = spark.read.json("/home/jovyan/work/yelp_academic_dataset_review.json")
df_business = spark.read.json("/home/jovyan/work/yelp_academic_dataset_business.json")

# Realizar el join para incluir la columna 'state' en el DataFrame de reviews
df_reviews = df_reviews.join(df_business, "business_id", "left").select(df_reviews["*"], df_business["state"])

# Añadir la columna 'year' extraída de 'date'
df_reviews = df_reviews.withColumn("year", year("date"))

# Crear una vista temporal
df_reviews.createOrReplaceTempView("reviews_json")

# Ejecutar y medir tiempos para JSON
for name, sql in queries.items():
    print(f"--- Query: {name} ---")
    start_time = time.time()
    spark.sql(sql.replace("reviews", "reviews_json")).show()
    print("JSON time: {:.2f} seconds".format(time.time() - start_time))

--- Query: Count Reviews ---
+--------+
|count(1)|
+--------+
|   56277|
+--------+

JSON time: 11.84 seconds
--- Query: Average Stars ---
+------------------+
|        avg(stars)|
+------------------+
|3.6912770758924607|
+------------------+

JSON time: 11.88 seconds
--- Query: Top 10 Active Users ---
+--------------------+------------------+
|             user_id|reviews_json_count|
+--------------------+------------------+
|d6zIVWiJyPB6PZuAx...|               117|
|I06gY9An4o81XpejL...|               112|
|5uJxiaaUQYMUSEior...|                92|
|R7NM7vIyUfSTXvMsw...|                92|
|U_RCft_ROtu3ts2St...|                88|
|_6j6uCisr8DNewdBl...|                84|
|a2DGFEIov1L7X9rKd...|                84|
|HB52HJAcW7zCcLRw2...|                81|
|xWmYN57XXZbg0LOK8...|                80|
|ABaDXhzUa0UFFB9YS...|                80|
+--------------------+------------------+

JSON time: 12.25 seconds
--- Query: Average Useful Votes ---
+------------------+
|       avg(useful)|
+--

In [58]:
# Por fecha
df_date = spark.read.parquet("/home/jovyan/work/review_partitioned_by_date/")
df_date.createOrReplaceTempView("reviews_date")

# Reutiliza las queries definidas anteriormente

# Ejecutar y medir tiempos para datos particionados por fecha
for name, sql in queries.items():
    print(f"--- Query: {name} ---")
    start_time = time.time()
    spark.sql(sql.replace("reviews", "reviews_date")).show()
    print("Date-partitioned time: {:.2f} seconds".format(time.time() - start_time))

--- Query: Count Reviews ---
+--------+
|count(1)|
+--------+
|   56277|
+--------+

Date-partitioned time: 2.95 seconds
--- Query: Average Stars ---
+------------------+
|        avg(stars)|
+------------------+
|3.6912770758924607|
+------------------+

Date-partitioned time: 2.99 seconds
--- Query: Top 10 Active Users ---
+--------------------+------------------+
|             user_id|reviews_date_count|
+--------------------+------------------+
|d6zIVWiJyPB6PZuAx...|               117|
|I06gY9An4o81XpejL...|               112|
|5uJxiaaUQYMUSEior...|                92|
|R7NM7vIyUfSTXvMsw...|                92|
|U_RCft_ROtu3ts2St...|                88|
|_6j6uCisr8DNewdBl...|                84|
|a2DGFEIov1L7X9rKd...|                84|
|HB52HJAcW7zCcLRw2...|                81|
|ABaDXhzUa0UFFB9YS...|                80|
|xWmYN57XXZbg0LOK8...|                80|
+--------------------+------------------+

Date-partitioned time: 3.02 seconds
--- Query: Average Useful Votes ---
+-----------

In [None]:
# Carga el DataFrame particionado por fecha y estado
df_date_state = spark.read.parquet("/home/jovyan/work/review_partitioned_by_state_date/")
df_date_state.createOrReplaceTempView("reviews_date_state")

# Reutiliza las queries definidas anteriormente

# Ejecutar y medir tiempos para datos particionados por fecha y estado
for name, sql in queries.items():
    print(f"--- Query: {name} ---")
    start_time = time.time()
    spark.sql(sql.replace("reviews", "reviews_date_state")).show()
    print("Date and state-partitioned time: {:.2f} seconds".format(time.time() - start_time))

In [None]:
# Detener la sesión Spark
spark.stop()