### IMPORTACIONES

In [2]:
import pandas as pd
import json
import ast
import warnings
import pyarrow as pa
import pyarrow.parquet as pq
warnings.filterwarnings('ignore')

#### ETL SOBRE USERS_ITEMS

In [29]:
# Se carga el json de users_items y se realiza una lectura linea a linea que se irá almacenando en una lista para luego convertirla en un dataframe pandas
users_items = 'datos_json/australian_users_items.json'

listado = []

with open(users_items, 'r', encoding= 'utf-8') as archivo:
    for linea in archivo.readlines():
        listado.append(ast.literal_eval(linea))

dfUserItems = pd.DataFrame(listado)

# Se guarda en formato csv para mejorar tiempos de carga y flujo de trabajo.
dfUserItems.to_csv('json_a_csv/australian_users_items.csv', index=False)

In [30]:
# Se carga el csv previamente creado.
userItems = pd.read_csv("json_a_csv/australian_users_items.csv")

userItems

Unnamed: 0,user_id,items_count,steam_id,user_url,items
0,76561197970982479,277,76561197970982479,http://steamcommunity.com/profiles/76561197970...,"[{'item_id': '10', 'item_name': 'Counter-Strik..."
1,js41637,888,76561198035864385,http://steamcommunity.com/id/js41637,"[{'item_id': '10', 'item_name': 'Counter-Strik..."
2,evcentric,137,76561198007712555,http://steamcommunity.com/id/evcentric,"[{'item_id': '1200', 'item_name': 'Red Orchest..."
3,Riot-Punch,328,76561197963445855,http://steamcommunity.com/id/Riot-Punch,"[{'item_id': '10', 'item_name': 'Counter-Strik..."
4,doctr,541,76561198002099482,http://steamcommunity.com/id/doctr,"[{'item_id': '300', 'item_name': 'Day of Defea..."
...,...,...,...,...,...
88305,76561198323066619,22,76561198323066619,http://steamcommunity.com/profiles/76561198323...,"[{'item_id': '413850', 'item_name': 'CS:GO Pla..."
88306,76561198326700687,177,76561198326700687,http://steamcommunity.com/profiles/76561198326...,"[{'item_id': '11020', 'item_name': 'TrackMania..."
88307,XxLaughingJackClown77xX,0,76561198328759259,http://steamcommunity.com/id/XxLaughingJackClo...,[]
88308,76561198329548331,7,76561198329548331,http://steamcommunity.com/profiles/76561198329...,"[{'item_id': '304930', 'item_name': 'Unturned'..."


In [31]:
# Se realiza una primera verificación de nulos.
userItems.isnull().any()

user_id        False
items_count    False
steam_id       False
user_url       False
items          False
dtype: bool

In [32]:
# Se importa la funcion literal_eval para evaluar expresiones literales de tipo str
from ast import literal_eval

# Se aplica el literal_eval a toda la columna items convirtiendo las cadenas en estructuras de datos
userItems['items'] = userItems['items'].apply(literal_eval)

# Se aplica el metodo explode a la columna items para dividir las listas en filas separadas y se replican las demas columnas para cada fila resultante.
filasItems = userItems.explode('items')

# Se desglosan las columnas anidadas utilizando el metodo json_normalize.
columnasItems = pd.json_normalize(filasItems['items'])

# Se reinician los indices para posteriormente realizar un join
filasItems = filasItems.reset_index(drop=True)

# Se combina filasItems y columnasItems
userItems2 = filasItems.join(columnasItems)

# Se eliminan las columnas items y user_url
userItemsFinal = userItems2.drop(['items', 'user_url'], axis=1)

In [33]:
userItemsFinal

Unnamed: 0,user_id,items_count,steam_id,item_id,item_name,playtime_forever,playtime_2weeks
0,76561197970982479,277,76561197970982479,10,Counter-Strike,6.0,0.0
1,76561197970982479,277,76561197970982479,20,Team Fortress Classic,0.0,0.0
2,76561197970982479,277,76561197970982479,30,Day of Defeat,7.0,0.0
3,76561197970982479,277,76561197970982479,40,Deathmatch Classic,0.0,0.0
4,76561197970982479,277,76561197970982479,50,Half-Life: Opposing Force,0.0,0.0
...,...,...,...,...,...,...,...
5170010,76561198329548331,7,76561198329548331,373330,All Is Dust,0.0,0.0
5170011,76561198329548331,7,76561198329548331,388490,One Way To Die: Steam Edition,3.0,3.0
5170012,76561198329548331,7,76561198329548331,521570,You Have 10 Seconds 2,4.0,4.0
5170013,76561198329548331,7,76561198329548331,519140,Minds Eyes,3.0,3.0


In [34]:
# Se consulta la información disponible del dataframe
userItemsFinal.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5170015 entries, 0 to 5170014
Data columns (total 7 columns):
 #   Column            Dtype  
---  ------            -----  
 0   user_id           object 
 1   items_count       int64  
 2   steam_id          int64  
 3   item_id           object 
 4   item_name         object 
 5   playtime_forever  float64
 6   playtime_2weeks   float64
dtypes: float64(2), int64(2), object(3)
memory usage: 276.1+ MB


In [40]:
# Se observa la cantidad de nulos de cada columna.
userItemsFinal.isna().sum()

user_id                 0
items_count             0
steam_id                0
item_id             16806
item_name           16806
playtime_forever    16806
playtime_2weeks     16806
dtype: int64

In [36]:
# Se almacenan los indices de los nulos en la columna item_id
itemsNulos = userItemsFinal[userItemsFinal['item_id'].isna()].index

# Se utiliza la variable itemsNulos donde almacenaron los indices de los registros nulos para ubicar las filas correspondientes
userItemsFinal.iloc[itemsNulos]

Unnamed: 0,user_id,items_count,steam_id,item_id,item_name,playtime_forever,playtime_2weeks
3733,Wackky,0,76561198039117046,,,,
3849,76561198079601835,0,76561198079601835,,,,
6019,hellom8o,0,76561198117222320,,,,
6523,starkillershadow553,0,76561198059648579,,,,
7237,darkenkane,0,76561198058876001,,,,
...,...,...,...,...,...,...,...
5169470,76561198316380182,0,76561198316380182,,,,
5169471,76561198316970597,0,76561198316970597,,,,
5169472,76561198318100691,0,76561198318100691,,,,
5170006,XxLaughingJackClown77xX,0,76561198328759259,,,,


In [37]:
# Se analiza el dataset resultante a partir de los nulos y se utiliza el metodo dropna con un filtro para que elimine las filas que posean menos de 4 datos NO nulos
useritemsdropeados = userItemsFinal.dropna(thresh=4)

In [43]:
# Se observa nuevamente la cantidad de nulos de cada columna.
useritemsdropeados.isna().sum()

user_id             0
items_count         0
steam_id            0
item_id             0
item_name           0
playtime_forever    0
playtime_2weeks     0
dtype: int64

In [44]:
# Se modifica el nombre de la columna item_id por id para facilitar la vinculación entre datasets posteriormente.
useritemsdropeados.rename(columns={'item_id': 'id'}, inplace=True)

In [46]:
useritemsdropeados


Unnamed: 0,user_id,items_count,steam_id,id,item_name,playtime_forever,playtime_2weeks
0,76561197970982479,277,76561197970982479,10,Counter-Strike,6.0,0.0
1,76561197970982479,277,76561197970982479,20,Team Fortress Classic,0.0,0.0
2,76561197970982479,277,76561197970982479,30,Day of Defeat,7.0,0.0
3,76561197970982479,277,76561197970982479,40,Deathmatch Classic,0.0,0.0
4,76561197970982479,277,76561197970982479,50,Half-Life: Opposing Force,0.0,0.0
...,...,...,...,...,...,...,...
5170009,76561198329548331,7,76561198329548331,346330,BrainBread 2,0.0,0.0
5170010,76561198329548331,7,76561198329548331,373330,All Is Dust,0.0,0.0
5170011,76561198329548331,7,76561198329548331,388490,One Way To Die: Steam Edition,3.0,3.0
5170012,76561198329548331,7,76561198329548331,521570,You Have 10 Seconds 2,4.0,4.0


In [47]:
useritemsdropeados.to_csv('csv_limpios/user_items.csv', index= False)

In [48]:
# Se convierte el DataFrame a una tabla de Arrow para posteriormente comprimirlo en parquet
table = pa.Table.from_pandas(useritemsdropeados)

# Se especifica el nombre del archivo Parquet y comprime con snappy
parquet_file = 'datos_parquet/user_items.parquet'
pq.write_table(table, parquet_file, compression='snappy')

In [3]:
# Se carga el dataset previamente comprimido en parquet
users_items_parquet = pd.read_parquet('datos_parquet/user_items.parquet')

In [4]:
# Se verifica el archivo parquet, ya que se hallaron problemas con el tipo de la columna id al comprimirlo en parquet, se va a castear a int.
users_items_parquet['id'] = users_items_parquet['id'].astype(int)

In [6]:
users_items_parquet.info()

<class 'pandas.core.frame.DataFrame'>
Index: 5153209 entries, 0 to 5170013
Data columns (total 7 columns):
 #   Column            Dtype  
---  ------            -----  
 0   user_id           object 
 1   items_count       int64  
 2   steam_id          int64  
 3   id                int32  
 4   item_name         object 
 5   playtime_forever  float64
 6   playtime_2weeks   float64
dtypes: float64(2), int32(1), int64(2), object(2)
memory usage: 294.9+ MB


In [None]:
# Se convierte el DataFrame a una tabla de Arrow para posteriormente comprimirlo en parquet
table = pa.Table.from_pandas(users_items_parquet)

# Se especifica el nombre del archivo Parquet y comprime con snappy
parquet_file = 'datos_parquet/user_items.parquet'
pq.write_table(table, parquet_file, compression='snappy')