In [1]:
import py4j.protocol  
from py4j.protocol import Py4JJavaError  
from py4j.java_gateway import JavaObject  
from py4j.java_collections import JavaArray, JavaList
import pyspark.sql.functions as func
from pyspark import RDD, SparkContext  
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.sql.types import StringType,ByteType

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [3]:
spark

### 1. Toma el archivo review.json JSON y cuantífica cuánto pesa el archivo en disco.

In [4]:
%%time
df = spark.read.json('yelp_dataset/yelp_academic_dataset_review.json')

CPU times: user 11.8 ms, sys: 6.58 ms, total: 18.4 ms
Wall time: 26.6 s


### 2. Carga el JSON en Spark y cuantífica cuánto pesa el DataFramen en memoria RAM.

In [5]:
def get_size_in_ram(df):  
    yelp = df.cache().select(df.columns)
    size_in_bytes = yelp._jdf.queryExecution().optimizedPlan().stats().sizeInBytes() 
    return size_in_bytes
df_ram_size = get_size_in_ram(df)

In [6]:
print(f"tamaño en RAM {round(df_ram_size/1.e9,2)}GB")

tamaño en RAM 5.34GB


### 3. Guarda el DataFrame como parquet en disco y muestra cuánto pesa el archivo. Cómo se compara con el JSON crudo

In [7]:
%%time
df.write.parquet('data/review_raw.parquet')

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most 

Py4JError: py4j.reflection does not exist in the JVM

**review_raw.parquet pesa 3.01GB mientras que JSON pesa 5.34**

### 4. Utiliza el DataFrame, optimiza el tipo de dato que hay en cada columna (i.e. Int32, Int64, Float32, Float64, String, Categorical) y guarda el nuevo DataFrame como parquet. Cuántifica cuánto pesa el DataFrame en memoria RAM y cuánto pesa en disco. Cómo se compara con el parquet crudo.

In [7]:
df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [8]:
df = df.withColumn("stars", func.col("stars").cast("float"))

In [9]:
df = df.withColumn("useful", func.col("useful").cast(ByteType()))
df = df.withColumn("funny", func.col("funny").cast(ByteType()))
df = df.withColumn("cool", func.col("cool").cast(ByteType()))

In [10]:
df = df.withColumn("date", func.to_date("date"))

In [11]:
df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: byte (nullable = true)
 |-- date: date (nullable = true)
 |-- funny: byte (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: float (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: byte (nullable = true)
 |-- user_id: string (nullable = true)



In [12]:
df_ram_size = get_size_in_ram(df)
print(f"tamaño en RAM {round(df_ram_size/1.e9,2)}GB")

tamaño en RAM 3.78GB


In [38]:
%%time
df.write.parquet('data/review_data_type.parquet')

**2.95 GB peso en Disco**

### 5. Utiliza el DataFrame optimizado, guarda en parquet una nueva versión del DataFrame y particionalo por fecha (date). Otra versión por ciudad. Otra por ciudad y fecha

In [13]:
df_fechas = (
    df
    .withColumn("year", func.year(func.col("date")))
    .withColumn("month", func.month(func.col("date")))
)

In [14]:
%%time
df_fechas.write.parquet(
    'data/review_data_date.parquet',
    mode='overwrite',
    partitionBy=["year","month"]
)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.11/site-packages/IPython/core/magics/execution.py", line 1332, in time


ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
business = spark.read.json(path='/yelp-dataset/yelp_academic_dataset_business.json')
business = business.select("business_id", "state")
df_fechas_business = df_fechas.join(business, on="business_id")
df_fechas_business.printSchema()
df_fechas_business.write.parquet(
    "data/review_data_state.parquet",
    mode="overwrite",
    partitionBy=["state"])
)

In [None]:
df_fechas_business.write.parquet(
    "data/review_data_date_state.parquet",
    mode="overwrite",
    partitionBy=["year","month","state"])
)

### 6. Ejecuta un query utilizando sobre la tabla filtrando por una de las ciudades y un años en particular. Registra el tiempo de ejecución. Aplica el query sobre

In [None]:
df.createOrReplaceTempView("reviews")
reviews = spark.sql('SELECT review_id, stars, text, user_id, business_id, useful FROM reviews')
reviews.createOrReplaceTempView("reviews")
reviews.show(10)

In [None]:
%%time
# dataframe particionado por fecha
spark.sql('SELECT COUNT(*) AS review_count FROM reviews').show(10)

In [None]:
%%time
# particionado por fecha
df_fechas.createOrReplaceTempView("reviews")
reviews = spark.sql('SELECT review_id, stars, text, user_id, business_id, useful FROM reviews')
reviews.createOrReplaceTempView("reviews")
spark.sql('SELECT COUNT(*) AS review_count FROM reviews').show(10)

In [None]:
%%time
# particionado por fecha
df_fechas_business.createOrReplaceTempView("reviews")
reviews = spark.sql('SELECT review_id, stars, text, user_id, business_id, useful FROM reviews')
reviews.createOrReplaceTempView("reviews")
spark.sql('SELECT COUNT(*) AS review_count FROM reviews').show(10)

## La computadora no pudo correr desde las particiones y no se encontró el error