<H1>Extracción, transformación y carga (ETL)

En este Notebook se realizará el proceso de ETL para los datasets proporcionados.

<h2>Importaciones

In [2]:
import pandas as pd
import json
import ast
import pyarrow

<h2>Steam Games

Se carga el dataset original y se crea el DataFrame correspondiente.

In [2]:
with open('../data/output_steam_games.json', 'r') as file:
    lines = file.readlines()

steamGamesData = [json.loads(line) for line in lines]

steamGames = pd.json_normalize(steamGamesData)

steamGamesDf = pd.DataFrame(steamGames)

 Este dataset contiene información acerca de los juegos tal como su nombre, género, desarrollador, fecha de salida, etc.

In [3]:
steamGamesDf.head()

Unnamed: 0,publisher,genres,app_name,title,url,release_date,tags,reviews_url,specs,price,early_access,id,developer
0,,,,,,,,,,,,,
1,,,,,,,,,,,,,
2,,,,,,,,,,,,,
3,,,,,,,,,,,,,
4,,,,,,,,,,,,,


Se eliminan aquellas filas que se encuentren vacías en su totalidad. Después se transforma la fecha de salida a formato de fecha, se extrae el año de salida en una columna nueva, y se cambia el nombre de la columna 'id' para que coincida con el resto de datasets.

In [4]:
steamGamesDf = steamGamesDf.dropna(how='all')
steamGamesDf['release_date'] = pd.to_datetime(steamGamesDf['release_date'],format = 'mixed',errors = 'coerce')
steamGamesDf['release_year'] = steamGamesDf['release_date'].dt.year.fillna(0).astype(int)
steamGamesDf.rename(columns = {'id': 'item_id'}, inplace = True)

In [5]:
steamGamesDf.head()[:1]

Unnamed: 0,publisher,genres,app_name,title,url,release_date,tags,reviews_url,specs,price,early_access,item_id,developer,release_year
88310,Kotoshiro,"[Action, Casual, Indie, Simulation, Strategy]",Lost Summoner Kitty,Lost Summoner Kitty,http://store.steampowered.com/app/761140/Lost_...,2018-01-04,"[Strategy, Action, Indie, Casual, Simulation]",http://steamcommunity.com/app/761140/reviews/?...,[Single-player],4.99,False,761140,Kotoshiro,2018


Parte de las funciones que se nos pide realizar necesitan acceso al dato del precio y si el juego es gratis o no. Podemos ver que la columna 'price' cuenta con valores tipo string y float.

In [6]:
steamGamesDf['price'].apply(type).unique()

array([<class 'float'>, <class 'str'>], dtype=object)

Procedemos a listar los valores únicos de tipo string.

In [7]:
steamGamesDf['price'].loc[steamGamesDf['price'].apply(type) == str].unique()

array(['Free To Play', 'Free to Play', 'Free', 'Free Demo',
       'Play for Free!', 'Install Now', 'Play WARMACHINE: Tactics Demo',
       'Free Mod', 'Install Theme', 'Third-party', 'Play Now',
       'Free HITMAN™ Holiday Pack', 'Play the Demo',
       'Starting at $499.00', 'Starting at $449.00', 'Free to Try',
       'Free Movie', 'Free to Use'], dtype=object)

Se realiza la eliminación de dichos valores sustituyendo aquellos gratuitos por 0 y lo que mencionan el precio por el precio dado.

In [8]:
steamGamesDf.loc[steamGamesDf['price'].str.contains('Free', case = False) & steamGamesDf['price'].notna(), 'price'] = 0.0
steamGamesDf.loc[steamGamesDf['price'].str.contains('install', case = False) & steamGamesDf['price'].notna(), 'price'] = 0.0
steamGamesDf.loc[steamGamesDf['price'].str.contains('play', case = False) & steamGamesDf['price'].notna(), 'price'] = 0.0
steamGamesDf.loc[steamGamesDf['price'].str.contains('third', case = False) & steamGamesDf['price'].notna(), 'price'] = 0.0
steamGamesDf.loc[steamGamesDf['price'].str.contains('449', case = False) & steamGamesDf['price'].notna(), 'price'] = 449.0
steamGamesDf.loc[steamGamesDf['price'].str.contains('499', case = False) & steamGamesDf['price'].notna(), 'price'] = 499.0

Verificamos que se hayan eliminado los strings.

In [9]:
steamGamesDf['price'].apply(type).unique()

array([<class 'float'>], dtype=object)

Transformamos el dataset a formato parquet.

In [10]:
steamGamesDf.to_parquet('../data/steamGames.parquet',index = False)

<H3>Steam Games Price

Para ahorrar espacio en la API creamos un dataframe que contenga únicamente el ID de los juegos y su precio. Como lo que nos interesa de este dataset es el precio de los ítems, se eliminan aquellos juegos que no tengan listado su precio, y lo transformamos igualmente a parquet.

In [11]:
steamGamesPriceDf = steamGamesDf[['item_id','price']]
steamGamesPriceDf = steamGamesPriceDf.dropna(subset='price')
steamGamesPriceDf.head()

Unnamed: 0,item_id,price
88310,761140,4.99
88311,643980,0.0
88312,670290,0.0
88313,767400,0.99
88314,773570,2.99


In [12]:
steamGamesPriceDf.to_parquet('../data/steamGamesPrice.parquet',index = False)

<H3>Steam Games Genres

También necesitaremos un dataframe que contenga los géneros de los juegos, por lo que se crea uno a partir de steamGamesDf que contenga las columnas 'item_id', 'genres' y 'release_year'. Como lo que nos interesa de este dataset es el género de los ítems, se eliminan aquellos juegos que no lo tengan listado. Se realiza una pequeña corrección en el formato cambiando &amp; por 'and' y se transforma a parquet.

In [13]:
steamGamesGenresDf = steamGamesDf[['item_id','genres','release_year']]
steamGamesGenresDf = steamGamesGenresDf.dropna(subset = 'genres')
steamGamesGenresDf['genres'] = steamGamesGenresDf['genres'].apply(lambda x: [element.replace('&amp;', 'and') for element in x]) 
steamGamesGenresDf.head()

Unnamed: 0,item_id,genres,release_year
88310,761140,"[Action, Casual, Indie, Simulation, Strategy]",2018
88311,643980,"[Free to Play, Indie, RPG, Strategy]",2018
88312,670290,"[Casual, Free to Play, Indie, Simulation, Sports]",2017
88313,767400,"[Action, Adventure, Casual]",2017
88315,772540,"[Action, Adventure, Simulation]",2018


In [14]:
steamGamesGenresDf.to_parquet('../data/steamGamesGenres.parquet',index = False)

<H4>Steam Games Genres Exploded

Ya que el dataset de steamGamesGenres cuenta con los datos de género anidados en una lista, se utiliza la función 'exploded' para crear una fila por cada combinación de ID y género único. Después se transforma a parquet y se guarda.

In [15]:
steamGamesGenresExplodedDf = steamGamesGenresDf.explode('genres').reset_index(drop=True)
steamGamesGenresExplodedDf.head()

Unnamed: 0,item_id,genres,release_year
0,761140,Action,2018
1,761140,Casual,2018
2,761140,Indie,2018
3,761140,Simulation,2018
4,761140,Strategy,2018


In [16]:
steamGamesGenresExplodedDf.to_parquet('../data/steamGamesGenresExploded.parquet',index = False)

<H3>Steam Games Dev

Necesitaremos un dataframe que contenga los desarrolladores de los juegos, por lo que se siguen los pasos del dataset anterior y se crea uno que contenga las columnas 'item_id', 'developer' y 'release_year'. Como lo que nos interesa de este dataset es el developer de los ítems, se eliminan aquellos juegos que no lo tengan listado y se transforma a parquet.

In [17]:
steamGamesDevDf = steamGamesDf[['item_id','developer','release_year']]
steamGamesDevDf = steamGamesDevDf.dropna(subset = 'developer')
steamGamesDevDf.head()

Unnamed: 0,item_id,developer,release_year
88310,761140,Kotoshiro,2018
88311,643980,Secret Level SRL,2018
88312,670290,Poolians.com,2017
88313,767400,彼岸领域,2017
88315,772540,Trickjump Games Ltd,2018


In [18]:
steamGamesDevDf.to_parquet('../data/steamGamesDev.parquet',index = False)

<H2>User Items

Se carga el dataset original y se crea el dataframe correspondiente. Este es un dataset más grande por lo que toma más tiempo el proceso de carga y transformación.

In [5]:
dataList = []
filePath = '../data/australian_users_items.json'

#Abrir el archivo y procesar cada línea
with open(filePath, 'r', encoding = 'utf-8') as file:
    for line in file:
        try:
            # Usar ast.literal_eval para convertir la línea en un diccionario
            jsonData = ast.literal_eval(line)
            dataList.append(jsonData)
        except ValueError as e:
            print(f"Error en la línea: {line}")
        continue

#Crear un DataFrame a partir de la lista de diccionarios
userItemsDf = pd.DataFrame(dataList)

In [6]:
userItemsDf.head()

Unnamed: 0,user_id,items_count,steam_id,user_url,items
0,76561197970982479,277,76561197970982479,http://steamcommunity.com/profiles/76561197970...,"[{'item_id': '10', 'item_name': 'Counter-Strik..."
1,js41637,888,76561198035864385,http://steamcommunity.com/id/js41637,"[{'item_id': '10', 'item_name': 'Counter-Strik..."
2,evcentric,137,76561198007712555,http://steamcommunity.com/id/evcentric,"[{'item_id': '1200', 'item_name': 'Red Orchest..."
3,Riot-Punch,328,76561197963445855,http://steamcommunity.com/id/Riot-Punch,"[{'item_id': '10', 'item_name': 'Counter-Strik..."
4,doctr,541,76561198002099482,http://steamcommunity.com/id/doctr,"[{'item_id': '300', 'item_name': 'Day of Defea..."


Se guarda el dato de 'items_count' en un nuevo dataframe.

In [7]:
userItemCountDf = userItemsDf[['user_id', 'items_count']]

Eliminamos las columans que no usaremos de ahora en adelante, y convertimos ambos a parquet.

In [8]:
del userItemsDf['steam_id']
del userItemsDf['items_count']
del userItemsDf['user_url']

In [9]:
userItemsDf.to_parquet('../data/userItems.parquet',index = False)
userItemCountDf.to_parquet('../data/userItemCount.parquet',index = False)

<H3>User Items Exploded

Ya que el dataframe de userItems cuenta con los datos de los ítems de cada usuario anidados en un diccionario, se utiliza la función 'exploded' para crear una fila por cada juego.

In [10]:
userItemsExplodedDf = userItemsDf.explode('items').reset_index(drop = True)

Se extraen los datos de 'item_id' y 'playtime_forever' que son los que usaremos para las funciones y se borra la columna de 'items' para reducir el tamaño. Transformamos 'playtime' de minutos a horas y guardamos el dataframe como parquet.

In [11]:
userItemsExplodedDf['item_id'] = userItemsExplodedDf['items'].apply(lambda x: x.get('item_id') if isinstance(x, dict) and 'item_id' in x else None)
userItemsExplodedDf['playtime'] = userItemsExplodedDf['items'].apply(lambda x: x.get('playtime_forever') if isinstance(x, dict) and 'playtime_forever' in x else None)
userItemsExplodedDf['playtime'] = (userItemsExplodedDf['playtime'] / 60).round(2)

In [12]:
userItemsExplodedDf.head()

Unnamed: 0,user_id,items,item_id,playtime
0,76561197970982479,"{'item_id': '10', 'item_name': 'Counter-Strike...",10,0.1
1,76561197970982479,"{'item_id': '20', 'item_name': 'Team Fortress ...",20,0.0
2,76561197970982479,"{'item_id': '30', 'item_name': 'Day of Defeat'...",30,0.12
3,76561197970982479,"{'item_id': '40', 'item_name': 'Deathmatch Cla...",40,0.0
4,76561197970982479,"{'item_id': '50', 'item_name': 'Half-Life: Opp...",50,0.0


In [13]:
del userItemsExplodedDf['items']

In [14]:
userItemsExplodedDf.to_parquet('../data/userItemsExploded.parquet',index = False)

<H2>User Reviews

Por último cargamos el dataset correspondiente a las reseñas de los usuarios y creamos su dataframe.

In [3]:
#Ruta del archivo JSON
dataList = []
filePath = '../data/australian_user_reviews.json'

#Abrir el archivo y procesar cada línea
with open(filePath, 'r', encoding='utf-8') as file:
    for line in file:
        try:
            # Usar ast.literal_eval para convertir la línea en un diccionario
            jsonData = ast.literal_eval(line)
            dataList.append(jsonData)
        except ValueError as e:
            print(f"Error en la línea: {line}")
            continue

#Crear un DataFrame a partir de la lista de diccionarios
userReviewsDf = pd.DataFrame(dataList)

In [4]:
userReviewsDf.head()

Unnamed: 0,user_id,user_url,reviews
0,76561197970982479,http://steamcommunity.com/profiles/76561197970...,"[{'funny': '', 'posted': 'Posted November 5, 2..."
1,js41637,http://steamcommunity.com/id/js41637,"[{'funny': '', 'posted': 'Posted June 24, 2014..."
2,evcentric,http://steamcommunity.com/id/evcentric,"[{'funny': '', 'posted': 'Posted February 3.',..."
3,doctr,http://steamcommunity.com/id/doctr,"[{'funny': '', 'posted': 'Posted October 14, 2..."
4,maplemage,http://steamcommunity.com/id/maplemage,"[{'funny': '3 people found this review funny',..."


Comenzamos eliminando aquellas filas que se encuentren completamente vacías y transformamos a parquet.

In [30]:
userReviewsDf = userReviewsDf.dropna(how='all')
userReviewsDf.to_parquet('../data/userReviews.parquet', index = False)

<H3>User Reviews Exploded

Ya que las reseñas se encuentran anidadas por usuario en una lista de diccionarios, se crea un dataframe utilizando la función 'explode'.

In [31]:
userReviewsExplodedDf = userReviewsDf.explode('reviews').reset_index(drop=True)

In [32]:
userReviewsExplodedDf.head()

Unnamed: 0,user_id,user_url,reviews
0,76561197970982479,http://steamcommunity.com/profiles/76561197970...,"{'funny': '', 'posted': 'Posted November 5, 20..."
1,76561197970982479,http://steamcommunity.com/profiles/76561197970...,"{'funny': '', 'posted': 'Posted July 15, 2011...."
2,76561197970982479,http://steamcommunity.com/profiles/76561197970...,"{'funny': '', 'posted': 'Posted April 21, 2011..."
3,js41637,http://steamcommunity.com/id/js41637,"{'funny': '', 'posted': 'Posted June 24, 2014...."
4,js41637,http://steamcommunity.com/id/js41637,"{'funny': '', 'posted': 'Posted September 8, 2..."


Observamos los datos contenidos en la columna 'reviews' que ahora consta de diccionarios. Se realizan las siguientes transformaciones en el dataframe preparándolo para las funciones de la API y para el análisis de sentimiento.

In [33]:
userReviewsExplodedDf['reviews'].loc[0]

{'funny': '',
 'posted': 'Posted November 5, 2011.',
 'last_edited': '',
 'item_id': '1250',
 'helpful': 'No ratings yet',
 'recommend': True,
 'review': 'Simple yet with great replayability. In my opinion does "zombie" hordes and team work better than left 4 dead plus has a global leveling system. Alot of down to earth "zombie" splattering fun for the whole family. Amazed this sort of FPS is so rare.'}

In [34]:
# Se crea una nueva columna y se extrae el texto de la reseña
userReviewsExplodedDf['review'] = userReviewsExplodedDf['reviews'].apply(lambda x: x.get('review') if isinstance(x, dict) and 'review' in x else None)
# Se eliminan las filas sin reseña
userReviewsExplodedDf = userReviewsExplodedDf.dropna(subset='review')
# Se crea una nueva columna y se extrae la recomendación, este es un valor booleano
userReviewsExplodedDf['recommend'] = userReviewsExplodedDf['reviews'].apply(lambda x: x.get('recommend') if isinstance(x, dict) and 'recommend' in x else None)
# Se crea una nueva columna y se extrae el 'item_id' que nos servirá para vincular este dataframe con el resto
userReviewsExplodedDf['item_id'] = userReviewsExplodedDf['reviews'].apply(lambda x: x.get('item_id') if isinstance(x, dict) and 'item_id' in x else None)
# Se crea una nueva columna y se extrae el la fecha de la reseña que nos servirá para vincular este dataframe con el resto
userReviewsExplodedDf['review_date'] = userReviewsExplodedDf['reviews'].apply(lambda x: x.get('posted') if isinstance(x, dict) and 'posted' in x else None)
# Se elimina la parte de la fecha que contiene el string 'Posted ' para que pueda transformarse a formato de fecha
userReviewsExplodedDf['review_date'] = userReviewsExplodedDf['review_date'].str.replace('Posted ', '', regex=False)
userReviewsExplodedDf['review_date'] = pd.to_datetime(userReviewsExplodedDf['review_date'],format='mixed',errors='coerce')
# De la columna de 'review_date' se extrae el dato del año y se transforma a numérico
userReviewsExplodedDf['review_year'] = userReviewsExplodedDf['review_date'].dt.year
userReviewsExplodedDf['review_year'] = pd.to_numeric(userReviewsExplodedDf['review_year'], errors='coerce').astype('Int64')

Por último pasamos el dataframe a parquet.

In [35]:
userReviewsExplodedDf.to_parquet('../data/userReviewsExplodedPreReview.parquet',index=False)