<a href="https://colab.research.google.com/github/Insights-Labs-Consultant-Agency/yelp-google-maps-reviews-and-recommendations/blob/data-pipeline/4.6-fp-etl-glue-job-yelp-user.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Extracción, transformación y carga (ETL)

En este notebook, nuestro objetivo es realizar el proceso de extracción, transformación y carga (ETL) de los datos de Yelp y Google Maps utilizando la librería AWS Glue 3.0 que servirá como base para los scripts de los diferentes ETL Glue Jobs que se usaran en AWS Glue Workflow en el proceso de carga al DW. En esta etapa, se realizará un proceso de limpieza previa y posterior normalización para construir un DER.

## 0 Configuraciones Globales e Importaciones

En esta sección,instalamos e importamos todas las librerías y/o módulos necesarios para nuestro proceso ETL y establecemos configuraciones globales de ser requerido.

### Instalación de librerías y/o Dependencias

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://aws-glue-etl-artifacts.s3.amazonaws.com/glue-common/apache-maven-3.6.0-bin.tar.gz
!tar xvf apache-maven-3.6.0-bin.tar.gz -C /bin/ > /dev/null
!wget -q https://aws-glue-etl-artifacts.s3.amazonaws.com/glue-3.0/spark-3.1.1-amzn-0-bin-3.2.1-amzn-3.tgz
!tar xvf spark-3.1.1-amzn-0-bin-3.2.1-amzn-3.tgz -C /bin/ > /dev/null
!pip install -q findspark

### Exportación de Variables Entorno

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] += ":/bin/apache-maven-3.6.0/bin"
os.environ["SPARK_HOME"] = "/bin/spark-3.1.1-amzn-0-bin-3.2.1-amzn-3/"
os.environ["SPARK_CONF_DIR"] = "/bin/aws-glue-libs/conf"

### Instalación de AWS GLue 3.0 Libs

In [None]:
!git clone -b glue-3.0 https://github.com/awslabs/aws-glue-libs.git /bin/aws-glue-libs
!chmod +x /bin/aws-glue-libs/bin/glue-setup.sh
!bash /bin/aws-glue-libs/bin/glue-setup.sh > /dev/null
!cp -r /bin/spark-3.1.1-amzn-0-bin-3.2.1-amzn-3/jars/netty-all-4.1.51.Final.jar /bin/aws-glue-libs/jarsv1/

Cloning into '/bin/aws-glue-libs'...
remote: Enumerating objects: 321, done.[K
remote: Counting objects: 100% (99/99), done.[K
remote: Compressing objects: 100% (54/54), done.[K
remote: Total 321 (delta 63), reused 62 (delta 45), pack-reused 222[K
Receiving objects: 100% (321/321), 163.93 KiB | 14.90 MiB/s, done.
Resolving deltas: 100% (203/203), done.
rm: cannot remove 'PyGlue.zip': No such file or directory
rm: cannot remove '/bin/aws-glue-libs/conf/spark-defaults.conf': No such file or directory


### Importación de Librerías y/o Módulos

In [None]:
import sys
sys.path.extend(["/bin/spark-3.1.1-amzn-0-bin-3.2.1-amzn-3/python","/bin/spark-3.1.1-amzn-0-bin-3.2.1-amzn-3/python/lib/py4j-0.10.9-src.zip","/bin/aws-glue-libs/PyGlue.zip"])

import findspark
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import when, col, size, split, explode, monotonically_increasing_id

findspark.init()
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark

## 1 Extracción

En esta sección, extraemos los datasets de la fuente y los leemos como un DataFrame de PySpark.

In [None]:
# Ruta al archivo parquet user
file_path = '/content/drive/MyDrive/data/raw/yelp/user.parquet'

In [None]:
# Lee el archivo parquet
df = spark.read.parquet(file_path)

## 2 Transformación

En esta sección, realizamos la limpieza inicial de los datos y las transformaciones necesarias. Esto puede incluir la creación de nuevas columnas  la eliminación de duplicados o columnas innecesarias, la gestión de valores nulos o la corrección de tipos de datos.

In [None]:
# Elimina las filas duplicadas
df = df.dropDuplicates()

In [None]:
# Convierte la columna 'yelping_since' al formato datetime
df = df.withColumn('yelping_since', df['yelping_since'].cast('timestamp'))

In [None]:
# Ordena el DataFrame por y 'yelping_since'
df = df.orderBy('yelping_since')

In [None]:
# Crea un nuevo DataFrame para la normalización de la columna 'elite'
df_elite = df.withColumn('elite', split(df['elite'], ','))
df_elite = df_elite.withColumn('elite', explode(df_elite['elite']))
df_elite = df_elite.select('user_id', df_elite['elite'].alias('elite_year'))

In [None]:
# Reemplaza los strings en blanco por None
df_elite = df_elite.withColumn('elite_year', when(col('elite_year') == '', None).otherwise(col('elite_year')))

In [None]:
# Elimina los None
df_elite = df_elite.na.drop(subset=["elite_year"])

In [None]:
# Ordena el DataFrame por 'elite_year'
df_elite = df_elite.orderBy('elite_year')

In [None]:
# Filtra los años que no son '20'
df_elite = df_elite.filter(df_elite.elite_year != '20')

In [None]:
# Elimina la columna 'elite'
df = df.drop('elite')

In [None]:
# Crea una nueva columna 'friends' que es la cantidad de amigos
df = df.withColumn('friends', size(split(df['friends'], ',')))

In [None]:
df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- yelping_since: timestamp (nullable = true)
 |-- useful: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- friends: integer (nullable = false)
 |-- fans: long (nullable = true)
 |-- average_stars: double (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- compliment_photos: long (nullable = true)



In [None]:
df_elite.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- elite_year: string (nullable = true)



## 3. Carga

Finalmente, en esta sección cargamos nuestros datos transformados en formato parquet a su destino correspondiente.

### Google Drive

In [None]:
# Ruta al archivo Parquet local
file_path = '/content/drive/MyDrive/data/cleaned/yelp/user.parquet'

# Escribe el DataFrame a un archivo Parquet localmente
df.write.parquet(file_path)


In [None]:
# Ruta al archivo Parquet local
file_path = '/content/drive/MyDrive/data/cleaned/yelp/elite.parquet'

# Escribe el DataFrame a un archivo Parquet localmente
df_elite.write.parquet(file_path)

### S3

In [None]:
# Convierte el DataFrame de Spark a un DynamicFrame de Glue
dyf = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

In [None]:
# Escribe el DynamicFrame a S3 en formato Parquet
glueContext.write_dynamic_frame.from_options(
    frame = dyf,
    connection_type = "s3",
    connection_options = {"path": "s3://ruta/al/bucket"},
    format = "parquet"
)