# ETL de la Data Business Starbucks Yelp

In [1]:
import pandas as pd
from datetime import datetime
import os

def procesar_y_guardar_starbucks(pkl_path, save_directory):
    # Cargar el archivo .pkl en un DataFrame de Pandas
    df_business_starbucks = pd.read_pickle(pkl_path)

    # Eliminar las columnas duplicadas
    df_business_starbucks = df_business_starbucks.loc[:, ~df_business_starbucks.columns.duplicated()]

    # Filtrar el DataFrame para incluir solo las filas donde 'name' contiene la palabra 'Starbucks'
    df_business_starbucks = df_business_starbucks[df_business_starbucks['name'].str.contains('Starbucks', na=False)]

    # Crear la nueva columna 'starbucks_id' concatenando las columnas especificadas, separadas por un guion
    df_business_starbucks['starbucks_id'] = df_business_starbucks['state'].astype(str) + '-' + \
                                   df_business_starbucks['latitude'].astype(str) + '-' + \
                                   df_business_starbucks['longitude'].astype(str)

    # Convertir la columna 'postal_code' a tipo numérico (float)
    df_business_starbucks['postal_code'] = pd.to_numeric(df_business_starbucks['postal_code'], errors='coerce')

    # Convertir la columna 'city' a tipo str
    df_business_starbucks['city'] = df_business_starbucks['city'].astype(str)

    # Eliminar las filas donde la columna 'name' tiene valores NaN
    df_business_starbucks = df_business_starbucks.dropna(subset=['name'])

    # Eliminar las columnas 'attributes' y 'hours'
    df_business_starbucks.drop(columns=['attributes', 'hours'], inplace=True)

    # Restablecer el índice
    df_business_starbucks = df_business_starbucks.reset_index(drop=True)

    # Obtener la fecha actual en formato YYYY-MM-DD
    fecha_actual = datetime.now().strftime('%Y-%m-%d')

    # Crear el nombre del archivo
    nombre_archivo = f'business_starbucks_{fecha_actual}.parquet'

    # Guardar el DataFrame como archivo Parquet en el directorio especificado
    df_business_starbucks.to_parquet(f'{save_directory}/{nombre_archivo}')

    def guardar_y_comparar(df):
        # Obtener la fecha actual en formato YYYY-MM-DD
        fecha_actual = datetime.now().strftime('%Y-%m-%d')
        # Crear el nombre del archivo
        nombre_archivo = f'business_starbucks_{fecha_actual}.parquet'
        # Guardar el DataFrame como archivo Parquet en el directorio especificado
        df.to_parquet(f'{save_directory}/{nombre_archivo}')

        # Comprobar si el archivo existe
        archivo_antiguo = f'{save_directory}/business_starbucks.parquet'
        if os.path.exists(archivo_antiguo):
            # Cargar el archivo existente
            df_antiguo = pd.read_parquet(archivo_antiguo)
            
            # Concatenar los datos que no están presentes en el archivo existente
            df_nuevo = pd.concat([df_antiguo, df]).drop_duplicates().reset_index(drop=True)
            
            # Guardar el DataFrame actualizado con el mismo nombre
            df_nuevo.to_parquet(archivo_antiguo)
            
            # Verificar si se actualizó la data
            if len(df) > 0:
                print("Se ha actualizado la data.")
            else:
                print("No hay nuevos datos para actualizar.")
        else:
            print("No se encontró el archivo antiguo. Guardando el nuevo archivo.")

    # Usar la función con el DataFrame procesado
    guardar_y_comparar(df_business_starbucks)

In [2]:
# Ejecucion de la Funcion
procesar_y_guardar_starbucks('../data/business.pkl', '../gcp')

Se ha actualizado la data.


# ETL de la Data Business Dunkin Yelp

In [None]:
import pandas as pd
from datetime import datetime
import os

def procesar_y_guardar_dunkin(pkl_path, save_directory):
    # Cargar el archivo .pkl en un DataFrame de Pandas
    df_business_dunkin = pd.read_pickle(pkl_path)

    # Eliminar las columnas duplicadas
    df_business_dunkin = df_business_dunkin.loc[:, ~df_business_dunkin.columns.duplicated()]

    # Filtrar el DataFrame para incluir solo las filas donde 'name' contiene la palabra 'Dunkin'
    df_business_dunkin = df_business_dunkin[df_business_dunkin['name'].str.contains('Dunkin', na=False)]

    # Crear la nueva columna 'dunkin_id' concatenando las columnas especificadas, separadas por un guion
    df_business_dunkin['dunkin_id'] = df_business_dunkin['state'].astype(str) + '-' + \
                                   df_business_dunkin['latitude'].astype(str) + '-' + \
                                   df_business_dunkin['longitude'].astype(str)

    # Convertir la columna 'postal_code' a tipo numérico (float)
    df_business_dunkin['postal_code'] = pd.to_numeric(df_business_dunkin['postal_code'], errors='coerce')

    # Convertir la columna 'city' a tipo str
    df_business_dunkin['city'] = df_business_dunkin['city'].astype(str)

    # Eliminar las filas donde la columna 'name' tiene valores NaN
    df_business_dunkin = df_business_dunkin.dropna(subset=['name'])

    # Eliminar las columnas 'attributes' y 'hours'
    df_business_dunkin.drop(columns=['attributes', 'hours'], inplace=True)

    # Restablecer el índice
    df_business_dunkin = df_business_dunkin.reset_index(drop=True)

    # Obtener la fecha actual en formato YYYY-MM-DD
    fecha_actual = datetime.now().strftime('%Y-%m-%d')

    # Crear el nombre del archivo
    nombre_archivo = f'business_dunkin_{fecha_actual}.parquet'

    # Guardar el DataFrame como archivo Parquet en el directorio especificado
    df_business_dunkin.to_parquet(f'{save_directory}/{nombre_archivo}')

    def guardar_y_comparar(df):
        # Obtener la fecha actual en formato YYYY-MM-DD
        fecha_actual = datetime.now().strftime('%Y-%m-%d')
        # Crear el nombre del archivo
        nombre_archivo = f'business_dunkin_{fecha_actual}.parquet'
        # Guardar el DataFrame como archivo Parquet en el directorio especificado
        df.to_parquet(f'{save_directory}/{nombre_archivo}')

        # Comprobar si el archivo existe
        archivo_antiguo = f'{save_directory}/business_dunkin.parquet'
        if os.path.exists(archivo_antiguo):
            # Cargar el archivo existente
            df_antiguo = pd.read_parquet(archivo_antiguo)
            
            # Concatenar los datos que no están presentes en el archivo existente
            df_nuevo = pd.concat([df_antiguo, df]).drop_duplicates().reset_index(drop=True)
            
            # Guardar el DataFrame actualizado con el mismo nombre
            df_nuevo.to_parquet(archivo_antiguo)
            
            # Verificar si se actualizó la data
            if len(df) > 0:
                print("Se ha actualizado la data.")
            else:
                print("No hay nuevos datos para actualizar.")
        else:
            print("No se encontró el archivo antiguo. Guardando el nuevo archivo.")

    # Usar la función con el DataFrame procesado
    guardar_y_comparar(df_business_dunkin)

In [None]:
# Ejecucion de la Funcion
procesar_y_guardar_dunkin('../data/business.pkl', '../gcp')

# ETL de la Data Review Starbucks Yelp

In [26]:
import pandas as pd
import json
import os
from datetime import datetime

def process_reviews_starbucks():
    # Directorios y archivos
    review_file_path = '../data/reviews.json'
    business_file_path = '../gcp/business_starbucks.parquet'
    output_dir = '../gcp/'
    output_file_base = 'reviews_starbucks'
    output_file_ext = '.parquet'
    
    # Leer el archivo reviews.json entero
    with open(review_file_path, 'r') as f:
        reviews = json.load(f)
    df_reviews = pd.DataFrame(reviews)
    
    # Leer el archivo business_starbucks.parquet
    df_business = pd.read_parquet(business_file_path)
    
    # Verificar que las columnas necesarias existan
    if 'business_id' not in df_reviews.columns:
        raise KeyError("La columna 'business_id' no se encuentra en el archivo reviews.json")
    if 'business_id' not in df_business.columns:
        raise KeyError("La columna 'business_id' no se encuentra en el archivo business_starbucks.parquet")
    
    # Cruzar ambos archivos por la columna business_id
    df_reviews_starbucks = df_reviews[df_reviews['business_id'].isin(df_business['business_id'])]
    
    # Verificar que la columna 'date' exista
    if 'date' not in df_reviews_starbucks.columns:
        raise KeyError("La columna 'date' no se encuentra en el DataFrame resultante después del cruce")
    
    # Eliminar la hora de la columna 'date', manteniendo el tipo datetime
    df_reviews_starbucks['date'] = pd.to_datetime(df_reviews_starbucks['date']).dt.normalize()
    
    # Reiniciar el índice del DataFrame
    df_reviews_starbucks.reset_index(drop=True, inplace=True)
    
    # Convertir a parquet sin guardar aún
    current_date = datetime.now().strftime('%Y%m%d')
    temp_output_file = os.path.join(output_dir, f"{output_file_base}_{current_date}{output_file_ext}")
    df_reviews_starbucks.to_parquet(temp_output_file)
    
    final_output_file = os.path.join(output_dir, f"{output_file_base}{output_file_ext}")
    
    # Si el archivo reviews_starbucks.parquet ya existe
    if os.path.exists(final_output_file):
        existing_reviews = pd.read_parquet(final_output_file)
        
        # Combinar DataFrames y eliminar duplicados
        combined_reviews = pd.concat([existing_reviews, df_reviews_starbucks]).drop_duplicates(subset=['review_id'])
        
        # Reiniciar índice
        combined_reviews.reset_index(drop=True, inplace=True)
        
        # Guardar el DataFrame combinado
        combined_reviews.to_parquet(final_output_file)
        print("La data se actualizó.")
    else:
        # Renombrar el archivo temporal
        os.rename(temp_output_file, final_output_file)
        print(f"El archivo se guardó como {final_output_file}")

In [27]:
# Ejecutar la función
process_reviews_starbucks()

El archivo se guardó como ../gcp/reviews_starbucks.parquet


# ETL de la Data Review Dunkin Yelp

In [24]:
import pandas as pd
import json
import os
from datetime import datetime

def process_reviews_dunkin():
    # Directorios y archivos
    review_file_path = '../data/reviews.json'
    business_file_path = '../gcp/business_dunkin.parquet'
    output_dir = '../gcp/'
    output_file_base = 'reviews_dunkin'
    output_file_ext = '.parquet'
    
    # Leer el archivo reviews.json entero
    with open(review_file_path, 'r') as f:
        reviews = json.load(f)
    df_reviews = pd.DataFrame(reviews)
    
    # Leer el archivo business_dunkin.parquet
    df_business = pd.read_parquet(business_file_path)
    
    # Verificar que las columnas necesarias existan
    if 'business_id' not in df_reviews.columns:
        raise KeyError("La columna 'business_id' no se encuentra en el archivo reviews.json")
    if 'business_id' not in df_business.columns:
        raise KeyError("La columna 'business_id' no se encuentra en el archivo business_dunkin.parquet")
    
    # Cruzar ambos archivos por la columna business_id
    df_reviews_dunkin = df_reviews[df_reviews['business_id'].isin(df_business['business_id'])]
    
    # Verificar que la columna 'date' exista
    if 'date' not in df_reviews_dunkin.columns:
        raise KeyError("La columna 'date' no se encuentra en el DataFrame resultante después del cruce")
    
    # Eliminar la hora de la columna 'date', manteniendo el tipo datetime
    df_reviews_dunkin['date'] = pd.to_datetime(df_reviews_dunkin['date']).dt.normalize()
    
    # Reiniciar el índice del DataFrame
    df_reviews_dunkin.reset_index(drop=True, inplace=True)
    
    # Convertir a parquet sin guardar aún
    current_date = datetime.now().strftime('%Y%m%d')
    temp_output_file = os.path.join(output_dir, f"{output_file_base}_{current_date}{output_file_ext}")
    df_reviews_dunkin.to_parquet(temp_output_file)
    
    final_output_file = os.path.join(output_dir, f"{output_file_base}{output_file_ext}")
    
    # Si el archivo reviews_dunkin.parquet ya existe
    if os.path.exists(final_output_file):
        existing_reviews = pd.read_parquet(final_output_file)
        
        # Combinar DataFrames y eliminar duplicados
        combined_reviews = pd.concat([existing_reviews, df_reviews_dunkin]).drop_duplicates(subset=['review_id'])
        
        # Reiniciar índice
        combined_reviews.reset_index(drop=True, inplace=True)
        
        # Guardar el DataFrame combinado
        combined_reviews.to_parquet(final_output_file)
        print("La data se actualizó.")
    else:
        # Renombrar el archivo temporal
        os.rename(temp_output_file, final_output_file)
        print(f"El archivo se guardó como {final_output_file}")

In [25]:
# Ejecutar la función
process_reviews_dunkin()

El archivo se guardó como ../gcp/reviews_dunkin.parquet


# ETL de la Data User Yelp

In [7]:
import pandas as pd
import pyarrow.parquet as pq
import os
from datetime import datetime

def process_user_data():
    # Definir tamaños de los lotes
    batch_size = 10000  # Ajustar según la capacidad de memoria disponible

    # Cargar los archivos de reviews
    reviews_dunkin_df = pd.read_parquet('../gcp/reviews_dunkin.parquet')
    reviews_starbucks_df = pd.read_parquet('../gcp/reviews_starbucks.parquet')

    # Obtener los user_id únicos de los archivos de reviews
    dunkin_user_ids = reviews_dunkin_df['user_id'].unique()
    starbucks_user_ids = reviews_starbucks_df['user_id'].unique()

    # Crear un set de user_ids únicos de ambos archivos de reviews
    combined_user_ids = set(dunkin_user_ids).union(set(starbucks_user_ids))

    # Inicializar el archivo Parquet
    user_parquet_file = pq.ParquetFile('../data/user.parquet')

    # Inicializar un writer para el archivo temporal
    with pd.HDFStore('temp_user.h5', mode='w') as store:
        max_lengths = {}

        # Procesar el archivo user.parquet en lotes
        for batch in user_parquet_file.iter_batches(batch_size):
            # Convertir el batch en DataFrame
            chunk = batch.to_pandas()

            # Filtrar el chunk actual
            filtered_chunk = chunk[chunk['user_id'].isin(combined_user_ids)]

            # Obtener la longitud máxima de cada columna string en el chunk
            for col in filtered_chunk.select_dtypes(include='object').columns:
                max_len = filtered_chunk[col].str.len().max()
                if col in max_lengths:
                    max_lengths[col] = max(max_lengths[col], max_len)
                else:
                    max_lengths[col] = max_len

        # Asegurarse de que cada longitud máxima sea al menos 20 (o un valor adecuado)
        min_itemsize = {col: max(20, int(max_len) + 1) if pd.notna(max_len) else 20 for col, max_len in max_lengths.items()}

        # Reinicializar la iteración sobre los lotes
        for batch in user_parquet_file.iter_batches(batch_size):
            # Convertir el batch en DataFrame
            chunk = batch.to_pandas()

            # Filtrar el chunk actual
            filtered_chunk = chunk[chunk['user_id'].isin(combined_user_ids)]

            # Escribir el chunk filtrado en el archivo temporal
            store.append('filtered_user', filtered_chunk, data_columns=True, index=False, min_itemsize=min_itemsize)

    # Cargar el archivo temporal y guardarlo como user.parquet en el directorio ..//gcp//
    with pd.HDFStore('temp_user.h5', mode='r') as store:
        filtered_user_df = store['filtered_user']

    # Eliminar las columnas yelping_since, elite, friends, fans
    filtered_user_df.drop(columns=['yelping_since', 'elite', 'friends', 'fans'], inplace=True)

    # Resetear el índice
    filtered_user_df.reset_index(drop=True, inplace=True)

    # Obtener la fecha actual
    today_str = datetime.today().strftime('%Y-%m-%d')

    # Nombre del archivo de salida
    output_file_path = f'../gcp/user_{today_str}.parquet'

    # Guardar el resultado en el directorio ..//gcp// como user_fecha_actual.parquet
    filtered_user_df.to_parquet(output_file_path)

    # Verificar si existe el archivo user.parquet en el directorio ..//gcp//
    existing_user_file_path = '../gcp/user.parquet'
    if not os.path.exists(existing_user_file_path):
        # Si no existe, renombrar el archivo actual
        os.rename(output_file_path, existing_user_file_path)
    else:
        # Si existe, cruzar el archivo nuevo con user.parquet
        existing_user_df = pd.read_parquet(existing_user_file_path)

        # Filtrar los nuevos usuarios que no están en el archivo existente
        new_users_df = filtered_user_df[~filtered_user_df['user_id'].isin(existing_user_df['user_id'])]

        # Combinar los datos existentes con los nuevos usuarios
        combined_user_df = pd.concat([existing_user_df, new_users_df], ignore_index=True)

        # Guardar el archivo combinado
        combined_user_df.to_parquet(existing_user_file_path)


In [8]:
# Llamar a la función para procesar los datos
process_user_data()