# ETL

In [None]:
import pandas as pd
import json
import re
import ast
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import matplotlib.pyplot as plt

In [None]:
nltk.download('vader_lexicon')

#### Funciones

In [None]:
def reemplazar_nan_con_none(cadena):
    resultado = ""
    indice = 0
    longitud = len(cadena)

    while indice < longitud:
        if cadena[indice:indice+3] == "NaN":
            resultado += "\"NaN\""
            indice += 3
        else:
            resultado += cadena[indice]
            indice += 1

    return resultado

In [None]:
def corregir_comillas(cadena):
    # Buscar comillas simples incorrectamente formateadas dentro de la cadena
    partes = cadena.split('"')
    for i in range(1, len(partes), 2):
        partes[i] = partes[i].replace("'", '"')
    # Volver a unir las partes corregidas
    cadena_corregida = '"'.join(partes)
    return cadena_corregida

In [None]:
def extraer_valores(cadena):
    valores = {}
    
    # Expresiones regulares para cada clave
    patrones = {
        'user_id': r'"user_id"\s*:\s*"([^"]+)"',
        'items_count': r'"items_count"\s*:\s*([^,]+)',
        'steam_id': r'"steam_id"\s*:\s*"([^"]+)"',
        'user_url': r'"user_url"\s*:\s*"([^"]+)"',
        'items': r'"items"\s*:\s*\{([^}]+)\}'
    }
    
    for clave, patron in patrones.items():
        coincidencias = re.search(patron, cadena)
        if coincidencias:
            valores[clave] = coincidencias.group(1)
    
    return valores

In [None]:
def procesar_diccionarios(items_parte):
    # Utiliza una expresión regular para encontrar todas las coincidencias de los diccionarios dentro de corchetes.
    diccionarios = re.findall(r'\{[^}]+\}', items_parte)

    # Inicializa una lista para almacenar los diccionarios procesados.
    resultado = []

    # Define una función para procesar cada diccionario.
    def procesar_diccionario(diccionario_str):
        # Utiliza una expresión regular para extraer los valores de las claves deseadas.
        item_id_match = re.search(r'"item_id": "([^"]+)"', diccionario_str)
        item_name_match = re.search(r'"item_name": "([^"]+)"', diccionario_str)
        playtime_forever_match = re.search(r'"playtime_forever": (\d+)', diccionario_str)
        playtime_2weeks_match = re.search(r'"playtime_2weeks": (\d+)', diccionario_str)

        # Verifica si se encontró una coincidencia para cada clave antes de extraer el valor.
        item_id = item_id_match.group(1) if item_id_match else None
        item_name = item_name_match.group(1) if item_name_match else None
        playtime_forever = int(playtime_forever_match.group(1)) if playtime_forever_match else None
        playtime_2weeks = int(playtime_2weeks_match.group(1)) if playtime_2weeks_match else None

        # Crea un diccionario con los valores extraídos.
        diccionario_resultado = {
            "item_id": item_id,
            "item_name": item_name,
            "playtime_forever": playtime_forever,
            "playtime_2weeks": playtime_2weeks
        }

        return diccionario_resultado

    # Procesa cada diccionario encontrado y agrégalo a la lista de resultados.
    for diccionario_str in diccionarios:
        resultado.append(procesar_diccionario(diccionario_str))

    # El resultado es una lista de diccionarios.
    return resultado


In [None]:
# Función para convertir fechas al formato "YYYY-MM-DD"
def convertir_fecha(fecha):
    # Utilizar expresión regular para extraer componentes de la fecha
    match = re.match(r"Posted (\w+) (\d+), (\d+)", fecha)
    if match:
        mes_str, dia_str, anio_str = match.groups()
        # Mapear nombres de meses a números
        meses = {
            'January': '01', 'February': '02', 'March': '03', 'April': '04',
            'May': '05', 'June': '06', 'July': '07', 'August': '08',
            'September': '09', 'October': '10', 'November': '11', 'December': '12'
        }
        # Crear la fecha en el nuevo formato
        nueva_fecha = f"{anio_str}-{meses[mes_str]}-{dia_str.zfill(2)}"
        return nueva_fecha
    else:
        return None

#### Cargar australian_user_reviews.json y convertirlo a parquet

In [None]:
with open('../dataset/australian_user_reviews.json', 'r', encoding='utf-8') as file:
    data_list = []
    for linea in file:
        linea = reemplazar_nan_con_none(linea)
        data = ast.literal_eval(linea.strip())
        if isinstance(data, dict):
            data_list.append(data)
    reviews = pd.DataFrame(data_list)

In [None]:
reviews.info()

In [None]:
reviews['user_id'].value_counts() #Validar duplicados

In [None]:
reviews = reviews.drop_duplicates(subset=['user_id']) #Eliminar duplicados

Función analisis de sentimiento

In [None]:
sia = SentimentIntensityAnalyzer()
def analyze_sentiment(lista):
    nlist =[]
    for i in lista:
        sentiment = sia.polarity_scores(i['review'])    
        try:               
            if sentiment['compound'] >= 0.05:
                i['sentiment_analysis'] = 2  # Positivo
                del i['review']
            elif sentiment['compound'] <= -0.05:
                i['sentiment_analysis'] = 0  # Malo
                del i['review']
            else:
                i['sentiment_analysis'] = 1  # Neutral
                del i['review']
        except:
            i['sentiment_analysis'] = 1
            del i['review']
        nlist.append(i)
    return nlist

In [None]:
reviews['reviews'] = reviews['reviews'].apply(lambda x: analyze_sentiment(x))   # Aplicando NLP

In [None]:
def desanidar_reviews(df):
    i = 0
    data_list = []
    while i <= len(df['user_id']) -1:
        user_id = df['user_id'].iloc[i]
        user_url = df['user_url'].iloc[i]
        lista = df['reviews'].iloc[i]
        for j in lista:
            j['user_id'] = user_id
            j['user_url'] = user_url
            data_list.append(j)
        i = i + 1
    return data_list

In [None]:
n_reviews = pd.DataFrame(desanidar_reviews(reviews))

In [None]:
# Aplicar la función de conversión a la columna y crear una nueva columna con las fechas reformateadas
n_reviews['posted_date'] = n_reviews['posted'].apply(convertir_fecha)
n_reviews['posted_date'] = pd.to_datetime(n_reviews['posted_date'])

In [None]:
n_reviews['posted_date'].describe()

Guarda DF en archivo parquet

In [None]:
n_reviews.to_parquet('../dataset/australian_user_reviews.parquet', engine='pyarrow', compression='snappy')

#### Cargar output_steam_games.json y convertirlo a parquet

In [None]:
with open('../dataset/output_steam_games.json', 'r', encoding='utf-8') as file:
    data_list = []
    for linea in file:
        linea = linea.replace('"NaN"', '')
        data = json.loads(linea.strip())
        if isinstance(data, dict):
            data_list.append(data)
    games = pd.DataFrame(data_list)
    games['price'] = games['price'].replace('Free To Play', 0)
    games['price'] = pd.to_numeric(games['price'], errors='coerce')
    games['metascore'] = pd.to_numeric(games['metascore'], errors='coerce')

In [None]:
games.info()

In [None]:
games['id'].value_counts()         # Validando registros duplicados

In [None]:
games = games.drop_duplicates(subset=['id'])

In [None]:
# Inputar cero a valores nulos de las columnas 'price' y 'discount_price'
games[['price','discount_price']] = games[['price','discount_price']].fillna(0) 

In [None]:
# Eliminar nulos para aquellos registros que tengan regitros nulo en las columnas 'title', 'app_name' e 'id
games = games.dropna(subset=['title', 'app_name','id'], how='all')

Guardar output_steam_games.parquet

In [None]:
games.to_parquet('../dataset/output_steam_games.parquet', engine='pyarrow', compression='snappy')

### Cargar australian_users_items.json y convertirlo a parquet

In [None]:
items = pd.DataFrame(columns=['user_id', 'items_count','steam_id','user_url','items'])
with open('../dataset/australian_users_items.json', 'r', encoding='utf-8') as file:
    data_list = []
    for linea in file:
        linea = linea.replace('"NaN"', '')
        linea = linea.replace('\'', '\"')
        resultado = re.search(r'"items":\s*(.+)', linea)
        linea = extraer_valores(linea)
        items_parte = resultado.group(1)
        items_parte = procesar_diccionarios(items_parte)
        linea['items'] = items_parte 
        data_list.append(linea)

items = pd.DataFrame(data_list)

In [None]:
items.info()

In [None]:
items['user_id'].value_counts()         #Validando registros duplicados

In [None]:
items = items.drop_duplicates(subset=['user_id'])       # Eliminar duplicados

In [None]:
items.iloc[0]

In [None]:
def desanidar_items(df):
    i = 0
    data_list = []
    while i <= len(df['user_id']) - 1:
        user_id = df['user_id'].iloc[i]
        steam_id = df['steam_id'].iloc[i]
        user_url = df['user_url'].iloc[i]
        lista = df['items'].iloc[i]
        for j in lista:
            j['user_id'] = user_id
            j['steam_id'] = steam_id
            j['user_url'] = user_url
            data_list.append(j)
        i = i + 1
    return data_list

In [None]:
n_items = pd.DataFrame(desanidar_items(items))

In [None]:
n_items.info()

Guardar australian_users_items.parquet

In [None]:
n_items.to_parquet('../dataset/australian_users_items.parquet', engine='pyarrow', compression='snappy')

Generar ranking_genre.parquet

In [None]:
# No ejecutar esta celda en render ya que se consume alrededor de 2 Gb de memoria y la versión gratis tiene solo 512 Mb
# Separar lista de generos
games_exploded = games.explode('genres')
# Combinar DF's games_explode con items coincidiendo por id    
merged_data = games_exploded.merge(items, left_on='id', right_on='item_id', how='inner')
# Sumar playtime_forever por género, este dataframe supera el gigabyte de memoria, por ello es necesario almacenar el df ranking_genre
genre_playtime = merged_data.groupby('genres')['playtime_forever'].sum().reset_index()
# Ordenar de mayor a menor sobre 'playtime_forever'
ranking_genre = genre_playtime.sort_values(by='playtime_forever', ascending=False)
ranking_genre = ranking_genre.reset_index(drop=True)
# Guardar df en formato parquet
ranking_genre.to_parquet('../dataset/ranking_genre.parquet', engine='pyarrow', compression='snappy')

In [None]:
# Crear df user_genre.parquet

# Creando df con las columnas 'genres', 'playtime_forever' y 'user_id_y'
usersXgenre = merged_data[['genres','playtime_forever','user_id_y','user_url']]
# Agrupando por usuario y genero
usersXgenre = usersXgenre.groupby(['genres','user_id_y','user_url'])['playtime_forever'].sum().reset_index()
# Guardar df en formato parquet
usersXgenre.to_parquet('../dataset/user_genre.parquet', engine='pyarrow', compression='snappy')