In [None]:
#Docker run instructions
#Ve al repo en tu local con la terminal y corre el siguiente código.
#docker run -it --rm -p 8888:8888 -v "${PWD}":/home/jovyan/work docker.io/jupyter/pyspark-notebook:latest

from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, BooleanType, ArrayType
from pyspark.sql import SparkSession
import tarfile
import os
import pyspark.sql.functions as f
from pyspark.sql.functions import col

In [None]:
# Crear Spark Session
spark = SparkSession.builder.getOrCreate()

In [None]:
# Funcion para calcular el tamano de un dataframe
def get_dataframe_size(df):
    # Calculate the size of each column
    col_sizes = [f.length(f.col(col_name).cast("string")).alias(col_name) for col_name in df.columns]
    # Calculate the total size
    total_size = df.select(*col_sizes).groupBy().sum().collect()[0][0]
    return total_size

In [None]:
# Peso de review.json
file_size = os.path.getsize("./data/yelp_dataset.json")
print("Tamaño del archivo:", file_size, "bytes")

In [None]:
# Carga los archivos extraídos (review.json) en un DataFrame
df = spark.read.json("./data/yelp_dataset.json")

In [None]:
# Calcular el tamaño
dataframe_size = get_dataframe_size(df)
print("Tamaño:", dataframe_size, "bytes")

In [None]:
# Guardar el dataframe en parquet
df.write.parquet('review.parquet', mode="overwrite")

In [None]:
# Peso de review.parquet
file_size = os.path.getsize("review.parquet")
print("Tamaño del archivo:", file_size, "bytes")

In [None]:
# Inferir el esquema inicial
initial_schema = df.schema
initial_schema

In [None]:
# Definir el nuevo esquema
optimized_schema = StructType([
    StructField("review_id", StringType(), nullable=True),
    StructField("user_id", StringType(), nullable=True),
    StructField("business_id", StringType(), nullable=True),
    StructField("stars", IntegerType(), nullable=True),
    StructField("date", DateType(), nullable=True),
    StructField("text", StringType(), nullable=True),
    StructField("useful", IntegerType(), nullable=True),
    StructField("funny", IntegerType(), nullable=True),
    StructField("cool", IntegerType(), nullable=True)
])

In [None]:
df_reducido = spark.read.schema(optimized_schema).json("./data/ini/yelp_dataset.json")

In [None]:
# Calcular el tamaño del df en ram
dataframe_size = get_dataframe_size(df_reducido)
print("Tamaño:", dataframe_size, "bytes")

In [None]:
#Guardar el df_optimized en parquet
df_optimized.write.parquet('review_op.parquet', mode="overwrite")

In [None]:
# Peso del parquet review_op.parquet
file_size = os.path.getsize("review_op.parquet")
print("Tamaño del archivo:", file_size, "bytes")

In [None]:
# Carga los archivos extraídos business.json en un DataFrame
df_bus = spark.read.json("./data/ini/yelp_dataset.json")
df_bus = df_bus.drop("stars")

Se procede a crear el df reducido.

In [None]:
# Extraer columnas de interés de df_optimized
df_reducido_v2 = (
    df_reducido
     .withColumn("year", f.year(f.col("date")))
     .withColumn("month", f.month(f.col("date")))
     .select("review_id", "user_id", "business_id", "stars", "year", "month", "date", "text", "useful", "funny", "cool")
)

In [None]:
schema_df_reducido_v2 = df_reducido_v2.schema
schema_df_reducido_v2

In [None]:
%%time

# Realizar inner join en la columna 'business_id'
joined_df = df_bus.join(df_reducido_v2, "business_id", "inner")

Obtenemos el Parquet particionado por fecha

In [None]:
(joined_df
     .write.parquet(
         "op_df_v1", 
         mode="overwrite", 
         partitionBy=["year", "month"],
         compression="gzip"
     )
)

Parquet particionado por ciudad

In [None]:
(joined_df
     .write.parquet(
         "joined_df_v1", 
         mode="overwrite", 
         partitionBy=["state"],
         compression="gzip"
     )
)

In [None]:
# Parquet particionado por ciudad y fecha
(joined_df
     .write.parquet(
         "joined_df_v2", 
         mode="overwrite", 
         partitionBy=["state", "year", "month"],
         compression="gzip"
     )
)

In [None]:
# Cargar el archivo Parquet en un df
op_df_v1 = spark.read.parquet("./op_df_v1")

# Aplicar el filtro por estado (state) y año (year)
filtered_df = op_df_v1.filter((op_df_v1.state == "CA") & (op_df_v1.year == "2016"))

In [None]:
# Definir el nuevo esquema
joined_schema = StructType([
    StructField("review_id", StringType(), nullable=True),
    StructField("user_id", StringType(), nullable=True),
    StructField("business_id", StringType(), nullable=True),
    StructField("stars", IntegerType(), nullable=True),
    StructField("date", DateType(), nullable=True),
    StructField("year", IntegerType(), nullable=True),
    StructField("month", IntegerType(), nullable=True),
    StructField("text", StringType(), nullable=True),
    StructField("useful", IntegerType(), nullable=True),
    StructField("funny", IntegerType(), nullable=True),
    StructField("cool", IntegerType(), nullable=True),
    StructField("name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True),
    StructField("review_count", IntegerType(), True),
    StructField("is_open", IntegerType(), True),
    StructField("attributes", 
                StructType([
                    StructField("RestaurantsTakeOut", BooleanType(), True),
                    StructField("BusinessParking", 
                                StructType([
                                    StructField("garage", BooleanType(), True),
                                    StructField("street", BooleanType(), True),
                                    StructField("validated", BooleanType(), True),
                                    StructField("lot", BooleanType(), True),
                                    StructField("valet", BooleanType(), True)
                                ]),
                                True)
                ]),
                True),
    StructField("categories", ArrayType(StringType()), True),
    StructField("hours", 
                StructType([
                    StructField("Monday", StringType(), True),
                    StructField("Tuesday", StringType(), True),
                    StructField("Wednesday", StringType(), True),
                    StructField("Thursday", StringType(), True),
                    StructField("Friday", StringType(), True),
                    StructField("Saturday", StringType(), True),
                    StructField("Sunday", StringType(), True)
                ]),
                True)
])

In [None]:
# Cargar el archivo Parquet en un DataFrame
joined_df_v1 = spark.read.schema(joined_schema).parquet("./joined_df_v1")
# Aplicar el filtro por estado (state) y año (year)
filtered_df = joined_df_v1.filter((joined_df_v1.state == "CA") & (joined_df_v1.year == "2016"))

In [None]:
# Cargar el archivo Parquet en un DataFrame
joined_df_v2 = spark.read.schema(joined_schema).parquet("./joined_df_v2")
# Aplicar el filtro por estado (state) y año (year)
filtered_df = joined_df_v2.filter((joined_df_v2.state == "CA") & (joined_df_v2.year == "2016"))