# 🛠️Automatización del ETL 

- Se realizó la automatización de la limpieza y transformación  de los archivos que subimos al bucket denominado ` pizza-henry ` en la dirección ` \RAW\Yelp ` y posteriormente la carga haciael bucket ` pizza-henry-staged ` en la dirección ` \STAGED\ `.
- Este proceso nos ahorra tiempo y reduce la posibilidad de errores humano, por esa misma razòn, el còdigo es  ` eficienciente `
- Este código es ` escalable `, ya que se puede adaptar para manejar gran volumen de datos y diferentes tipos de análisis con mínimas modificaciones 

## Yelp📍

### ✅Etl de yelp_business

In [None]:
import functions_framework
import pandas as pd
from google.cloud import storage
import os
from io import BytesIO
import re

# Inicializa el cliente de Google Cloud Storage
storage_client = storage.Client()

def process_and_upload_data(bucket_name, source_blob_name, destination_bucket_name):
    try:
        print(f"Procesando archivo desde {bucket_name}/{source_blob_name}")

        # Descargar el archivo
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        pkl_data = blob.download_as_bytes()
        
        # Cargar los datos en un DataFrame
        temp_file = BytesIO(pkl_data)
        df_business = pd.read_pickle(temp_file)
        
        # Procesamiento específico
        df_business = df_business.loc[:, ~df_business.columns.duplicated()]
        df_business = df_business.dropna(subset=['categories'])
        df_restaurantes = df_business[df_business['categories'].str.lower().str.contains('restaurant')].reset_index(drop=True)
        
        # Filtros geográficos y categorías
        latitud_max, latitud_min = 31.0, 24.5
        longitud_max, longitud_min = -80.0, -87.6
        mascara_latitud = (df_restaurantes['latitude'] >= latitud_min) & (df_restaurantes['latitude'] <= latitud_max)
        mascara_longitud = (df_restaurantes['longitude'] >= longitud_min) & (df_restaurantes['longitude'] <= longitud_max)
        df_restaurantes_FL = df_restaurantes[mascara_latitud & mascara_longitud]
        df_restaurantes_FL = df_restaurantes_FL[df_restaurantes_FL['is_open'] == 1].reset_index(drop=True)
        df_restaurantes_FL.drop(columns=['is_open', 'state'], inplace=True)
        
        # Ajustes adicionales y carga
        df_restaurantes_FL = df_restaurantes_FL.drop(columns=['postal_code', 'attributes', 'hours'])
        
        # Filtrar pizzerías y Taco Bell en Florida
        pizza_pattern = re.compile(r'pizza|pizzería', re.IGNORECASE)
        taco_bell_pattern = re.compile(r'taco\s*bell', re.IGNORECASE)

        df_filtered = df_restaurantes_FL[
            df_restaurantes_FL['categories'].str.contains(pizza_pattern) | 
            df_restaurantes_FL['name'].apply(lambda x: bool(pizza_pattern.search(x))) |
            df_restaurantes_FL['name'].apply(lambda x: bool(taco_bell_pattern.search(x)))
        ]

        # Verificar que todas las filas tengan el mismo número de columnas
        expected_columns = 9
        df_filtered = df_filtered.loc[:, df_filtered.columns[:expected_columns]]

        # Guardar y cargar CSV
        buffer = BytesIO()
        df_filtered.to_csv(buffer, index=False)
        buffer.seek(0)
        destination_blob_name = 'STAGED/Yelp/business_filtered_FL.csv'
        destination_blob = storage_client.bucket(destination_bucket_name).blob(destination_blob_name)
        destination_blob.upload_from_file(buffer, content_type='text/csv')
        print(f"Datos de pizzerías, Pizza Hut y Taco Bell en Florida cargados exitosamente a {destination_blob_name} en el bucket {destination_bucket_name}.")
        
    except Exception as e:
        print(f"An error occurred: {str(e)}")

# Uso de variables de entorno para definir los parámetros
source_bucket_name = os.getenv('SOURCE_BUCKET_NAME', 'henry-pizza')
source_blob_name = os.getenv('SOURCE_BLOB_NAME', 'RAW/Yelp/business.pkl')
destination_bucket_name = os.getenv('DESTINATION_BUCKET_NAME', 'henry-pizza-staged')

# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def business_etl(cloud_event):
    data = cloud_event.data

    event_id = cloud_event["id"]
    event_type = cloud_event["type"]

    bucket = data["bucket"]
    name = data["name"]
    metageneration = data["metageneration"]
    timeCreated = data["timeCreated"]
    updated = data["updated"]

    print(f"Event ID: {event_id}")
    print(f"Event type: {event_type}")
    print(f"Bucket: {bucket}")
    print(f"File: {name}")
    print(f"Metageneration: {metageneration}")
    print(f"Created: {timeCreated}")
    print(f"Updated: {updated}")

    # Verificar si el archivo que disparó el evento es el archivo correcto
    if name == 'RAW/Yelp/business.pkl':
        print("Archivo correcto detectado para procesamiento.")
        process_and_upload_data(source_bucket_name, source_blob_name, destination_bucket_name)
    else:
        print(f"Archivo {name} no es business.pkl. Procesamiento no necesario.")


### ✅Etl de Yelp_checkin

In [None]:
import functions_framework
import pandas as pd
from google.cloud import storage
import os
from io import BytesIO

# Inicializa el cliente de Google Cloud Storage
storage_client = storage.Client()

def validate_data(df, table_name):
    """
    Función para validar los datos antes de continuar con el procesamiento.
    """
    if df.isnull().values.any():
        print(f"Warning: {table_name} contiene valores nulos.")
        df = df.dropna()
    if df.duplicated().any():
        print(f"Warning: {table_name} contiene duplicados.")
        df = df.drop_duplicates()
    return df

def get_business_ids_from_filtered_file(bucket_name, blob_name):
    """
    Función para obtener los IDs de negocios filtrados desde un archivo CSV en Google Cloud Storage.
    """
    try:
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        if not blob.exists():
            raise FileNotFoundError(f"El archivo {blob_name} no existe en el bucket {bucket_name}")
        
        csv_data = blob.download_as_bytes()
        temp_file = BytesIO(csv_data)
        df_filtered_business = pd.read_csv(temp_file)
        return df_filtered_business['business_id'].unique()
    except Exception as e:
        print(f"An error occurred while fetching business IDs: {str(e)}")
        return []

def process_and_upload_data(bucket_name, source_blob_name, destination_bucket_name, business_ids):
    try:
        print(f"Procesando archivo desde {bucket_name}/{source_blob_name}")

        # Descargar el archivo
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        json_data = blob.download_as_bytes()
        print(f"Archivo {source_blob_name} descargado del bucket {bucket_name}")

        # Verificar si los datos están vacíos
        if not json_data:
            raise ValueError("El archivo JSON está vacío.")

        # Intentar cargar los datos en un DataFrame usando diferentes codificaciones
        temp_file = BytesIO(json_data)
        try:
            df_checkin = pd.read_json(temp_file, lines=True)
        except ValueError as e:
            print(f"Error al leer el archivo JSON: {e}")
            raise 
        except UnicodeDecodeError as e: 
            print(f"Error de decodificación: {e}")
            raise

        print(f"Archivo {source_blob_name} cargado en DataFrame")

        # Asegurarse de que la columna "date" es de tipo string
        df_checkin["date"] = df_checkin["date"].astype(str)

        # Procesamiento específico
        df_checkin["date"] = df_checkin["date"].str.split(',')
        df_checkin = df_checkin.explode("date").reset_index(drop=True)
        df_checkin["date"] = df_checkin["date"].str.strip()

        # Intentar convertir a datetime con un formato explícito
        date_formats = ["%Y-%m-%d %H:%M:%S", "%Y-%m-%d"]  # Lista de formatos esperados
        for fmt in date_formats:
            try:
                df_checkin["date"] = pd.to_datetime(df_checkin["date"], format=fmt, errors='raise')
                break
            except ValueError:
                continue
        else:
            df_checkin["date"] = pd.to_datetime(df_checkin["date"], errors='coerce')

        # Filtrar por business_id de pizzerías en Florida y Taco Bell
        df_checkin_filtered = df_checkin[df_checkin["business_id"].isin(business_ids)]
        print(f"Datos filtrados. Dimensiones del dataframe: {df_checkin_filtered.shape}")

        # Validar y convertir tipos de columnas
        df_checkin_filtered = validate_data(df_checkin_filtered, 'checkin')

        # Guardar y cargar CSV
        buffer = BytesIO()
        df_checkin_filtered.to_csv(buffer, index=False)
        buffer.seek(0)
        destination_blob_name = source_blob_name.replace('RAW', 'STAGED').replace('.json', '_etl.csv')
        destination_blob = storage_client.bucket(destination_bucket_name).blob(destination_blob_name)
        destination_blob.upload_from_file(buffer, content_type='text/csv')
        print(f"Datos filtrados cargados exitosamente a {destination_blob_name} en el bucket {destination_bucket_name}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

# Uso de variables de entorno para definir los parámetros
source_bucket_name = os.getenv('SOURCE_BUCKET_NAME', 'henry-pizza')
source_blob_name = os.getenv('SOURCE_BLOB_NAME', 'RAW/Yelp/checkin.json')
destination_bucket_name = os.getenv('DESTINATION_BUCKET_NAME', 'henry-pizza-staged')
filtered_business_blob_name = 'STAGED/Yelp/business_filtered_FL.csv'

@functions_framework.cloud_event
def checkin_etl(cloud_event):
    try:
        data = cloud_event.data

        event_id = cloud_event.get("id")
        event_type = cloud_event.get("type")

        bucket = data["bucket"]
        name = data["name"]
        metageneration = data["metageneration"]
        timeCreated = data["timeCreated"]
        updated = data["updated"]

        print(f"Event ID: {event_id}")
        print(f"Event type: {event_type}")
        print(f"Bucket: {bucket}")
        print(f"File: {name}")
        print(f"Metageneration: {metageneration}")
        print(f"Created: {timeCreated}")
        print(f"Updated: {updated}")

        # Obtener business_ids de pizzerías en Florida y Taco Bell
        business_ids = get_business_ids_from_filtered_file(destination_bucket_name, filtered_business_blob_name)
        print(f"Total de Business IDs obtenidos: {len(business_ids)}")

        # Procesar y subir los datos si el archivo es checkin.json
        if name == 'RAW/Yelp/checkin.json':
            print("Archivo correcto detectado para procesamiento.")
            process_and_upload_data(source_bucket_name, name, destination_bucket_name, business_ids)
        else:
            print(f"Archivo {name} no es relevante para este procesamiento. Procesamiento no necesario.")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


### ✅Etl de tips

In [None]:
import functions_framework
import pandas as pd
from google.cloud import storage
import os
from io import BytesIO
import csv

# Inicializa el cliente de Google Cloud Storage
storage_client = storage.Client()

def get_business_ids_from_filtered_file(bucket_name, blob_name):
    try:
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        if not blob.exists():
            raise FileNotFoundError(f"El archivo {blob_name} no existe en el bucket {bucket_name}")
        
        csv_data = blob.download_as_bytes()
        temp_file = BytesIO(csv_data)
        df_filtered_business = pd.read_csv(temp_file)
        return df_filtered_business['business_id'].unique()
    except Exception as e:
        print(f"An error occurred while fetching business IDs: {str(e)}")
        return []

def clean_text(text):
    """Clean text to ensure proper CSV formatting"""
    if isinstance(text, str):
        text = text.replace('"', '""')  # Escapar comillas dobles
        text = text.replace('\n', ' ')  # Eliminar saltos de línea
        text = text.replace('\r', ' ')  # Eliminar retornos de carro
    return text

def process_and_upload_data(bucket_name, source_blob_name, destination_bucket_name, business_ids):
    try:
        print(f"Procesando archivo desde {bucket_name}/{source_blob_name} y subiendo a {destination_bucket_name}")

        # Descargar el archivo
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        json_data = blob.download_as_bytes()
        print(f"Archivo {source_blob_name} descargado del bucket {bucket_name}")

        # Verificar si los datos están vacíos
        if not json_data:
            raise ValueError("El archivo JSON está vacío.")

        # Cargar los datos en un DataFrame
        temp_file = BytesIO(json_data)
        try:
            df_tip = pd.read_json(temp_file, lines=True)
            print(f"Archivo JSON cargado en el DataFrame.")
        except ValueError as ve:
            raise ValueError(f"Error al leer el archivo JSON: {ve}")

        # Filtrar por business_id de pizzerías en Florida y Taco Bell
        df_tip_filtered = df_tip[df_tip["business_id"].isin(business_ids)]
        print(f"Datos filtrados. Dimensiones del dataframe: {df_tip_filtered.shape}")

        # Procesamiento específico
        df_tip_filtered = df_tip_filtered.dropna()
        df_tip_filtered = df_tip_filtered.drop_duplicates()

        # Limpiar texto para asegurar formato adecuado de CSV
        df_tip_filtered['text'] = df_tip_filtered['text'].apply(clean_text)

        # Convertir la columna de fecha a string en formato de fecha
        df_tip_filtered['date'] = pd.to_datetime(df_tip_filtered['date']).dt.date

        # Agregar columna tip_id como un índice
        #df_tip_filtered['tip_id'] = df_tip_filtered.index.astype(str)

        # Asegurarse de que todas las columnas tengan el tipo de datos correcto
        df_tip_filtered['user_id'] = df_tip_filtered['user_id'].astype(str)
        df_tip_filtered['business_id'] = df_tip_filtered['business_id'].astype(str)
        df_tip_filtered['text'] = df_tip_filtered['text'].astype(str)
        #df_tip_filtered['tip_id'] = df_tip_filtered['tip_id'].astype(str)
        df_tip_filtered['date'] = df_tip_filtered['date'].astype(str)  # Convertir fecha a string

        # Guardar y cargar CSV
        buffer = BytesIO()
        df_tip_filtered.to_csv(buffer, index=False, quoting=csv.QUOTE_NONNUMERIC)
        buffer.seek(0)
        destination_blob_name = source_blob_name.replace('RAW', 'STAGED').replace('.json', '_etl.csv')
        destination_blob = storage_client.bucket(destination_bucket_name).blob(destination_blob_name)
        destination_blob.upload_from_file(buffer, content_type='text/csv')
        print(f"Datos cargados exitosamente a {destination_blob_name} en el bucket {destination_bucket_name}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

# Uso de variables de entorno para definir los parámetros
source_bucket_name = os.getenv('SOURCE_BUCKET_NAME', 'henry-pizza')
source_blob_name = os.getenv('SOURCE_BLOB_NAME', 'RAW/Yelp/tip.json')
destination_bucket_name = os.getenv('DESTINATION_BUCKET_NAME', 'henry-pizza-staged')
filtered_business_blob_name = 'STAGED/Yelp/business_filtered_FL.csv'

@functions_framework.cloud_event
def etl_tip(cloud_event):
    try:
        data = cloud_event.data

        event_id = cloud_event.get("id")
        event_type = cloud_event.get("type")

        bucket = data["bucket"]
        name = data["name"]
        metageneration = data["metageneration"]
        timeCreated = data.get("timeCreated")
        updated = data.get("updated")

        print(f"Event ID: {event_id}")
        print(f"Event type: {event_type}")
        print(f"Bucket: {bucket}")
        print(f"File: {name}")
        print(f"Metageneration: {metageneration}")
        print(f"Created: {timeCreated}")
        print(f"Updated: {updated}")

        # Obtener business_ids de pizzerías en Florida y Taco Bell
        business_ids = get_business_ids_from_filtered_file(destination_bucket_name, filtered_business_blob_name)
        print(f"Total de Business IDs obtenidos: {len(business_ids)}")

        # Procesar y subir los datos si el archivo es tip.json
        if name == 'RAW/Yelp/tip.json':
            print("Archivo correcto detectado para procesamiento.")
            process_and_upload_data(source_bucket_name, name, destination_bucket_name, business_ids)
        else:
            print(f"Archivo {name} no es relevante para este procesamiento. Procesamiento no necesario.")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


### ✅Etl de Yelp_review

In [None]:
import functions_framework
import pandas as pd
from google.cloud import storage
import os
from io import BytesIO, StringIO
import json

# Inicializa el cliente de Google Cloud Storage
storage_client = storage.Client()

def get_business_ids_from_filtered_file(bucket_name, blob_name):
    """
    Obtiene los IDs de negocios filtrados desde un archivo CSV en Google Cloud Storage.
    """
    try:
        # Accede al bucket y al blob (archivo) especificado
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        
        # Verifica si el archivo existe
        if not blob.exists():
            raise FileNotFoundError(f"El archivo {blob_name} no existe en el bucket {bucket_name}")
        
        # Descarga y lee el archivo CSV en un DataFrame
        csv_data = blob.download_as_bytes()
        temp_file = BytesIO(csv_data)
        df_filtered_business = pd.read_csv(temp_file)
        
        # Retorna los IDs únicos de los negocios
        return df_filtered_business['business_id'].unique()
    except Exception as e:
        print(f"An error occurred while fetching business IDs: {str(e)}")
        return []

def save_filtered_data(filtered_reviews, source_blob_name, destination_bucket_name, chunk_index):
    """
    Guarda los datos filtrados en un archivo CSV.
    """
    try:
        # Crea un DataFrame con los datos filtrados
        df_filtered = pd.DataFrame(filtered_reviews)
        
        # Valida y convierte los tipos de datos
        df_filtered['review_id'] = df_filtered['review_id'].astype(str)
        df_filtered['user_id'] = df_filtered['user_id'].astype(str)
        df_filtered['business_id'] = df_filtered['business_id'].astype(str)
        df_filtered['stars'] = df_filtered['stars'].astype(float)
        df_filtered['date'] = pd.to_datetime(df_filtered['date']).dt.date
        df_filtered['text'] = df_filtered['text'].astype(str)
        df_filtered['useful'] = df_filtered['useful'].astype(int)
        df_filtered['funny'] = df_filtered['funny'].astype(int)
        df_filtered['cool'] = df_filtered['cool'].astype(int)

        # Guarda los datos filtrados en un archivo CSV
        destination_blob_name = source_blob_name.replace('RAW', 'STAGED').replace('.json', f'_etl_part_{chunk_index}.csv')
        buffer = StringIO()
        df_filtered.to_csv(buffer, index=False)
        buffer.seek(0)
        destination_bucket = storage_client.bucket(destination_bucket_name)
        destination_blob = destination_bucket.blob(destination_blob_name)
        destination_blob.upload_from_string(buffer.getvalue(), content_type='text/csv')
        print(f"Datos filtrados cargados exitosamente a {destination_blob_name} en el bucket {destination_bucket_name}")
    except Exception as e:
        print(f"An error occurred while saving filtered data: {str(e)}")

def process_and_upload_data(bucket_name, source_blob_name, destination_bucket_name, business_ids, chunk_size=5000):
    """
    Procesa y carga datos en chunks para reducir el uso de memoria.
    """
    try:
        print(f"Procesando archivo desde {bucket_name}/{source_blob_name}")

        # Accede al bucket y al blob (archivo) especificado
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)

        # Descarga el contenido del blob línea por línea
        with blob.open("r") as blob_reader:
            filtered_reviews = []
            chunk_index = 1

            for line in blob_reader:
                review = json.loads(line)
                if review['business_id'] in business_ids:
                    filtered_reviews.append(review)
                if len(filtered_reviews) >= chunk_size:
                    save_filtered_data(filtered_reviews, source_blob_name, destination_bucket_name, chunk_index)
                    filtered_reviews = []  # Limpia la lista para liberar memoria
                    chunk_index += 1

            # Guarda cualquier resto de datos filtrados
            if filtered_reviews:
                save_filtered_data(filtered_reviews, source_blob_name, destination_bucket_name, chunk_index)
                #Se cargó exitosamente
        
    except Exception as e:
        print(f"An error occurred while processing data: {str(e)}")

# Uso de variables de entorno para definir los parámetros
source_bucket_name = os.getenv('SOURCE_BUCKET_NAME', 'henry-pizza')
source_blob_name = os.getenv('SOURCE_BLOB_NAME', 'RAW/Yelp/review.json')
destination_bucket_name = os.getenv('DESTINATION_BUCKET_NAME', 'henry-pizza-staged')
filtered_business_blob_name = 'STAGED/Yelp/business_filtered_FL.csv'

@functions_framework.cloud_event
def etl_review(cloud_event):
    """
    Cloud Function para procesar y cargar datos de reviews.
    """
    try:
        data = cloud_event.data

        event_id = cloud_event.get("id")
        event_type = cloud_event.get("type")

        bucket = data["bucket"]
        name = data["name"]
        metageneration = data["metageneration"]
        timeCreated = data["timeCreated"]
        updated = data["updated"]

        print(f"Event ID: {event_id}")
        print(f"Event type: {event_type}")
        print(f"Bucket: {bucket}")
        print(f"File: {name}")
        print(f"Metageneration: {metageneration}")
        print(f"Created: {timeCreated}")
        print(f"Updated: {updated}")

        # Obtener business_ids de pizzerías en Florida y Taco Bell
        business_ids = get_business_ids_from_filtered_file(destination_bucket_name, filtered_business_blob_name)
        print(f"Total de Business IDs obtenidos: {len(business_ids)}")

        # Verificar si el archivo que disparó el evento es el archivo correcto
        if name == 'RAW/Yelp/review.json':
            print("Archivo correcto detectado para procesamiento.")
            # Procesa y sube los datos en un solo archivo CSV
            process_and_upload_data(source_bucket_name, source_blob_name, destination_bucket_name, business_ids)
        else:
            print(f"Archivo {name} no es relevante para este procesamiento. Procesamiento no necesario.")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


### ✅Etl de Yelp_user

In [None]:
import functions_framework
import pandas as pd
from google.cloud import storage
import os
from io import BytesIO

# Inicializa el cliente de Google Cloud Storage
storage_client = storage.Client()

def validate_data(df, table_name):
    """
    Función para validar los datos antes de continuar con el procesamiento.
    """
    if df.isnull().values.any():
        print(f"Warning: {table_name} contiene valores nulos.")
        df = df.dropna()
    if df.duplicated().any():
        print(f"Warning: {table_name} contiene duplicados.")
        df = df.drop_duplicates()
    return df

def convert_column_types(df):
    """
    Convierte los tipos de columna a los tipos adecuados para su posterior uso.
    """
    df['user_id'] = df['user_id'].astype(str)
    df['name'] = df['name'].astype(str)
    df['review_count'] = df['review_count'].astype(int)
    df['yelping_since'] = pd.to_datetime(df['yelping_since'], errors='coerce')
    df['useful'] = df['useful'].astype(int)
    df['funny'] = df['funny'].astype(int)
    df['cool'] = df['cool'].astype(int)
    df['fans'] = df['fans'].astype(int)
    df['elite'] = df['elite'].astype(str)
    df['average_stars'] = df['average_stars'].astype(float)
    df['compliment_hot'] = df['compliment_hot'].astype(int)
    df['compliment_more'] = df['compliment_more'].astype(int)
    df['compliment_profile'] = df['compliment_profile'].astype(int)
    df['compliment_cute'] = df['compliment_cute'].astype(int)
    df['compliment_list'] = df['compliment_list'].astype(int)
    df['compliment_note'] = df['compliment_note'].astype(int)
    df['compliment_plain'] = df['compliment_plain'].astype(int)
    df['compliment_cool'] = df['compliment_cool'].astype(int)
    df['compliment_funny'] = df['compliment_funny'].astype(int)
    df['compliment_writer'] = df['compliment_writer'].astype(int)
    df['compliment_photos'] = df['compliment_photos'].astype(int)
    return df

def process_and_upload_data(bucket_name, source_blob_name, destination_bucket_name, chunk_size=50000):
    try:
        print(f"Procesando archivo desde {bucket_name}/{source_blob_name}")

        # Descargar el archivo
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        parquet_data = blob.download_as_bytes()
        print(f"Archivo {source_blob_name} descargado del bucket {bucket_name}")

        # Verificar si los datos están vacíos
        if not parquet_data:
            raise ValueError("El archivo Parquet está vacío.")

        # Cargar los datos en un DataFrame desde un archivo Parquet
        temp_file = BytesIO(parquet_data)
        df_user = pd.read_parquet(temp_file)
        print(f"Archivo {source_blob_name} cargado en DataFrame")
        print(f"Primeras filas del DataFrame original:\n{df_user.head()}")

        # Validar y convertir tipos de columnas
        df_user = validate_data(df_user, 'user')
        df_user = convert_column_types(df_user)

        # Guardar y cargar CSV
        buffer = BytesIO()
        df_user.to_csv(buffer, index=False)
        buffer.seek(0)
        destination_blob_name = source_blob_name.replace('RAW', 'STAGED').replace('.parquet', '_etl.csv')
        destination_blob = storage_client.bucket(destination_bucket_name).blob(destination_blob_name)
        destination_blob.upload_from_file(buffer, content_type='text/csv')
        print(f"Datos cargados exitosamente a {destination_blob_name} en el bucket {destination_bucket_name}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

# Uso de variables de entorno para definir los parámetros
source_bucket_name = os.getenv('SOURCE_BUCKET_NAME', 'henry-pizza')
source_blob_name = os.getenv('SOURCE_BLOB_NAME', 'RAW/Yelp/user.parquet')
destination_bucket_name = os.getenv('DESTINATION_BUCKET_NAME', 'henry-pizza-staged')

@functions_framework.cloud_event
def etl_user(cloud_event):
    try:
        data = cloud_event.data

        event_id = cloud_event.get("id")
        event_type = cloud_event.get("type")

        bucket = data["bucket"]
        name = data["name"]
        metageneration = data["metageneration"]
        timeCreated = data["timeCreated"]
        updated = data["updated"]

        print(f"Event ID: {event_id}")
        print(f"Event type: {event_type}")
        print(f"Bucket: {bucket}")
        print(f"File: {name}")
        print(f"Metageneration: {metageneration}")
        print(f"Created: {timeCreated}")
        print(f"Updated: {updated}")

        # Procesar y subir los datos si el archivo es user.parquet
        if name == source_blob_name:
            print("Archivo correcto detectado para procesamiento.")
            process_and_upload_data(source_bucket_name, name, destination_bucket_name)
        else:
            print(f"Archivo {name} no es relevante para este procesamiento. Procesamiento no necesario.")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


## Google maps🗺️

### ✅Etl de Google_review-florida

In [None]:
import functions_framework
import pandas as pd
from google.cloud import storage
import os
from io import BytesIO, StringIO
import csv
import re

# Inicializa el cliente de Google Cloud Storage
storage_client = storage.Client()

def clean_text(text):
    """
    Función para limpiar y formatear correctamente el texto, manejando comillas y caracteres especiales.
    """
    if pd.isna(text):
        return ''
    text = text.replace('"', '""')  # Escapar comillas dobles
    text = text.replace('\n', ' ')  # Reemplazar saltos de línea por espacio
    return text

def process_and_upload_data(bucket_name, file_name, destination_bucket_name):
    try:
        print(f"Procesando archivo desde {bucket_name}/{file_name}")

        # Descargar el archivo
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(file_name)
        json_data = blob.download_as_bytes()
        print(f"Archivo {file_name} descargado del bucket {bucket_name}")

        # Verificar si los datos están vacíos
        if not json_data:
            raise ValueError("El archivo JSON está vacío.")

        # Cargar los datos en un DataFrame
        temp_file = BytesIO(json_data)
        df_reviews = pd.read_json(temp_file, lines=True, dtype={"user_id": str})

        print(f"Archivo {file_name} cargado en DataFrame")

        # Eliminar las columnas "pics" y "resp"
        df_reviews = df_reviews.drop(columns=['pics', 'resp'], errors='ignore')

        # Limpiar el texto para asegurar que las comillas estén correctamente manejadas
        df_reviews['text'] = df_reviews['text'].apply(clean_text)

        # Extraer el número del nombre del archivo original
        match = re.search(r'(\d+)', file_name)
        if match:
            number = match.group(1)
            destination_blob_name = f'STAGED/Google/review-Florida/review-Florida{number}.csv'
        else:
            destination_blob_name = 'STAGED/Google/review-Florida.csv'

        # Guardar los datos filtrados en un solo archivo CSV
        buffer = StringIO()
        df_reviews.to_csv(buffer, index=False, quoting=csv.QUOTE_ALL, escapechar='\\', doublequote=True)
        buffer.seek(0)
        destination_bucket = storage_client.bucket(destination_bucket_name)
        destination_blob = destination_bucket.blob(destination_blob_name)
        destination_blob.upload_from_string(buffer.getvalue(), content_type='text/csv')
        print(f"Datos cargados exitosamente a {destination_blob_name} en el bucket {destination_bucket_name}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

# Uso de variables de entorno para definir los parámetros
source_bucket_name = os.getenv('SOURCE_BUCKET_NAME', 'henry-pizza')
destination_bucket_name = os.getenv('DESTINATION_BUCKET_NAME', 'henry-pizza-staged')

# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def etl_google_review(cloud_event):
    try:
        data = cloud_event.data

        event_id = cloud_event.get("id")
        event_type = cloud_event.get("type")

        bucket = data["bucket"]
        name = data["name"]
        metageneration = data["metageneration"]
        timeCreated = data["timeCreated"]
        updated = data["updated"]

        print(f"Event ID: {event_id}")
        print(f"Event type: {event_type}")
        print(f"Bucket: {bucket}")
        print(f"File: {name}")
        print(f"Metageneration: {metageneration}")
        print(f"Created: {timeCreated}")
        print(f"Updated: {updated}")

        # Verificar si el archivo que disparó el evento es el archivo correcto
        if re.search(r'RAW/Google/review[-_]Florida', name):
            print("Archivo correcto detectado para procesamiento.")
            process_and_upload_data(source_bucket_name, name, destination_bucket_name)
        else:
            print(f"Archivo {name} no es relevante para este procesamiento. Procesamiento no necesario.")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

### ✅Etl de Google_metadata-sitios

In [None]:
import functions_framework
import pandas as pd
from google.cloud import storage
import os
from io import BytesIO
import re

# Inicializa el cliente de Google Cloud Storage
storage_client = storage.Client()

def process_and_upload_data(bucket_name, file_name, destination_bucket_name):
    try:
        print(f"Procesando archivo desde {bucket_name}/{file_name}")

        # Descargar el archivo
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(file_name)
        json_data = blob.download_as_bytes()
        print(f"Archivo {file_name} descargado del bucket {bucket_name}")

        # Verificar si los datos están vacíos
        if not json_data:
            raise ValueError("El archivo JSON está vacío.")

        # Cargar los datos en un DataFrame
        temp_file = BytesIO(json_data)
        df_metadata = pd.read_json(temp_file, lines=True)
        print(f"Archivo {file_name} cargado en DataFrame")
        df_metadata = df_metadata.drop(columns=['price','hours','MISC','state','relative_results','url']) 
        # Filtros geográficos y categorías
        latitud_max, latitud_min = 31.0, 24.5
        longitud_max, longitud_min = -80.0, -87.6
        mascara_latitud = (df_metadata['latitude'] >= latitud_min) & (df_metadata['latitude'] <= latitud_max)
        mascara_longitud = (df_metadata['longitude'] >= longitud_min) & (df_metadata['longitude'] <= longitud_max)
        df_filtered_FL = df_metadata[mascara_latitud & mascara_longitud].reset_index(drop=True)

        # Reemplazar None por cadenas vacías en las columnas que se utilizarán para filtrar
        df_filtered_FL['category'] = df_filtered_FL['category'].fillna('')
        df_filtered_FL['name'] = df_filtered_FL['name'].fillna('')

        # Filtrar pizzerías y Taco Bell
        pizza_pattern = re.compile(r'pizza|pizzería', re.IGNORECASE)
        taco_bell_pattern = re.compile(r'taco\s*bell', re.IGNORECASE)

        df_filtered = df_filtered_FL[
            df_filtered_FL['category'].str.contains(pizza_pattern, na=False) | 
            df_filtered_FL['name'].apply(lambda x: bool(pizza_pattern.search(x))) |
            df_filtered_FL['name'].apply(lambda x: bool(taco_bell_pattern.search(x)))
        ]

        # Convertir listas a cadenas de texto y manejar valores None
        df_filtered.loc[:, 'category'] = df_filtered['category'].apply(lambda x: ', '.join(x) if isinstance(x, list) else (x if x is not None else ''))
#         df_filtered.loc[:, 'hours'] = df_filtered['hours'].apply(lambda x: ', '.join([f"{day}: {hours}" for day, hours in x.items()]) if isinstance(x, dict) else (x if x is not None else '')) 
        # Guardar y cargar CSV con el mismo nombre en el bucket de destino
        destination_blob_name = file_name.replace('RAW', 'STAGED').replace('.json', 'metadata-FL.csv')
        buffer = BytesIO()
        df_filtered.to_csv(buffer, index=False)
        buffer.seek(0)
        destination_bucket = storage_client.bucket(destination_bucket_name)
        destination_blob = destination_bucket.blob(destination_blob_name)
        destination_blob.upload_from_file(buffer, content_type='text/csv')
        print(f"Datos cargados exitosamente a {destination_blob_name} en el bucket {destination_bucket_name}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

# Uso de variables de entorno para definir los parámetros
source_bucket_name = os.getenv('SOURCE_BUCKET_NAME', 'henry-pizza')
destination_bucket_name = os.getenv('DESTINATION_BUCKET_NAME', 'henry-pizza-staged')

# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def etl_google_metadata(cloud_event):
    try:
        data = cloud_event.data

        event_id = cloud_event.get("id")
        event_type = cloud_event.get("type")

        bucket = data.get("bucket")
        name = data.get("name")
        metageneration = data.get("metageneration")
        timeCreated = data.get("timeCreated")
        updated = data.get("updated")

        print(f"Event ID: {event_id}")
        print(f"Event type: {event_type}")
        print(f"Bucket: {bucket}")
        print(f"File: {name}")
        print(f"Metageneration: {metageneration}")
        print(f"Created: {timeCreated}")
        print(f"Updated: {updated}")

        # Verificar si el archivo que disparó el evento es el archivo correcto
        if name.startswith('RAW/Google/metadata-sitios'):
            print("Archivo correcto detectado para procesamiento.")
            process_and_upload_data(source_bucket_name, name, destination_bucket_name)
        else:
            print(f"Archivo {name} no es relevante para este procesamiento. Procesamiento no necesario.")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


# 📊Validación de datos y carga a BigQuery

- En esta etapa, nos encargamos de validar los datos mediante la creación de una `función` para normalizar los datos y una `tabla` en la que se hará un registro de todos los archivos que se subieron.
- Tambièn se comentò cada etapa para poder visualizar el progreso en el servicio Loggins de Google Cloud Platform.
- La verificación y validación de los datos evita el procesamiento innecesario de archivos no relevantes, esto, asegurando la ` la Integridad de los datos `

In [None]:
import functions_framework
import pandas as pd
from google.cloud import storage, bigquery
import os
from io import BytesIO, StringIO
from datetime import datetime

# Inicializa el cliente de Google Cloud Storage y BigQuery
storage_client = storage.Client()
bigquery_client = bigquery.Client()

def validate_data(df, table_name):
    """
    Función para validar los datos antes de cargarlos a BigQuery.
    """
    if df.isnull().values.any():
        print(f"Warning: {table_name} contiene valores nulos.")
        df = df.dropna()
    if df.duplicated().any():
        print(f"Warning: {table_name} contiene duplicados.")
        df = df.drop_duplicates()
    return df

def convert_column_types(df, table_name):
    """
    Convierte las columnas a los tipos correctos según el esquema de BigQuery.
    """
    if table_name == 'users':
        df['user_id'] = df['user_id'].astype(str)
        df['name'] = df['name'].astype(str)
        df['review_count'] = df['review_count'].astype(int)
        df['yelping_since'] = pd.to_datetime(df['yelping_since'], errors='coerce').dt.date
        df['useful'] = df['useful'].astype(int)
        df['funny'] = df['funny'].astype(int)
        df['cool'] = df['cool'].astype(int)
        df['fans'] = df['fans'].astype(int)
        df['elite'] = df['elite'].astype(str)
        df['average_stars'] = df['average_stars'].astype(float)
        df['compliment_hot'] = df['compliment_hot'].astype(int)
        df['compliment_more'] = df['compliment_more'].astype(int)
        df['compliment_profile'] = df['compliment_profile'].astype(int)
        df['compliment_cute'] = df['compliment_cute'].astype(int)
        df['compliment_list'] = df['compliment_list'].astype(int)
        df['compliment_note'] = df['compliment_note'].astype(int)
        df['compliment_plain'] = df['compliment_plain'].astype(int)
        df['compliment_cool'] = df['compliment_cool'].astype(int)
        df['compliment_funny'] = df['compliment_funny'].astype(int)
        df['compliment_writer'] = df['compliment_writer'].astype(int)
        df['compliment_photos'] = df['compliment_photos'].astype(int)
    elif table_name == 'businesses':
        df['business_id'] = df['business_id'].astype(str)
        df['name'] = df['name'].astype(str)
        df['address'] = df['address'].astype(str)
        df['city'] = df['city'].astype(str)
        df['latitude'] = df['latitude'].astype(float)
        df['longitude'] = df['longitude'].astype(float)
        df['stars'] = df['stars'].astype(float)
        df['review_count'] = df['review_count'].astype(int)
        df['categories'] = df['categories'].astype(str)
    elif table_name == 'reviews':
        df['review_id'] = df['review_id'].astype(str)
        df['user_id'] = df['user_id'].astype(str)
        df['business_id'] = df['business_id'].astype(str)
        df['stars'] = df['stars'].astype(float)
        df['useful'] = df['useful'].astype(int)
        df['funny'] = df['funny'].astype(int)
        df['cool'] = df['cool'].astype(int)
        df['text'] = df['text'].astype(str)
        df['date'] = pd.to_datetime(df['date'], errors='coerce').dt.date
    elif table_name == 'checkins':
        df['business_id'] = df['business_id'].astype(str)
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
    elif table_name == 'tips':
        df['user_id'] = df['user_id'].astype(str)
        df['business_id'] = df['business_id'].astype(str)
        df['text'] = df['text'].astype(str)
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
        df['compliment_count'] = df['compliment_count'].astype(int)
    elif table_name == 'processed_files':
        df['file_name'] = df['file_name'].astype(str)
        df['load_timestamp'] = pd.to_datetime(df['load_timestamp'], errors='coerce')
        df['file_size'] = df['file_size'].astype(int)
        df['status'] = df['status'].astype(str)
    elif table_name == 'metadata_sitios':
        df['name'] = df['name'].astype(str)
        df['address'] = df['address'].astype(str)
        df['gmap_id'] = df['gmap_id'].astype(str)
        df['description'] = df['description'].astype(str)
        df['latitude'] = pd.to_numeric(df['latitude'])
        df['longitude'] = pd.to_numeric(df['longitude'])
        df['category'] = df['category'].astype(str)
        df['avg_rating'] = pd.to_numeric(df['avg_rating'])
        df['num_of_reviews'] = df['num_of_reviews'].astype(int)
    elif table_name == 'review_florida':
        df['user_id'] = df['user_id'].astype(str)    
        df['name'] = df['name'].astype(str)    
        df['time'] = pd.to_datetime(df['time'])    
        df['rating'] = df['rating'].astype(float)    
        df['text'] = df['text'].astype(str)
        df['gmap_id'] = df['gmap_id'].astype(str)    

    return df

def filter_columns(df, table_name):
    """
    Filtra las columnas del DataFrame según el esquema de BigQuery.
    """
    if table_name == 'users':
        expected_columns = ['user_id', 'name', 'review_count', 'yelping_since', 'useful', 'funny', 'cool', 'fans', 'elite', 'average_stars', 'compliment_hot', 'compliment_more', 'compliment_profile', 'compliment_cute', 'compliment_list', 'compliment_note', 'compliment_plain', 'compliment_cool', 'compliment_funny', 'compliment_writer', 'compliment_photos']
    elif table_name == 'businesses':
        expected_columns = ['business_id', 'name', 'address', 'city', 'latitude', 'longitude', 'stars', 'review_count', 'categories']
    elif table_name == 'reviews':
        expected_columns = ['review_id', 'user_id', 'business_id', 'stars','useful', 'funny', 'cool','text','date']
    elif table_name == 'checkins':
        expected_columns = ['business_id', 'date']
    elif table_name == 'tips':
        expected_columns = ['user_id', 'business_id', 'text', 'date','compliment_count']
    elif table_name == 'processed_files':
        expected_columns = ['file_name', 'load_timestamp', 'file_size', 'status']
    elif table_name == 'metadata_sitios':
        expected_columns = ['name','address','gmap_id','description','latitude','longitude','category','avg_rating','num_of_reviews']
    elif table_name == 'review_florida':
        expected_columns = ['user_id','name','time','rating','text','gmap_id']      
    else:
        expected_columns = df.columns.tolist()

    return df[expected_columns]

def load_data_to_bigquery(df, table_name):
    """
    Función para cargar datos a BigQuery.
    """
    try:
        dataset_id = 'yelp_google_data'
        table_id = f'{dataset_id}.{table_name}'

        # Filtrar columnas que no están en el esquema de BigQuery
        df = filter_columns(df, table_name)

        job_config = bigquery.LoadJobConfig(
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            autodetect=True
        )
        
        job = bigquery_client.load_table_from_dataframe(df, table_id, job_config=job_config)
        job.result()
        print(f"Datos cargados exitosamente a {table_name}")
    except Exception as e:
        print(f"An error occurred while loading data to BigQuery: {str(e)}")
        raise

def record_file_in_validation_table(file_name, file_size, status):
    """
    Función para registrar el archivo en la tabla de validación.
    """
    table_id = 'yelp_google_data.processed_files'
    load_timestamp = datetime.utcnow().isoformat() + 'Z'
    rows_to_insert = [
        {"file_name": file_name, "load_timestamp": load_timestamp, "file_size": file_size, "status": status}
    ]
    errors = bigquery_client.insert_rows_json(table_id, rows_to_insert)
    if errors:
        raise RuntimeError(f"Error inserting rows into validation table: {errors}")
    else:
        print(f"Archivo {file_name} registrado en la tabla de validación.")

def is_file_processed(file_name):
    """
    Función para verificar si el archivo ya ha sido procesado.
    """
    query = f"""
        SELECT COUNT(*) as file_count
        FROM `yelp_google_data.processed_files`
        WHERE file_name = '{file_name}'
    """
    result = bigquery_client.query(query).result().to_dataframe()
    return result['file_count'][0] > 0

def process_and_upload_data(bucket_name, source_blob_name, table_name):
    try:
        print(f"Procesando archivo desde {bucket_name}/{source_blob_name}")

        # Verificar si el archivo ya ha sido procesado
        if is_file_processed(source_blob_name):
            print(f"El archivo {source_blob_name} ya ha sido procesado. Procesamiento no necesario.")
            return

        # Descargar el archivo
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        blob_data = blob.download_as_bytes()
        file_size = blob.size

        # Verificar si los datos están vacíos
        if not blob_data:
            raise ValueError("El archivo está vacío.")

        # Decodificar bytes a string
        blob_data_str = blob_data.decode('utf-8')

        # Procesar datos línea por línea para reducir el uso de memoria
        data_stream = StringIO(blob_data_str)
        df = pd.read_csv(data_stream)

        # Validar datos
        df = validate_data(df, table_name)

        # Convertir los tipos de columna según el esquema de BigQuery
        df = convert_column_types(df, table_name)

        # Depurar el DataFrame antes de cargarlo a BigQuery
        print(f"Esquema del DataFrame antes de cargar a BigQuery:\n{df.dtypes}")
        print(f"Primeras filas del DataFrame:\n{df.head()}")

        # Cargar datos a BigQuery
        load_data_to_bigquery(df, table_name)

        # Registrar archivo en tabla de validación
        record_file_in_validation_table(source_blob_name, file_size, 'SUCCESS')

    except Exception as e:
        print(f"An error occurred: {str(e)}")
        # Registrar archivo en tabla de validación con estado de error
        record_file_in_validation_table(source_blob_name, file_size, 'FAILED')
        raise

# Uso de variables de entorno para definir los parámetros
source_bucket_name = os.getenv('SOURCE_BUCKET_NAME', 'henry-pizza')

@functions_framework.cloud_event
def big_query(cloud_event):
    try:
        data = cloud_event.data
        bucket = data["bucket"]
        name = data["name"]

        print(f"Bucket: {bucket}")
        print(f"File: {name}")

        # Determinar la tabla de destino en función del archivo
        if 'user' in name:
            table_name = 'users'
        elif 'business' in name:
            table_name = 'businesses'
        elif 'review_etl' in name:
            table_name = 'reviews'
        elif 'checkin' in name:
            table_name = 'checkins'
        elif 'tip' in name:
            table_name = 'tips'
        elif 'review-Florida' in name:
            table_name = 'review_florida'  
        elif 'metadata-FL' in name:
            table_name = 'metadata_sitios'

        else:
            print(f"Archivo {name} no es relevante para este procesamiento.")
            return

        # Procesar y cargar datos
        process_and_upload_data(bucket, name, table_name)

    except Exception as e:
        print(f"An error occurred: {str(e)}")
