In [2]:
import os
from google.cloud import storage
from datetime import datetime
import pandas as pd
import tempfile  # Para manejar carpetas temporales de forma multiplataforma

# Configurar las credenciales de Google Cloud
def configure_google_cloud_credentials(credential_path):
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credential_path
    print(f"Credenciales configuradas desde: {credential_path}")

# Configurar las credenciales de Google Cloud (Cambiar el path a la ubicación de tu archivo JSON de credenciales)
configure_google_cloud_credentials("data-avatar-435301-p6-45ed205c0371.json")

# Función para transformar la columna 'category'
def transform_category_column(df):
    if 'category' in df.columns:
        # Convertir 'category' en una cadena de texto separada por comas
        df['category'] = df['category'].apply(lambda x: ', '.join(x) if isinstance(x, list) else str(x))
        print("Columna 'category' transformada a una cadena de texto separada por comas.")

    return df

# Función para asignar la abreviatura del estado basado en la latitud y longitud
def assign_state(row):
    lat, lon = row['latitude'], row['longitude']
    
    # California
    if 32.5 <= lat <= 42 and -124.4 <= lon <= -114.1:
        return 'CA'  # California
    # Florida
    elif 24.5 <= lat <= 31 and -87.6 <= lon <= -80.0:
        return 'FL'  # Florida
    # Illinois
    elif 36.9 <= lat <= 42.5 and -91.5 <= lon <= -87.0:
        return 'IL'  # Illinois
    # Nueva York
    elif 40.5 <= lat <= 45 and -79.8 <= lon <= -71.8:
        return 'NY'  # Nueva York
    else:
        return 'Unknown'  # Si la latitud/longitud no coincide con ningún estado

# Función para subir archivos al bucket con manejo de tiempo de espera
def upload_to_bucket(blob, local_file_path):
    try:
        with open(local_file_path, "rb") as f:
            # Aumenta el tiempo de espera
            blob.upload_from_file(f, timeout=600)  # 600 segundos (10 minutos)
            print("Archivo subido exitosamente.")
    except Exception as e:
        print(f"Error subiendo el archivo: {str(e)}")

# Función para actualizar la carga inicial en todos los archivos del bucket 'data limpia'
def update_all_files_in_bucket(bucket_name='g1_datos_limpios'):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)

    # Listar todos los archivos en el bucket
    blobs = bucket.list_blobs()

    for blob in blobs:
        # Verificar si el archivo es un archivo Parquet
        if blob.name.endswith('.parquet'):
            print(f"Procesando el archivo: {blob.name}")

            # Leer el archivo Parquet
            existing_data = read_parquet_from_gcs(bucket_name, blob.name)

            if existing_data is not None:
                # Verificar y agregar columnas 'version' y 'fecha_ingreso' si no existen
                if 'version' not in existing_data.columns:
                    existing_data['version'] = 1  # Asignar la versión 1 para la carga inicial
                if 'fecha_ingreso' not in existing_data.columns:
                    existing_data['fecha_ingreso'] = datetime.now()  # Asignar la fecha y hora actual

                # Aplicar la transformación de la columna 'category' y 'assign_state' solo si es el archivo 'metadata_filtrada_final.parquet'
                if blob.name == "metadata_filtrada_final.parquet":
                    existing_data = transform_category_column(existing_data)
                    if 'latitude' in existing_data.columns and 'longitude' in existing_data.columns:
                        # Aplicar la función assign_state para agregar la columna 'state'
                        existing_data['state'] = existing_data.apply(assign_state, axis=1)
                        print("Columna 'state' agregada al archivo 'metadata_filtrada_final.parquet'.")

                # Guardar el archivo actualizado temporalmente en una ruta temporal válida
                with tempfile.NamedTemporaryFile(delete=False, suffix=".parquet") as temp_file:
                    local_file_path = temp_file.name
                    existing_data.to_parquet(local_file_path)
                    
                    # Subir el archivo actualizado de nuevo al bucket
                    updated_blob = bucket.blob(blob.name)
                    upload_to_bucket(updated_blob, local_file_path)
                
                # Eliminar el archivo temporal
                os.remove(local_file_path)
                print(f"Archivo {blob.name} actualizado y subido correctamente.")
            else:
                print(f"No se pudo procesar el archivo: {blob.name}")
    
    print("Todos los archivos han sido procesados y actualizados.")

# Función para leer archivos Parquet desde GCS
def read_parquet_from_gcs(bucket_name, blob_name):
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(blob_name)

    try:
        with blob.open("rb") as f:
            df = pd.read_parquet(f)
            return df
    except Exception as e:
        print(f"Error leyendo el archivo {blob_name}: {str(e)}")
        return None

# Ejecutar la función para actualizar todos los archivos
update_all_files_in_bucket()


Credenciales configuradas desde: data-avatar-435301-p6-45ed205c0371.json
Procesando el archivo: metadata_filtrada_final.parquet
Columna 'category' transformada a una cadena de texto separada por comas.
Columna 'state' agregada al archivo 'metadata_filtrada_final.parquet'.
Archivo subido exitosamente.
Archivo metadata_filtrada_final.parquet actualizado y subido correctamente.
Procesando el archivo: reviews_negocios_con_texto_filtrados.parquet
Archivo subido exitosamente.
Archivo reviews_negocios_con_texto_filtrados.parquet actualizado y subido correctamente.
Procesando el archivo: reviews_negocios_sin_texto_filtrados.parquet
Archivo subido exitosamente.
Archivo reviews_negocios_sin_texto_filtrados.parquet actualizado y subido correctamente.
Todos los archivos han sido procesados y actualizados.
