# FUNCIONES DE GOOGLE CLOUD FUNTIONS

### FUNCION DEL ETL 

In [None]:
import pandas as pd
import io
from google.cloud import storage

def process_csv(event, context):
    try:
        # Obtener detalles del archivo y los buckets
        raw_bucket_name = "carga_de_datos"  # Nombre del bucket de archivos crudos
        input_file = event['name']
        print(f"Archivo recibido para procesar: {input_file}")
        
        # Bucket donde se guardarán los archivos limpios
        clean_bucket_name = "datos_limpioss"  # Nombre del bucket de archivos limpios
        print(f"Bucket de origen: {raw_bucket_name}, Bucket de destino: {clean_bucket_name}")
        
        # Ignorar archivos procesados por seguridad
        if input_file.startswith("processed_"):
            print(f"Ignorando archivo ya procesado: {input_file}")
            return
        
        # Definir el nombre del archivo de salida procesado
        output_file = f"processed_{input_file}"
        print(f"Nombre del archivo de salida: {output_file}")

        # Inicializar cliente de Google Cloud Storage
        storage_client = storage.Client()
        print("Cliente de Google Cloud Storage inicializado")

        # Bucket crudo (origen)
        raw_bucket = storage_client.bucket(raw_bucket_name)
        
        # Bucket limpio (destino)
        clean_bucket = storage_client.bucket(clean_bucket_name)
        print("Buckets obtenidos correctamente")

        # Descargar el archivo CSV desde el bucket crudo
        blob = raw_bucket.blob(input_file)
        data = blob.download_as_bytes()
        print(f"Archivo {input_file} descargado desde el bucket crudo")
        
        # Comprobar si el archivo se descargó correctamente
        if not data:
            raise ValueError(f"El archivo {input_file} está vacío o no se pudo descargar correctamente")
        
        # Leer el contenido del archivo en un DataFrame
        df = pd.read_csv(io.BytesIO(data))
        print(f"Archivo CSV cargado en un DataFrame. Columnas: {df.columns.tolist()}")

        # Procesar el DataFrame
        df = procesar_dataframe(df)
        print(f"DataFrame procesado exitosamente")

        # Convertir el DataFrame a CSV y guardarlo en el bucket limpio
        output_blob = clean_bucket.blob(output_file)
        output_buffer = io.StringIO()
        df.to_csv(output_buffer, index=False)
        output_blob.upload_from_string(output_buffer.getvalue(), content_type='text/csv')
        print(f"Archivo procesado guardado en el bucket limpio como {output_file}")
    
    except Exception as e:
        print(f'Error procesando el archivo {input_file}: {str(e)}')


def procesar_dataframe(df):
    try:
        print("Iniciando procesamiento del DataFrame")
        
        # Tratar valores nulos: 'sin dato' para strings/objects, 0 para enteros y 0.0 para flotantes
        for col in df.columns:
            if df[col].dtype == 'int64':
                df[col].fillna(0, inplace=True)
                print(f"Valores nulos reemplazados por 0 en la columna {col}")
            elif df[col].dtype == 'float64':
                df[col].fillna(0.0, inplace=True)
                print(f"Valores nulos reemplazados por 0.0 en la columna {col}")
            elif df[col].dtype == 'object':
                df[col].fillna("sin dato", inplace=True)
                print(f"Valores nulos reemplazados por 'sin dato' en la columna {col}")

        # Limpiar espacios, caracteres especiales y normalizar texto en columnas de texto
        for col in df.select_dtypes(include=['object']).columns:
            df[col] = df[col].str.strip()
            df[col] = df[col].str.replace(r'\s+', ' ', regex=True)
            df[col] = df[col].str.replace(r'[^\w\s]', '', regex=True)
            df[col] = df[col].str.title()
            df[col].replace('', 'Sin Dato', inplace=True)
            print(f"Columna {col} limpiada y normalizada")

        # Convertir valores no numéricos en columnas float a 0.0
        for col in df.select_dtypes(include=['float64']).columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0.0)
            print(f"Valores no numéricos convertidos a 0.0 en la columna {col}")

        # Eliminar duplicados basados en todas las columnas
        df = df.drop_duplicates(keep='first')
        print("Duplicados eliminados del DataFrame")

        return df

    except Exception as e:
        print(f"Error al procesar el DataFrame: {str(e)}")
        raise e

### FUNCION QUE CARGA DATOS A BIGQUERY

In [None]:
import os
from google.cloud import storage
from google.cloud import bigquery

# Inicializar clientes de Cloud Storage y BigQuery
storage_client = storage.Client()
bigquery_client = bigquery.Client()

def process_file_to_bigquery(event, context):
    """Esta función se activa cuando un archivo es subido al bucket de Cloud Storage."""

    # Obtener el nombre del bucket y del archivo
    bucket_name = event['bucket']
    file_name = event['name']

    print(f"Archivo {file_name} detectado en el bucket {bucket_name}.")

    # Definir el mapeo entre el nombre del archivo y las tablas de BigQuery
    project_id = "canvas-sum-435121-c0"  # Asegúrate de usar el ID de proyecto correcto
    dataset_id = "Dataset_California"  # Nombre correcto del dataset

    table_mapping = {
        'Business Yelp': f'{project_id}.{dataset_id}.Business_Yelp',
        'Metadata Google': f'{project_id}.{dataset_id}.Metadata_Google',
        'Reviews Google': f'{project_id}.{dataset_id}.Reviews_Google',
        'Reviews Yelp': f'{project_id}.{dataset_id}.Reviews_Yelp',
        'Yelp Reviews California': f'{project_id}.{dataset_id}.Yelp_Reviews_California'  # Nueva tabla añadida
    }

    # Verificar si el nombre del archivo coincide con alguna de las tablas
    try:
        for key in table_mapping:
            if key in file_name:
                table_id = table_mapping[key]
                print(f"El archivo {file_name} será cargado en la tabla {table_id}.")
                
                # Intentar cargar el archivo en BigQuery
                load_file_to_bigquery(bucket_name, file_name, table_id)
                
                # Verificación de éxito
                print(f"El archivo {file_name} se cargó correctamente en la tabla {table_id}.")
                return
        
        # Si no se encuentra una tabla correspondiente
        print(f"No se encontró una tabla correspondiente para el archivo {file_name}.")
    
    except Exception as e:
        # Manejo de errores
        print(f"Error durante el procesamiento del archivo {file_name}: {e}")

def load_file_to_bigquery(bucket_name, file_name, table_id):
    """Función para cargar el archivo de Cloud Storage a BigQuery."""

    try:
        # Obtener referencia al bucket y archivo
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.blob(file_name)
        
        # Descargar el archivo temporalmente en la instancia de Cloud Function
        temp_file = f"/tmp/{file_name}"
        blob.download_to_filename(temp_file)
        print(f"Archivo {file_name} descargado temporalmente a {temp_file}.")

        # Configurar el trabajo de carga de BigQuery
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV,
            skip_leading_rows=1,  # Asumimos que el archivo tiene encabezados
            autodetect=True,  # Detectar automáticamente el esquema
        )

        # Abrir el archivo CSV y cargarlo en BigQuery
        with open(temp_file, "rb") as source_file:
            job = bigquery_client.load_table_from_file(source_file, table_id, job_config=job_config)

        job.result()  # Esperar a que el job termine

        print(f"Archivo {file_name} cargado exitosamente en la tabla {table_id} de BigQuery.")
    
    except Exception as e:
        # Manejo de errores en el proceso de carga a BigQuery
        print(f"Error al cargar el archivo {file_name} a BigQuery: {e}")