## 🛠️ **ETL (Extract, Transform, Load)**



#### **📂Procesamiento del 1er archivo: `steam_games.json.gz`**

####  **Importamos las librerías que vamos a usar**


In [1]:
#Es posible que tengas que instalar textblob
#%pip install textblob

In [2]:
"""
Este código importa varios módulos y funciones necesarios para el procesamiento de datos de juegos de Steam.

Importa:
- data_utils: un módulo personalizado que contiene funciones útiles para el procesamiento de datos.
- pandas: una biblioteca para el manejo y análisis de datos tabulares.
- ast: un módulo para interpretar expresiones Python.
- gzip: un módulo para descomprimir archivos comprimidos con gzip.
- json: un módulo para trabajar con datos en formato JSON.
- os: un módulo para interactuar con el sistema operativo.
- time: un módulo para trabajar con funciones relacionadas con el tiempo.
- pyarrow y pyarrow.parquet: bibliotecas para trabajar con formatos de datos columnares y eficientes como Parquet.
- warnings: un módulo para gestionar advertencias y filtrarlas si es necesario.

Además, importa varias funciones del módulo data_utils y habilita la recarga automática de módulos para facilitar el desarrollo.
"""
import datetime
import inspect
import re
import matplotlib.pyplot as plt
import inspect
import pandas as pd
import gzip
import os
import data_utils
import pandas as pd  # Pandas se utiliza para el manejo y análisis de datos tabulares
import ast  # AST (Abstract Syntax Trees) se utiliza para interpretar expresiones Python
import gzip
import json  # JSON se utiliza para trabajar con datos en formato JSON
import os  # OS proporciona funciones para interactuar con el sistema operativo
import time

import pyarrow as pa  # PyArrow se utiliza para trabajar con formatos de datos columnares y eficientes como Parquet
import pyarrow.parquet as pq

# Parquet se utiliza para trabajar con archivos Parquet
import warnings  # Warnings se utiliza para gestionar advertencias y filtrarlas si es necesario

warnings.filterwarnings("ignore")

from data_utils import (
    data_type_check,
    filtrar_valores_letras,
    duplicados_columna,
    descomprimir_archivos_gz,
)

# Autoreload se utiliza para recargar automáticamente los módulos al realizar cambios
%load_ext autoreload
%autoreload 2



#### 📦 **Extraccion** de los datos y primera exploración 

**Descomprimimos el archivos gz** 

In [3]:
"""
Este código realiza las siguientes tareas:

1. Obtiene el tiempo de inicio del notebook.
2. Descomprime un archivo JSON comprimido en formato GZ.
3. Lee el archivo JSON descomprimido línea por línea y crea una lista de objetos JSON.
4. Convierte la lista de objetos JSON en un DataFrame de Pandas.
5. Muestra 5 registros aleatorios del DataFrame.
"""

# Obtenemos el tiempo de inicio de todo este ipynb

start_time = time.time()


# Ejemplo de uso con una lista de archivos gz

archivo_gz_a_descomprimir = ["../0 Dataset/steam_games.json.gz"]

carpeta_destino = "../0 Dataset/"


descomprimir_archivos_gz(archivo_gz_a_descomprimir, carpeta_destino)


# Creamos una lista vacía llamada "rows" donde almacenaremos los datos del archivo JSON.


row = []

with open("../0 Dataset/steam_games.json", "r", encoding="utf-8") as archivo:

    for linea in archivo:

        try:

            objeto_json = json.loads(linea)

            row.append(objeto_json)

        except json.JSONDecodeError:

            print(f"Error de formato JSON en la línea: {linea}")


# Convertir la lista de objetos JSON en un DataFrame

df_steam_games = pd.DataFrame(row)

# Veamos unos registros al azar

df_steam_games.sample(5)

Archivo descomprimido: ../0 Dataset/steam_games.json


Unnamed: 0,publisher,genres,app_name,title,url,release_date,tags,reviews_url,specs,price,early_access,id,developer
83345,,,,,,,,,,,,,
16349,,,,,,,,,,,,,
41930,,,,,,,,,,,,,
41600,,,,,,,,,,,,,
69019,,,,,,,,,,,,,


#### Usando la funcion personalizada `data_type_check` invocada desde `data_utils.py` podemos observar:
- Variables categóricas
- Variables numéricas
- Dimensiones del dataframe
- Nulos
- Tipos de datos
- Informacion acerca de los datos faltantes o nulos de cada columna    


In [4]:
data_type_check(df_steam_games)


 Resumen del dataframe:

Dimensiones:  (120445, 13)
         columna  %_no_nulos  %_nulos  total_nulos tipo_dato
0      publisher       20.00    80.00        96362    object
1         genres       23.95    76.05        91593    object
2       app_name       26.68    73.32        88312    object
3          title       24.98    75.02        90360    object
4            url       26.68    73.32        88310    object
5   release_date       24.96    75.04        90377    object
6           tags       26.54    73.46        88473    object
7    reviews_url       26.68    73.32        88312    object
8          specs       26.12    73.88        88980    object
9          price       25.54    74.46        89687    object
10  early_access       26.68    73.32        88310    object
11            id       26.68    73.32        88312    object
12     developer       23.94    76.06        91609    object


#### 🔁 **TRANSFORM**

 ID

In [5]:
"""
Elimina los valores nulos y duplicados en la columna 'id' del DataFrame 'df_steam_games'.
Primero, elimina los valores nulos en la columna 'id' utilizando el método 'dropna()'.
Luego, cuenta el número de filas duplicadas en la columna 'id' utilizando el método 'duplicated()' y la función 'sum()'.
Imprime el número de duplicados encontrados.
Finalmente, elimina las filas duplicadas en la columna 'id' utilizando el método 'drop_duplicates()' y conserva la primera aparición de cada valor duplicado.
Verifica los tipos de datos de las columnas del DataFrame después de eliminar los valores nulos y duplicados.
"""

# Eliminamos los nulos y duplicados que hay en id

df_steam_games = df_steam_games.dropna(subset=["id"])


duplicados = df_steam_games.duplicated(subset="id").sum()

print(f"Número de duplicados: {duplicados}")

df_steam_games = df_steam_games.drop_duplicates(subset="id", keep="first")

data_type_check(df_steam_games)


print(f"Número de duplicados en 'id': {duplicados}")

Número de duplicados: 1

 Resumen del dataframe:

Dimensiones:  (32132, 13)
         columna  %_no_nulos  %_nulos  total_nulos tipo_dato
0      publisher       74.94    25.06         8051    object
1         genres       89.79    10.21         3282    object
2       app_name      100.00     0.00            1    object
3          title       93.62     6.38         2049    object
4            url      100.00     0.00            0    object
5   release_date       93.57     6.43         2066    object
6           tags       99.50     0.50          162    object
7    reviews_url      100.00     0.00            0    object
8          specs       97.92     2.08          669    object
9          price       95.71     4.29         1377    object
10  early_access      100.00     0.00            0    object
11            id      100.00     0.00            0    object
12     developer       89.74    10.26         3298    object
Número de duplicados en 'id': 1


#### **Columnas "publisher","title","early_access","reviews_url","specs"**

Eliminamos estas columnas ya que son irrelevantes para el problema que queremos resolver:

In [6]:
"""
Esta sección del código realiza las siguientes tareas:
1. Imprime las dimensiones del DataFrame df_steam_games antes de realizar la limpieza.
2. Elimina las columnas 'publisher', 'title', 'early_access', 'reviews_url' y 'specs' del DataFrame df_steam_games.
3. Imprime las dimensiones del DataFrame df_steam_games después de la limpieza.
"""

print("Dimensiones previas a limpieza:")
print(df_steam_games.shape)

# Eliminamos columnas que no vamos a usar
df_steam_games.drop(columns=["publisher","title","early_access","reviews_url","specs"], inplace=True)

print("Dimensiones post limpieza:")
print(df_steam_games.shape)


Dimensiones previas a limpieza:
(32132, 13)
Dimensiones post limpieza:
(32132, 8)


**Transformamos en lote las columnas que no necesitan tratamiento especial**

In [7]:
"""
Convierte los tipos de datos de las columnas 'app_name', 'tags' y 'developer' a cadenas de texto (string).
Esto es útil para manejar y procesar adecuadamente estos campos que contienen texto.
"""

#  Transformacion app_name, tags, developer a string.


df_steam_games = df_steam_games.astype(
    {"app_name": "string", "tags": "string", "developer": "string"}
)

#### **Developer**

In [8]:
# Reemplazamos los nulos con Dato Faltante
df_steam_games['developer'].fillna('Dato Faltante', inplace=True)
# Cambiar el tipo de dato a string
df_steam_games['developer'] = df_steam_games['developer'].astype('string')

In [9]:
df_steam_games['app_name'].fillna('Dato Faltante', inplace=True)

#### **Transformación de 'release_date'**

Se necesita extraer el año de lanzamiento del item, para ello se hace una nueva columna con el dato si existe o con un " XXXX " si no esta la fecha. Luego se elimina la columa 'release_date'.

- Cambiamos tipo de dato a str
- Se extrae el año de la fecha en la columna 'release_date' 
- Reemplazamos los datos sin fecha por 
- Llamamos a la funcion extraer_anio_release en data_utils.py  

In [10]:
# Aplica la función extraer_anio_release a la columna release_date
df_steam_games['release_year'] = df_steam_games['release_date'].apply(data_utils.extraer_anio_release)
# elimina la columna 'release_date'
df_steam_games = df_steam_games.drop('release_date', axis=1)
df_steam_games['release_year'] = df_steam_games['release_year'].astype(int)

AttributeError: module 'data_utils' has no attribute 'extraer_anio_release'

#### **price**
Necesitamos trabajar con esta columna, pero encontramos valores de texto para promociones o indicando que el juego es gratis. 

- Asignamos valores correspondientes a precios con textos adicional.
- Convierte la columna 'price' a tipo float.
- Rellena los valores nulos resultantes con -1.

Visualizamos los valores no numericos en precios a solucionar usando una funcion invocada desde data_utils.py

In [None]:
precios_texto = filtrar_valores_letras(df_steam_games['price'].unique())
print("Precios en forma de texto a corregir: ")
print(precios_texto)

Precios en forma de texto a corregir: 
['Free To Play', 'Free to Play', 'Free', 'Free Demo', 'Play for Free!', 'Install Now', 'Play WARMACHINE: Tactics Demo', 'Free Mod', 'Install Theme', 'Third-party', 'Play Now', 'Free HITMAN™ Holiday Pack', 'Play the Demo', 'Starting at $499.00', 'Starting at $449.00', 'Free to Try', 'Free Movie', 'Free to Use']


Filtramos los precios gratis a traves de buscar los datos faltantes en la columna tags

In [None]:
# Lista de palabras clave para buscar en las etiquetas
palabras_clave = ['Free to Play', 'Free', 'Free Demo', 'Play for Free!', 'Free Mod', 'Free to Try', 'Free Movie', 'Free to Use', 'Play the Demo', 'Free To Play']

# Mostrar valores únicos de 'price' antes de la limpieza
print("Muestra de valores no numéricos antes de la limpieza:")
precio_ironbound = df_steam_games[df_steam_games['app_name'] == 'Ironbound']['price'].unique()
print(precio_ironbound)

# Verificar si el precio del juego es nulo
if df_steam_games['price'].isnull().any():
    # Verificar si alguna de las palabras clave está en las etiquetas para cada fila y si falta el precio
    df_steam_games['precio_faltante'] = df_steam_games['price'].isna() & df_steam_games['tags'].apply(lambda x: any(palabra in x for palabra in palabras_clave) if isinstance(x, list) else False)

    # Reemplazar los valores de precio faltantes con 0 si la fila correspondiente tiene alguna palabra clave en las etiquetas
    df_steam_games.loc[df_steam_games['precio_faltante'], 'price'] = 0

    # Eliminar la columna temporal
    df_steam_games.drop('precio_faltante', axis=1, inplace=True)

# Limpiar valores no numéricos
non_numeric_values = ['Free', 'Free To Play', 'Free Demo', 'Play for Free!', 'Free Mod', 'Install Now', 'Play Now', 'Third-party', 'Play WARMACHINE: Tactics Demo', 'Install Theme', 'Starting at', 'Free to Try', 'Free Movie', 'Free to Use', 'Play the Demo']
df_steam_games['price'] = df_steam_games['price'].apply(lambda x: 0 if x in non_numeric_values else x)

# Convertir la columna 'price' a tipo string para limpiar valores con texto adicional
df_steam_games['price'] = df_steam_games['price'].astype(str)

# Limpiar valores con texto adicional y convertir la columna 'price' a tipo float
df_steam_games['price'] = df_steam_games['price'].replace(to_replace=r'[^0-9.]', value='', regex=True)
df_steam_games['price'] = pd.to_numeric(df_steam_games['price'], errors='coerce')

# Verificar si hay valores nulos en la columna 'price'
if df_steam_games['price'].isnull().any():
    # Asignar -1 a los valores nulos en la columna 'price'
    df_steam_games['price'].fillna(0, inplace=True)

# Mostrar valores únicos de 'price' después de la limpieza
print("Muestra de valores no numéricos después de la limpieza:")
precio_ironbound = df_steam_games[df_steam_games['app_name'] == 'Ironbound']['price'].unique()
print(precio_ironbound)

data_type_check(df_steam_games)


Muestra de valores no numéricos antes de la limpieza:
['Free To Play']
Muestra de valores no numéricos después de la limpieza:
[0.]

 Resumen del dataframe:

Dimensiones:  (32132, 8)
        columna  %_no_nulos  %_nulos  total_nulos       tipo_dato
0        genres       89.79    10.21         3282          object
1      app_name      100.00     0.00            0  string[python]
2           url      100.00     0.00            0          object
3          tags       99.50     0.50          162  string[python]
4         price      100.00     0.00            0         float64
5            id      100.00     0.00            0          object
6     developer      100.00     0.00            0  string[python]
7  release_year      100.00     0.00            0           int32


#### **genres**:

- Rellenamos la informacion faltante de genre, con los genre disponibles en la columna tags



In [None]:
print("Valores faltantes antes de el relleno de datos: ", df_steam_games['genres'].isnull().sum())

Valores faltantes antes de el relleno de datos:  3282


In [None]:
"""
Esta función rellena los valores faltantes en la columna 'genres' de un DataFrame de juegos de Steam utilizando la información de la columna 'tags'.

1. Se define un conjunto de géneros válidos a copiar.
2. Se define una función 'rellenar_genres' que toma las etiquetas (tags) y los géneros existentes (genres) como entrada.
3. Dentro de la función:
   a. Se convierte la cadena de etiquetas (tags) en una lista.
   b. Se filtran los géneros válidos presentes en las etiquetas.
   c. Se eliminan los géneros no deseados ('Free to Play' y 'Early Access').
   d. Si no hay nuevos géneros, se devuelven los géneros originales.
   e. Si hay nuevos géneros, se devuelven como una lista.
4. Se aplica la función 'rellenar_genres' a cada fila del DataFrame utilizando el método 'apply'.
5. Se elimina la columna 'tags' del DataFrame.
6. Se convierte la columna 'genres' a tipo de dato string.
7. Se imprime la cantidad de valores faltantes en la columna 'genres' después del relleno.
"""

# Rellenamos la informacion faltante de genre, con los genre disponibles en la columna tags

print("Valores faltantes previos: ", df_steam_games["genres"].isnull().sum())


# Conjunto de géneros a copiar

generos_a_copiar = {
    "Simulation",
    "Adventure",
    "Strategy",
    "Education",
    "Photo Editing",
    "Massively Multiplayer",
    "Accounting",
    "Video Production",
    "Design & Illustration",
    "Racing",
    "Web Publishing",
    "Utilities",
    "Software Training",
    "Sports",
    "Action",
    "Indie",
    "Audio Production",
    "Animation & Modeling",
    "Casual",
    "RPG",
}



# Función para rellenar genres desde tags

def rellenar_genres(tags, genres):

    try:

        # Convertir las etiquetas (tags) a una lista

        tags_list = ast.literal_eval(tags)

        # Filtrar los géneros a copiar

        nuevos_generos = [genre for genre in generos_a_copiar if genre in tags_list]

        # Eliminar 'Free to Play' y 'Early Access'

        no_deseados = ["Free to Play", "Early Access"]

        nuevos_generos = [genre for genre in nuevos_generos if genre not in no_deseados]

        # Si no hay nuevos géneros, devolver los originales

        if not nuevos_generos:
            return genres

        # Devolver los nuevos géneros como lista
        return nuevos_generos


    except (ValueError, SyntaxError):

        # Manejar errores de evaluación

        pass


    # Si hay algún error, o la evaluación no es una lista, devolver los géneros originales como lista
    return genres



# Aplicar la función para rellenar genres desde tags

df_steam_games["genres"] = df_steam_games.apply(
    lambda row: rellenar_genres(row["tags"], row["genres"]), axis=1
)


# Eliminar tags
df_steam_games = df_steam_games.drop("tags", axis=1)


df_steam_games["genres"] = df_steam_games["genres"].astype(str)


# Observamos la cantidad de nulos

print(
    "Valores faltantes luego de el relleno de datos: ",
    df_steam_games["genres"].isnull().sum(),
)

Valores faltantes previos:  3282
Valores faltantes luego de el relleno de datos:  0


In [None]:
"""
Elimina las filas del DataFrame df_steam_games donde la columna 'genres' tiene valores nulos (NaN).
Luego, imprime la cantidad de valores nulos que quedan en la columna 'genres' después de eliminar las filas con nulos.
"""

df_steam_games = df_steam_games.dropna(subset=["genres"])


# Observamos la cantidad de nulos

print("Valores faltantes finale: ", df_steam_games["genres"].isnull().sum())

Valores faltantes finale:  0


In [None]:
#Dejamos informacion de muestra acerca de ese archivo
data_type_check(df_steam_games)


 Resumen del dataframe:

Dimensiones:  (32132, 7)
        columna  %_no_nulos  %_nulos  total_nulos       tipo_dato
0        genres       100.0      0.0            0          object
1      app_name       100.0      0.0            0  string[python]
2           url       100.0      0.0            0          object
3         price       100.0      0.0            0         float64
4            id       100.0      0.0            0          object
5     developer       100.0      0.0            0  string[python]
6  release_year       100.0      0.0            0           int32


### ID

In [None]:
df_steam_games['id'] = df_steam_games['id'].astype('int64')

#### **📤 LOAD**

In [None]:
"""
Guarda el DataFrame df_steam_games en formato parquet en la ruta especificada.

Args:
    df_steam_games (pandas.DataFrame): DataFrame que contiene los datos de los juegos de Steam.
    ruta_parquet (str): Ruta donde se guardará el archivo parquet.
"""


table = pa.Table.from_pandas(df_steam_games)

ruta_parquet = os.path.join("..", "0 Dataset", "1.1_steam_games_LISTO.parquet")
df_steam_games.to_parquet(ruta_parquet)
print(f"Se guardó el archivo {ruta_parquet}")

Se guardó el archivo ..\0 Dataset\1.1_steam_games_LISTO.parquet


## **Terminamos**
  - Eliminamos el archivo descomprimidos que ahora tenemos limpio y liviano en formato parquet

In [None]:
"""
Elimina los archivos JSON descomprimidos de la lista 'gz_descomprimido'.
Itera sobre la lista de archivos y utiliza el método 'os.remove()' para eliminar cada archivo.
Si el archivo no se encuentra, imprime un mensaje indicando que el archivo no se encontró.
"""

# Lista de archivos descomprimidos
gz_descomprimido = ["../0 Dataset/steam_games.json"]

# Eliminar archivos descomprimidos
for archivo_json in gz_descomprimido:
    try:
        os.remove(archivo_json)
        print(f"Archivo eliminado: {archivo_json}")
    except FileNotFoundError:
        print(f"Archivo no encontrado: {archivo_json}")

Archivo eliminado: ../0 Dataset/steam_games.json


Observamos el tiempo de ejecucion total de nuestro proceso ETL 🔥

In [None]:
"""
Calcula el tiempo total de ejecución del script en minutos.

Obtiene el tiempo de finalización utilizando la función time.time().
Calcula el tiempo total de ejecución restando el tiempo de inicio (start_time) del tiempo de finalización.
Convierte el tiempo total de ejecución de segundos a minutos.
Redondea el tiempo total de ejecución a 2 decimales.
Imprime el tiempo total de ejecución en minutos.
"""

# Obtener el tiempo de finalización

end_time = time.time()

# Calcular el tiempo total de ejecución

total_time = end_time - start_time

# Convertir a minutos y redondear a 2 decimales

total_time_minutes = round(total_time / 60, 2)

# Imprimir resultados

print(f"Tiempo total de ejecución de este ipynb: {total_time_minutes} minutos")

Tiempo total de ejecución de este ipynb: 0.28 minutos
