# Optimizar el almacenamiento de datos de gran escala

## Objetivo

* Implementar parquet y csv para almacenar archivos de gran escala.
* Implementar particionamiento de archivos.
* Implementar compresión de archivos.

## Crear dataset

In [7]:
import pandas as pd
import numpy as np

In [5]:
def create_dataframe():
    date = pd.date_range(start = "2018-01-01", end = "2024-03-31")
    stores = ["store_" + str(store_id) for store_id in np.arange(1,1001)]
    size = len(date)
    dfs = []
    for store in stores:
        df_store = pd.DataFrame({'store': store, 'date': date}).assign(unit_cases = np.random.normal(1000, 300, size))
        dfs.append(df_store)

    return pd.concat(dfs, ignore_index=True)

In [6]:
tbl_daily_sales = create_dataframe()

### Almacenar datos como parquet

In [7]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
spark = SparkSession.builder.getOrCreate()

In [8]:
sdf_daily_sales = spark.createDataFrame(tbl_daily_sales)
sdf_daily_sales.show(10)

+-------+-------------------+------------------+
|  store|               date|        unit_cases|
+-------+-------------------+------------------+
|store_1|2018-01-01 00:00:00| 566.0637395827885|
|store_1|2018-01-02 00:00:00| 841.2296484134108|
|store_1|2018-01-03 00:00:00| 728.6107535013277|
|store_1|2018-01-04 00:00:00| 759.0523090844504|
|store_1|2018-01-05 00:00:00| 831.4442485688032|
|store_1|2018-01-06 00:00:00|1238.4051593465656|
|store_1|2018-01-07 00:00:00| 524.1941661828059|
|store_1|2018-01-08 00:00:00|  1236.12484702558|
|store_1|2018-01-09 00:00:00| 661.8240724114337|
|store_1|2018-01-10 00:00:00|  603.060699489198|
+-------+-------------------+------------------+
only showing top 10 rows



In [9]:
(sdf_daily_sales
     .write.parquet(
         "sales_v1", 
         mode="overwrite", 
         partitionBy=["store"]
     )
)

### Particionar datos como parquet por tienda.

In [10]:
sdf_daily_sales_v2 = (
    sdf_daily_sales
     .withColumn("year", f.year(f.col("date")))
     .select("store", "year", "date", "unit_cases")
)

In [11]:
sdf_daily_sales_v2.show(10)

+-------+----+-------------------+------------------+
|  store|year|               date|        unit_cases|
+-------+----+-------------------+------------------+
|store_1|2018|2018-01-01 00:00:00| 566.0637395827885|
|store_1|2018|2018-01-02 00:00:00| 841.2296484134108|
|store_1|2018|2018-01-03 00:00:00| 728.6107535013277|
|store_1|2018|2018-01-04 00:00:00| 759.0523090844504|
|store_1|2018|2018-01-05 00:00:00| 831.4442485688032|
|store_1|2018|2018-01-06 00:00:00|1238.4051593465656|
|store_1|2018|2018-01-07 00:00:00| 524.1941661828059|
|store_1|2018|2018-01-08 00:00:00|  1236.12484702558|
|store_1|2018|2018-01-09 00:00:00| 661.8240724114337|
|store_1|2018|2018-01-10 00:00:00|  603.060699489198|
+-------+----+-------------------+------------------+
only showing top 10 rows



In [12]:
(sdf_daily_sales_v2
     .write.parquet(
         "sales_v2", 
         mode="overwrite", 
         partitionBy=["store", "year"])
)

### Particionar datos como parquet por tienda y año.

In [13]:
(sdf_daily_sales_v2
     .write.parquet(
         "sales_v3", 
         mode="overwrite", 
         partitionBy=["store", "year"],
         compression="gzip"
     )
)

### Tarea

In [1]:
from pyspark.sql import SparkSession

# Iniciar Spark Session
spark = SparkSession.builder.appName("YelpDataAnalysis").getOrCreate()



In [2]:
# Cargar los datos
business_df = spark.read.json("yelp_academic_dataset_business.json")
checkin_df = spark.read.json("yelp_academic_dataset_checkin.json")
review_df = spark.read.json("yelp_academic_dataset_review.json")
tip_df = spark.read.json("yelp_academic_dataset_tip.json")
user_df = spark.read.json("yelp_academic_dataset_user.json")


# Mostrar el esquema para entender la estructura de los datos
review_df.printSchema()
business_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointment

In [3]:
print(review_df.count())  # Verifica la accesibilidad del DataFrame


6990280


In [5]:
business_df.write.parquet("/home/jovyan/work/yelp_business.parquet")
checkin_df.write.parquet("/home/jovyan/work/yelp_checkin.parquet")
review_df.write.parquet("/home/jovyan/work/yelp_review.parquet")
tip_df.write.parquet("/home/jovyan/work/yelp_tip.parquet")
user_df.write.parquet("/home/jovyan/work/yelp_user.parquet")


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_com

Py4JError: An error occurred while calling o49.parquet

In [4]:
review_df.write.parquet("/home/jovyan/work/yelp_review.parquet")

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_com

Py4JError: An error occurred while calling o47.parquet

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Yelp Reviews Analysis") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

In [None]:
# Configurar el número máximo de registros por lote
spark.conf.set("spark.sql.files.maxRecordsPerBatch", "50000")

In [None]:
# Supongamos que 'business_id' es una columna relevante por la que quieres particionar
review_df.write.partitionBy("business_id").mode("overwrite").parquet("/home/jovyan/work/yelp_review.parquet")


#### Nuevo Intento

In [8]:
from pyspark.sql import SparkSession

# Iniciar Spark Session con configuraciones adecuadas
spark = SparkSession.builder \
    .appName("YelpDataAnalysis") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

ConnectionRefusedError: [Errno 111] Connection refused

In [2]:
# Configurar el número máximo de registros por lote
spark.conf.set("spark.sql.files.maxRecordsPerBatch", "50000")

In [3]:
# Cargar los datos
business_df = spark.read.json("yelp_academic_dataset_business.json")
checkin_df = spark.read.json("yelp_academic_dataset_checkin.json")
review_df = spark.read.json("yelp_academic_dataset_review.json")
tip_df = spark.read.json("yelp_academic_dataset_tip.json")
user_df = spark.read.json("yelp_academic_dataset_user.json")

In [4]:
# Mostrar el esquema para entender la estructura de los datos
review_df.printSchema()
business_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointment

In [4]:
# Verifica la accesibilidad del DataFrame y el número de registros
print(review_df.count())

6990280


In [8]:
# Supongamos que 'business_id' es una columna relevante por la que quieres particionar
review_df.write.partitionBy("business_id").parquet("/home/jovyan/work/yelp_review.parquet")

In [9]:
tip_df.write.parquet("/home/jovyan/work/yelp_tip.parquet")

In [10]:
user_df.write.parquet("/home/jovyan/work/yelp_user.parquet")

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f

Py4JError: An error occurred while calling z:py4j.reflection.TypeUtil.isInstanceOf

In [9]:
print(review_df.count())

ConnectionRefusedError: [Errno 111] Connection refused

In [1]:
from pyspark.sql import SparkSession

# Iniciar Spark Session
spark = SparkSession.builder \
    .appName("YelpUserDataAnalysis") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Cargar los datos de usuario
user_df = spark.read.json("yelp_academic_dataset_user.json")

# Mostrar el esquema para entender la estructura de los datos
user_df.printSchema()

# Supongamos que quieres verificar rápidamente la cantidad de registros
print(user_df.count())

# Supongamos que quieres particionar por el campo 'elite' si tiene sentido para tu análisis
# Deberías primero transformar 'elite' si es necesario, por ejemplo, si quieres particionar por el primer año elite o algo similar

# Aquí simplemente guardamos los datos sin particionar, dado que no hay un campo obvio para hacerlo en este caso
user_df.write.mode("overwrite").parquet("/home/jovyan/work/yelp_user.parquet")


root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)

1987897


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_com

Py4JError: An error occurred while calling o35.parquet

In [5]:
# Supongamos que quieres verificar rápidamente la cantidad de registros
print(user_df.count())

1987897


In [6]:
user_df.write.partitionBy("yelping_since").parquet("/home/jovyan/work/yelp_user.parquet")

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_com

Py4JError: An error occurred while calling o53.parquet

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("YelpUserDataAnalysis") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "120s") \
    .getOrCreate()

df = spark.createDataFrame([(1, 'a'), (2, 'b')], ["id", "value"])
df.show()


+---+-----+
| id|value|
+---+-----+
|  1|    a|
|  2|    b|
+---+-----+



In [3]:
from pyspark.sql import SparkSession
import os

# Iniciar Spark Session
spark = SparkSession.builder \
    .appName("YelpUserDataAnalysis") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "120s") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Ruta del archivo
file_path = "yelp_academic_dataset_user.json"

# Comprobar si el archivo existe
if os.path.exists(file_path):
    # Cargar los datos de usuario
    user_df = spark.read.json(file_path)

    # Mostrar el esquema para entender la estructura de los datos
    user_df.printSchema()

    # Verificar rápidamente la cantidad de registros
    count = user_df.count()
    print(count)

    # Guardar los datos como Parquet en disco
    output_path = "/home/jovyan/work/yelp_user.parquet"
    user_df.write.mode("overwrite").parquet(output_path)
    print(f"Data saved to {output_path}")
else:
    print(f"Error: File not found at {file_path}")

# Detener la Spark Session
spark.stop()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)

1987897


----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 42882)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.11/socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.11/socketserver.py", line 755, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 295, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 267, in poll
    if self.rfile in r and func():
                           ^^^^^^
  File "/usr/local/spark/python/pyspark/accumulators.py", line 271, in accum_updates
    num_updates =

Py4JError: py4j does not exist in the JVM

## Intento Lunes 

In [2]:
import os
from pyspark.sql import SparkSession

# Iniciar Spark Session
spark = SparkSession.builder.appName("YelpDataAnalysis").getOrCreate()


In [9]:
# 1. Toma el archivo `review.json` JSON y cuantífica cuánto pesa el archivo en disco.
# Calcular el tamaño del archivo original en bytes
original_size_bytes = os.path.getsize("yelp_academic_dataset_review.json")

# Convertir bytes a kilobytes (KB)
original_size_kb = original_size_bytes / 1024

# Convertir bytes a megabytes (MB)
original_size_mb = original_size_kb / 1024

# Convertir bytes a gigabytes (GB)
original_size_gb = original_size_mb / 1024

# Mostrar los tamaños en diferentes unidades
print(f"Tamaño del archivo en bytes: {original_size_bytes} bytes")
print(f"Tamaño del archivo en kilobytes: {original_size_kb:.2f} KB")
print(f"Tamaño del archivo en megabytes: {original_size_mb:.2f} MB")
print(f"Tamaño del archivo en gigabytes: {original_size_gb:.2f} GB")

Tamaño del archivo en bytes: 5341868833 bytes
Tamaño del archivo en kilobytes: 5216668.78 KB
Tamaño del archivo en megabytes: 5094.40 MB
Tamaño del archivo en gigabytes: 4.98 GB


In [6]:
#2. Carga el JSON en Spark y cuantífica cuánto pesa el DataFramen en memoria RAM.
# Leer el archivo JSON
review_df = spark.read.json("yelp_academic_dataset_review.json")

# Mostrar el esquema para ver los tipos de datos
review_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [7]:
# Tamaños de tipos de datos
bytes_per_string = 100  # para business_id, review_id, user_id, date
bytes_per_text = 200    # para text
bytes_per_long = 8      # para cool, funny, useful
bytes_per_double = 8    # para stars

# Suma total del tamaño por fila
size_per_row = (4 * bytes_per_string) + bytes_per_text + (3 * bytes_per_long) + bytes_per_double

# Contar las filas
num_rows = review_df.count()

# Calcular el tamaño total
total_size_in_bytes = num_rows * size_per_row
total_size_in_kb = total_size_in_bytes / 1024
total_size_in_mb = total_size_in_kb / 1024
total_size_in_gb = total_size_in_mb / 1024

# Imprimir los tamaños en diferentes unidades
print(f"Estimación del tamaño total en bytes: {total_size_in_bytes:.0f} bytes")
print(f"Estimación del tamaño total en kilobytes: {total_size_in_kb:.2f} KB")
print(f"Estimación del tamaño total en megabytes: {total_size_in_mb:.2f} MB")
print(f"Estimación del tamaño total en gigabytes: {total_size_in_gb:.2f} GB")

Estimación del tamaño total en bytes: 4417856960 bytes
Estimación del tamaño total en kilobytes: 4314313.44 KB
Estimación del tamaño total en megabytes: 4213.20 MB
Estimación del tamaño total en gigabytes: 4.11 GB


In [None]:
#3. Guarda el DataFrame como parquet en disco y muestra cuánto pesa el archivo. Cómo se compara con el JSON crudo.

# Guardar el DataFrame en formato Parquet
review_df.write.partitionBy("business_id").parquet("/home/jovyan/work/yelp_review.parquet")

In [5]:
from pyspark.sql import SparkSession

# Iniciar Spark Session con soporte para Hive
spark = SparkSession.builder \
    .appName("YelpDataAnalysis") \
    .enableHiveSupport() \
    .getOrCreate()

# Leer el archivo JSON
review_df = spark.read.json("yelp_academic_dataset_review.json")

# Definir el número de buckets y aplicar bucketing
num_buckets = 50
review_df.write.format("parquet") \
    .bucketBy(num_buckets, "business_id") \
    .saveAsTable("bucketed_yelp_review")


In [6]:
# Usar Spark SQL para consultar la tabla
spark.sql("SELECT * FROM bucketed_yelp_review LIMIT 10").show()

# Ejemplo de consulta con agregación
spark.sql("""
SELECT business_id, COUNT(*) as review_count 
FROM bucketed_yelp_review 
GROUP BY business_id 
ORDER BY review_count DESC
LIMIT 10
""").show()


+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|icGg8xgDJVufg-xpt...|   0|2015-08-14 04:29:01|    0|Ll2gc883uegEDm40b...|  4.0|The breakfast was...|     0|FEJJM_953DigtkxS2...|
|5ICrDkwtX4ykKOLVJ...|   0|2014-05-15 23:30:39|    0|oq9yVLWKlsYVEM-mB...|  5.0|I give Kaffa Cros...|     0|kk7VOrrhVjy8j3Gwm...|
|PP3BBaVxZLcJU54uP...|   1|2012-01-19 20:27:32|    1|h2Yl0Yejjc60nuA49...|  3.0|I hate to say it,...|     1|_Fayj6NkO2vCv264X...|
|vXhrovlPWEnPKCbZM...|   0|2013-03-01 20:39:58|    0|J3p8i3Am49P5Q9Cfu...|  5.0|What a lovely pro...|     2|OMApNiVHXVylNPvVQ...|
|PP3BBaVxZLcJU54uP...|   1|2009-06-22 12:07:20|    1|ijZrERi-gv1g60h7x...|  4.0|First of a

In [5]:
import os

def get_directory_size(directory):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(directory):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            if os.path.exists(fp) and not os.path.islink(fp):
                total_size += os.path.getsize(fp)
    return total_size

# Ruta relativa al directorio donde se almacenan los datos de la tabla Hive
parquet_directory = "spark-warehouse/bucketed_yelp_review"

# Llama a la función para obtener el tamaño del directorio
total_size_in_bytes = get_directory_size(parquet_directory)

# Convertir a megabytes y gigabytes
total_size_in_mb = total_size_in_bytes / (1024 * 1024)
total_size_in_gb = total_size_in_bytes / (1024 * 1024 * 1024)

# Imprimir los tamaños
print(f"Tamaño total en bytes: {total_size_in_bytes} bytes")
print(f"Tamaño total en megabytes: {total_size_in_mb:.2f} MB")
print(f"Tamaño total en gigabytes: {total_size_in_gb:.2f} GB")

# Tamaño del archivo JSON
json_file_path = "yelp_academic_dataset_review.json"
json_size_in_bytes = os.path.getsize(json_file_path)
json_size_in_mb = json_size_in_bytes / (1024 * 1024)
json_size_in_gb = json_size_in_bytes / (1024 * 1024 * 1024)

# Imprimir el tamaño del JSON
print(f"Tamaño del archivo JSON en bytes: {json_size_in_bytes} bytes")
print(f"Tamaño del archivo JSON en megabytes: {json_size_in_mb:.2f} MB")
print(f"Tamaño del archivo JSON en gigabytes: {json_size_in_gb:.2f} GB")

# Comparación
if total_size_in_gb < json_size_in_gb:
    print("El directorio Parquet es más pequeño que el archivo JSON original.")
else:
    print("El archivo JSON es más pequeño que el directorio Parquet.")


Tamaño total en bytes: 3006953551 bytes
Tamaño total en megabytes: 2867.65 MB
Tamaño total en gigabytes: 2.80 GB
Tamaño del archivo JSON en bytes: 5341868833 bytes
Tamaño del archivo JSON en megabytes: 5094.40 MB
Tamaño del archivo JSON en gigabytes: 4.98 GB
El directorio Parquet es más pequeño que el archivo JSON original.


In [5]:
#4. Utiliza el DataFrame, optimiza el tipo de dato que hay en cada columna (i.e.    `Int32`, `Int64`, `Float32`, `Float64`, `String`, `Categorical`) y guarda el    nuevo DataFrame como parquet. Cuántifica cuánto pesa el DataFrame en memoria    RAM y cuánto pesa en disco. Cómo se compara con el parquet crudo.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, col, year, month
from pyspark.sql.types import IntegerType, DoubleType

# Iniciar Spark Session
spark = SparkSession.builder.appName("Optimize Yelp Data").getOrCreate()

# Leer el archivo JSON
review_df = spark.read.json("yelp_academic_dataset_review.json")

# Optimizar los tipos de datos
optimized_df = review_df.withColumn("cool", col("cool").cast(IntegerType())) \
                         .withColumn("funny", col("funny").cast(IntegerType())) \
                         .withColumn("useful", col("useful").cast(IntegerType())) \
                         .withColumn("stars", col("stars").cast(DoubleType()))



In [None]:
# Guardar el DataFrame optimizado como Parquet
optimized_df.write.mode("overwrite").parquet("optimized_reviews.parquet")

# Función para calcular el tamaño del directorio
def get_directory_size(directory):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(directory):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            if os.path.exists(fp):
                total_size += os.path.getsize(fp)
    return total_size

# Tamaño del directorio donde se almacenaron los datos Parquet optimizados
optimized_parquet_size = get_directory_size("optimized_reviews.parquet")
print(f"Tamaño de los datos Parquet optimizados en disco: {optimized_parquet_size} bytes")

# Estimación del tamaño en memoria, utilizando una aproximación basada en tipos de datos
# (asumiendo que las optimizaciones han sido aplicadas a todo el DataFrame)
total_rows = optimized_df.count()
estimated_row_size = 4 * 3 + 8  # 3 integers and 1 double
estimated_memory_usage = total_rows * estimated_row_size
print(f"Estimación del uso de memoria del DataFrame optimizado: {estimated_memory_usage} bytes")


In [6]:
# Extraer año y mes de la columna 'date'
optimized_df = optimized_df.withColumn("year", year(col("date"))) \
                     .withColumn("month", month(col("date")))

# Guardar el DataFrame optimizado y particionado por año y mes como Parquet
output_path = "reviews_partitioned_by_year_month"
optimized_df.write.mode("overwrite").partitionBy("year", "month").parquet(output_path)


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 