![banner_etl](https://github.com/cistelsa/Commerce_Data_Analysis_and_Recommendations/blob/main/5_Sources/Images/banner_automatizacion.gif?raw=true)

**<mark style="background:#2bfe9c">Script de automatización</mark> proviene del Notebook <mark>Pre ETL Yelp</mark>**

In [1]:
# Librería Pandas
import pandas as pd
# Se importan funciones especificas de spark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType


StatementMeta(, , , Waiting, )

## **`Lectura y procesos con Apache Spark`**

In [2]:

# Define manualmente el esquema del JSON
json_schema = StructType([
    StructField("review_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("business_id", StringType(), True),
    StructField("stars", IntegerType(), True),
    StructField("useful", IntegerType(), True),
    StructField("funny", IntegerType(), True),
    StructField("cool", IntegerType(), True),
    StructField("text", StringType(), True),
    StructField("date", TimestampType(), True)
])

StatementMeta(, , , Waiting, )

In [3]:
# Lee el archivo JSON por líneas y aplica el esquema
df_review = spark.read.schema(json_schema).json("Files/data/original/Yelp/review.json.gz")

StatementMeta(, 76d8dc6a-98c0-40b5-b676-4bc1ae12c5d8, 5, Finished, Available)

### Dataset `business.csv`
#### Realizamos lectura del dataset principal el cual nos permitirá filtrar los datos por la columna `business_id`

In [34]:
# Oficial Lectura del dataset con Spark
df_business = spark.read.format("csv").option("header","true").load("Files/data/original/Yelp/business.csv")

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 36, Finished, Available)

SynapseWidget(Synapse.DataFrame, e10b2d31-9feb-4062-9153-a8b2520c75b2)

#### Filtro por medio de `Inner Join` sin llamar columnas del segundo dataframe

In [60]:
df_review = df_review.join(df_business.select("business_id"), on="business_id", how="inner")

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 62, Finished, Available)

### Hacemos escritura del nuevo dataset en formato `Parquet` muy conveniente para el ahorro de recursos por su compresión y para las consultas columnares en el momento de hacer los modelos de machine learning.

In [61]:
# Guardar el DataFrame en un solo archivo Parquet
# Opcional: Sobrescribir si el archivo ya existe
df_review.coalesce(1).write \
    .format("parquet") \
    .mode("overwrite") \
    .save("Files/data/beta/Yelp/review.parquet")

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 63, Finished, Available)

## Lectura dataset tip.json

In [43]:
# Define manualmente el esquema del JSON
json_schema_tip = StructType([
    StructField("user_id", StringType(), True),
    StructField("business_id", StringType(), True),
    StructField("text", StringType(), True),
    StructField("date", TimestampType(), True),
    StructField("compliment_count", IntegerType(), True),
])

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 45, Finished, Available)

In [44]:
# Lee el archivo JSON por líneas y aplica el esquema, lectura directa de archivo comprimido con GZIP
df_tip = spark.read.schema(json_schema_tip).json("Files/data/original/Yelp/tip.json.gz")

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 46, Finished, Available)

In [47]:
# Inner Join para filtrar los datos
df_tip = df_tip.join(df_business.select("business_id"), on="business_id", how="inner")

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 49, Finished, Available)

#### Al ser un dataframe no complejo y liviano, sin datos anidados procedemos a convertir a pandas y su escritura en formato `.CSV` esto es por experiencia que pandas le da mejor manejo a este sistema de archivo que spark que es más especialista en formatos columnares, comprimidos y que manejan nodos.

In [20]:
# Convertir a DF Pandas
df_tip_p = df_tip.toPandas()

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 22, Finished, Available)

In [21]:
# Guardar Datframe en archivo CSV
df_tip_p.to_csv("/lakehouse/default/Files/data/beta/Yelp/tip.csv", index=False)

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 23, Finished, Available)

## Lectura checkin.json

In [22]:
# Define manualmente el esquema del JSON
json_schema_checkin = StructType([
    StructField("business_id", StringType(), True),
    StructField("date", StringType(), True)
])

StatementMeta(, , , Waiting, )

In [23]:
# Lee el archivo JSON por líneas y aplica el esquema
df_checkin = spark.read.schema(json_schema_checkin).json("Files/data/original/Yelp/checkin.json.gz")

StatementMeta(, , , Waiting, )

In [25]:
# Inner Join para filtrar los datos
df_checkin = df_checkin.join(df_business.select("business_id"), on="business_id", how="inner")

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 27, Finished, Available)

In [27]:
# Convertir a DF Pandas
df_checkin_p = df_checkin.toPandas()

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 29, Finished, Available)

In [28]:
# Guardar Datframe en archivo CSV
df_checkin_p.to_csv("/lakehouse/default/Files/data/beta/Yelp/checkin.csv", index=False)

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 30, Finished, Available)

## Lectura user.parquet

In [1]:
# Leer el archivo parquet
df_user = spark.read.parquet("Files/data/original/Yelp/user.parquet")

StatementMeta(, 72d222ca-bd12-482e-997c-1f09c3aace04, 3, Finished, Available)

#### Por lógica debemos extraer los `user_id` de los dataframe que contienen esta info es el caso de `df_review` y `df_tip` hacemos un select a la columna `user_id` y unimos los dos dataframe, eliminando los registros repetidos para hacer un buen filtro sin perdida de recursos.

In [50]:
df_review_uid = df_review.select("user_id").union(df_tip.select("user_id")).dropDuplicates()

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 52, Finished, Available)

In [51]:
# Inner Join para filtrar los datos
df_user = df_user.join(df_review_uid.select("user_id"), on="user_id", how="inner")

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 53, Finished, Available)

In [54]:
# Guardar el DataFrame en un solo archivo Parquet
# Opcional: Sobrescribir si el archivo ya existe
df_user.coalesce(1).write \
    .format("parquet") \
    .mode("overwrite") \
    .save("Files/data/beta/Yelp/user.parquet")

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 56, Finished, Available)

In [62]:
# Nueva lectura del dataset business.csv a través de pandas para hacer una corrección y guardarlo nuevamente
df_business_p = pd.read_csv("/lakehouse/default/" + "Files/data/original/Yelp/business.csv")

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 64, Finished, Available)

In [64]:
# Eliminamos la primera columna con título vacío
df_business_p = df_business_p.drop(columns=["Unnamed: 0"])

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 66, Finished, Available)

In [66]:
# Guardar Datframe en archivo CSV
df_business_p.to_csv("/lakehouse/default/Files/data/beta/Yelp/business.csv", index=False)

StatementMeta(, d1b817e4-d92f-4fd3-be65-2658512cdc8e, 68, Finished, Available)