El objetivo de este etl tiene como objetivo poder disponer de toda la informacion de Yelp recibida en formato parquet para ser procesada con pandas. 

In [1]:
import pandas as pd
import pickle
import json
import os
import pyarrow as pa
import fastparquet as fp
import numpy as np

En el repositorio local, este archivo etl_yelp.ipynb, toma los datos del dataset consigna de una carpeta llamada Yelp, en la que se encuentran los archivos:
business.pkl
checkin.json
review.json
tip.json
user.parquet

business.pkl to parquet

In [2]:
# Ruta a la carpeta Yelp
yelp_folder = 'Yelp'

file_to_df = 'business.pkl'
# Leer el archivo business.pkl
business_df = pd.read_pickle(os.path.join(yelp_folder, file_to_df))

In [3]:
business_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 150346 entries, 0 to 150345
Data columns (total 28 columns):
 #   Column        Non-Null Count   Dtype 
---  ------        --------------   ----- 
 0   business_id   150346 non-null  object
 1   name          150346 non-null  object
 2   address       150346 non-null  object
 3   city          150346 non-null  object
 4   state         150343 non-null  object
 5   postal_code   150346 non-null  object
 6   latitude      150346 non-null  object
 7   longitude     150346 non-null  object
 8   stars         150346 non-null  object
 9   review_count  150346 non-null  object
 10  is_open       150346 non-null  object
 11  attributes    136602 non-null  object
 12  categories    150243 non-null  object
 13  hours         127123 non-null  object
 14  business_id   5 non-null       object
 15  name          5 non-null       object
 16  address       5 non-null       object
 17  city          5 non-null       object
 18  state         5 non-null     

In [4]:
# Crear un nuevo DataFrame con las primeras 14 columnas
df_business = business_df.iloc[:, :14]

In [5]:
df_business.info()

<class 'pandas.core.frame.DataFrame'>
Index: 150346 entries, 0 to 150345
Data columns (total 14 columns):
 #   Column        Non-Null Count   Dtype 
---  ------        --------------   ----- 
 0   business_id   150346 non-null  object
 1   name          150346 non-null  object
 2   address       150346 non-null  object
 3   city          150346 non-null  object
 4   state         150343 non-null  object
 5   postal_code   150346 non-null  object
 6   latitude      150346 non-null  object
 7   longitude     150346 non-null  object
 8   stars         150346 non-null  object
 9   review_count  150346 non-null  object
 10  is_open       150346 non-null  object
 11  attributes    136602 non-null  object
 12  categories    150243 non-null  object
 13  hours         127123 non-null  object
dtypes: object(14)
memory usage: 17.2+ MB


In [6]:
# Verificar si el directorio 'parquet' existe, y si no, crearlo
if not os.path.exists('parquet'):
    os.makedirs('parquet')

# Obtener el nombre del archivo original sin la extensión
df_2_file_name = 'business.pkl'.split('.')[0]

# Guardar el DataFrame en formato Parquet en el directorio 'parquet' con el nombre del archivo original
file_path = os.path.join('parquet', f'{df_2_file_name}.parquet')

# Guardar el DataFrame en formato Parquet usando fastparquet
df_business.to_parquet(file_path, engine='fastparquet')

print(f"DataFrame guardado como {file_path}")

DataFrame guardado como parquet\business.parquet


checkin.json to parquet

In [7]:
# Ruta al archivo checkin.json dentro de la carpeta Yelp
checkin_path = os.path.join('Yelp', 'checkin.json')

# Lista para almacenar los DataFrames
checkin_dfs = []

# Leer el archivo línea por línea y convertir cada línea en un DataFrame
with open(checkin_path, 'r') as checkin_file:
    for line in checkin_file:
        data = eval(line)  # Convertir la línea en un diccionario
        df = pd.DataFrame([data])  # Crear un DataFrame a partir del diccionario
        checkin_dfs.append(df)

# Concatenar los DataFrames en uno solo
checkin_df = pd.concat(checkin_dfs, ignore_index=True)

In [8]:
checkin_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 131930 entries, 0 to 131929
Data columns (total 2 columns):
 #   Column       Non-Null Count   Dtype 
---  ------       --------------   ----- 
 0   business_id  131930 non-null  object
 1   date         131930 non-null  object
dtypes: object(2)
memory usage: 2.0+ MB


In [9]:
# Mostrar los primeros 10 registros de cada campo 
for column in checkin_df.columns:
    print(f"Columna: {column}")
    print(checkin_df[column].head(10))
    print("\n")

Columna: business_id
0    ---kPU91CF4Lq2-WlRu9Lw
1    --0iUa4sNDFiZFrAdIWhZQ
2    --30_8IhuyMHbSOcNWd6DQ
3    --7PUidqRWpRSpXebiyxTg
4    --7jw19RH9JKXgFohspgQw
5    --8IbOsAAxjKRoYsBFL-PA
6    --9osgUCSDUWUkoTLdvYhQ
7    --ARBQr1WMsTWiwOKOj-FQ
8    --FWWsIwxRwuw9vIMImcQg
9    --FcbSxK1AoEtEAxOgBaCw
Name: business_id, dtype: object


Columna: date
0    2020-03-13 21:10:56, 2020-06-02 22:18:06, 2020...
1    2010-09-13 21:43:09, 2011-05-04 23:08:15, 2011...
2             2013-06-14 23:29:17, 2014-08-13 23:20:22
3    2011-02-15 17:12:00, 2011-07-28 02:46:10, 2012...
4    2014-04-21 20:42:11, 2014-04-28 21:04:46, 2014...
5    2015-06-06 01:03:19, 2015-07-29 16:50:58, 2015...
6    2015-06-13 02:00:57, 2015-07-04 00:44:09, 2015...
7    2014-12-12 00:44:23, 2015-01-09 00:19:52, 2015...
8    2010-09-11 16:28:39, 2010-12-22 21:14:19, 2011...
9    2017-08-18 19:43:50, 2017-10-07 22:38:38, 2017...
Name: date, dtype: object




In [10]:
# Verificar si el directorio 'parquet' existe, y si no, crearlo
if not os.path.exists('parquet'):
    os.makedirs('parquet')

# Obtener el nombre del archivo original sin la extensión
df_2_file_name = 'checkin.json'.split('.')[0]

# Guardar el DataFrame en formato Parquet en el directorio 'parquet' con el nombre del archivo original
file_path = os.path.join('parquet', f'{df_2_file_name}.parquet')

# A continuación, se guarda el DataFrame en formato Parquet usando fastparquet
checkin_df.to_parquet(file_path, engine='fastparquet')

print(f"DataFrame guardado como {file_path}")

DataFrame guardado como parquet\checkin.parquet


tip.json to parquet

In [11]:
# Ruta al archivo tip.json dentro de la carpeta Yelp
tip_path = os.path.join('Yelp', 'tip.json')

# Lista para almacenar los DataFrames
tip_df = []

# Leer el archivo línea por línea y convertir cada línea en un DataFrame
with open(tip_path, 'r', encoding='utf-8') as tip_file:
    for line in tip_file:
        data = json.loads(line)  # Convertir la línea en un diccionario
        df = pd.DataFrame([data])  # Crear un DataFrame a partir del diccionario
        tip_df.append(df)

# Concatenar los DataFrames en uno solo
tip_df = pd.concat(tip_df, ignore_index=True)


In [12]:
tip_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 908915 entries, 0 to 908914
Data columns (total 5 columns):
 #   Column            Non-Null Count   Dtype 
---  ------            --------------   ----- 
 0   user_id           908915 non-null  object
 1   business_id       908915 non-null  object
 2   text              908915 non-null  object
 3   date              908915 non-null  object
 4   compliment_count  908915 non-null  int64 
dtypes: int64(1), object(4)
memory usage: 34.7+ MB


In [13]:
# Mostrar los primeros 10 registros de cada campo 
for column in tip_df.columns:
    print(f"Columna: {column}")
    print(tip_df[column].head(10))
    print("\n")

Columna: user_id
0    AGNUgVwnZUey3gcPCJ76iw
1    NBN4MgHP9D3cw--SnauTkA
2    -copOvldyKh1qr-vzkDEvw
3    FjMQVZjSqY8syIO-53KFKw
4    ld0AperBXk1h6UbqmM80zw
5    trf3Qcz8qvCDKXiTgjUcEg
6    SMGAlRjyfuYu-c-22zIyOg
7    YVBB9g23nuVJ0u44zK0pSA
8    VL12EhEdT4OWqGq0nIqkzw
9    4ay-fdVks5WMerYL_htkGQ
Name: user_id, dtype: object


Columna: business_id
0    3uLgwr0qeCNMjKenHJwPGQ
1    QoezRbYQncpRqyrLH6Iqjg
2    MYoRNLb5chwjQe3c_k37Gg
3    hV-bABTK-glh5wj31ps_Jw
4    _uN0OudeJ3Zl_tf6nxg5ww
5    7Rm9Ba50bw23KTA8RedZYg
6    kH-0iXqkL7b8UXNpguBMKg
7    jtri188kuhe_AuEOJ51U_A
8    xODBZmX4EmlVvbqtKN7YKg
9    pICJRcyqW1cF96Q3XhLSbw
Name: business_id, dtype: object


Columna: text
0                       Avengers time with the ladies.
1    They have lots of good deserts and tasty cuban...
2               It's open even when you think it isn't
3                            Very decent fried chicken
4               Appetizers.. platter special for lunch
5    Chili Cup + Single Cheeseburger with onion

In [14]:
# Transformar columna 'date' a formato datetime
tip_df['date'] = pd.to_datetime(tip_df['date'])

# Luego, ordena el DataFrame por la columna 'date' en orden descendente y selecciona las últimas 20 fechas.
last_20_dates = tip_df['date'].sort_values(ascending=False).head(20)

# Imprime las últimas 20 fechas
print(last_20_dates)


538797   2022-01-19 20:38:55
741844   2022-01-19 19:20:35
901828   2022-01-19 19:07:52
697249   2022-01-19 19:06:01
536750   2022-01-19 18:46:08
353922   2022-01-19 18:42:44
826026   2022-01-19 17:40:43
877776   2022-01-19 17:37:41
505905   2022-01-19 17:33:53
541978   2022-01-19 17:16:57
702824   2022-01-19 16:58:06
412609   2022-01-19 16:49:51
539362   2022-01-19 15:48:50
543747   2022-01-19 13:10:15
421768   2022-01-19 12:25:12
696565   2022-01-19 12:11:31
529662   2022-01-19 06:33:22
529685   2022-01-19 05:23:53
543800   2022-01-19 04:47:00
821171   2022-01-19 03:48:47
Name: date, dtype: datetime64[ns]


In [15]:
# Verificar si el directorio 'parquet' existe, y si no, crearlo
if not os.path.exists('parquet'):
    os.makedirs('parquet')

# Obtener el nombre del archivo original sin la extensión
df_2_file_name = 'tip.json'.split('.')[0]

# Guardar el DataFrame en formato Parquet en el directorio 'parquet' con el nombre del archivo original
file_path = os.path.join('parquet', f'{df_2_file_name}.parquet')

# A continuación, se guarda el DataFrame en formato Parquet usando fastparquet
tip_df.to_parquet(file_path, engine='fastparquet')

print(f"DataFrame guardado como {file_path}")

DataFrame guardado como parquet\tip.parquet


review.json to parquet

In [16]:
# Ruta del archivo original y copia
archivo_original = 'Yelp/review.json'
archivo_copia = 'Yelp/review_copia.json'

# Copiar el archivo original a review_copia.json
with open(archivo_original, 'r', encoding='utf-8') as original_file:
    data = [json.loads(line) for line in original_file]
    with open(archivo_copia, 'w', encoding='utf-8') as copia_file:
        for line in data:
            json.dump(line, copia_file)
            copia_file.write('\n')

# Número de líneas por archivo parquet
lineas_por_archivo = 1000000
numero_archivo = 1
linea_actual = 0
df_actual = []
lista_df_reviews = []  # Lista para almacenar DataFrames

# Crear la carpeta para guardar los archivos parquet
if not os.path.exists('pq_review'):
    os.makedirs('pq_review')

for linea in data:
    df_actual.append(linea)
    linea_actual += 1

    if linea_actual == lineas_por_archivo:
        # Crear un DataFrame
        df = pd.DataFrame(df_actual)
        
        # Agregar el DataFrame a la lista
        lista_df_reviews.append(df)

        # Nombre del archivo parquet
        nombre_archivo = f'pq_review/review_{str(numero_archivo).zfill(2)}.parquet'

        # Guardar el DataFrame como archivo parquet
        df.to_parquet(nombre_archivo, index=False)

        # Reiniciar el contador de líneas y el DataFrame actual
        linea_actual = 0
        df_actual = []
        numero_archivo += 1

# Si quedan líneas en el último DataFrame
if df_actual:
    df = pd.DataFrame(df_actual)
    
    # Agregar el último DataFrame a la lista
    lista_df_reviews.append(df)
    
    nombre_archivo = f'pq_review/review_{str(numero_archivo).zfill(2)}.parquet'
    df.to_parquet(nombre_archivo, index=False)


In [43]:
len(lista_df_reviews)

7