In [1]:
import os
import firebase_admin
from firebase_admin import credentials, firestore
import vertexai
from vertexai.language_models import TextEmbeddingModel
from google.cloud import storage
import json
import time
from dotenv import load_dotenv


In [2]:
# Load variables from the environment file via python-dotenv, then read the
# AWS key pair from the process environment (each is None when unset).
# NOTE(review): neither AWS value is referenced in the visible cells — confirm
# they are still needed.
load_dotenv()
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")

# Ensure Vertex AI uses the correct credentials (service-account key file).
# NOTE(review): storage.Client() below presumably picks up the same variable
# through application-default credentials, so keep this assignment first — confirm.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "../../../bubbo-dfba0-47e395cdcdc7.json"

# Cloud Storage client and a handle to the bucket used for the embedding files.
BUCKET_NAME = 'embeddings_bucket_backup'
storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

In [None]:
# Initialization: create the Firebase Admin app from the service-account key
# file and obtain the Firestore client used to read the source documents.
# NOTE(review): re-running this cell in the same kernel presumably fails because
# the default Firebase app already exists — confirm before re-executing.
cred = credentials.Certificate('../../../bubbo-dfba0-47e395cdcdc7.json')
firebase_admin.initialize_app(cred)
db = firestore.client()

# Vertex AI project/region and the pretrained text-embedding model that
# get_text_embedding() calls through the module-level `model`.
PROJECT_ID = "bubbo-dfba0"
REGION = "us-central1"
MODEL_ID = "text-multilingual-embedding-002"
vertexai.init(project=PROJECT_ID, location=REGION)
model = TextEmbeddingModel.from_pretrained(MODEL_ID)

In [None]:
def get_text_embedding(texts):
    """Embed a list of texts with the module-level ``model``.

    Args:
        texts: The texts to embed, passed as-is to ``model.get_embeddings``.

    Returns:
        A list with the ``.values`` of each returned embedding, in the order
        the model returned them, or ``None`` if anything raised while
        requesting or unpacking the embeddings (the error is printed).
    """
    try:
        vectors = []
        # Unpacking stays inside the try so a malformed response is reported
        # the same way as a failed request.
        for result in model.get_embeddings(texts):
            vectors.append(result.values)
        return vectors
    except Exception as e:
        print(f"Error al generar embeddings: {e}")
    return None

def _load_existing_ids():
    """Return the set of 'ID' values found in the bucket's ``.json`` blobs.

    Every blob whose name ends in ``.json`` is downloaded and parsed. A blob may
    hold either a single object or a list of objects; the ``'ID'`` field of each
    is collected. Blobs that cannot be decoded or processed are reported and
    skipped so one bad file does not abort the scan.
    """
    existing_ids = set()
    for blob in bucket.list_blobs():
        if not blob.name.endswith('.json'):
            continue
        try:
            existing_data = json.loads(blob.download_as_text())
            if isinstance(existing_data, list):
                for item in existing_data:
                    existing_ids.add(item.get('ID'))
            elif isinstance(existing_data, dict):
                existing_ids.add(existing_data.get('ID'))
        except json.JSONDecodeError:
            print(f"Error al decodificar JSON del blob: {blob.name}")
        except Exception as e:
            print(f"Error al procesar blob {blob.name}: {e}")
    return existing_ids


def _store_embedding_batch(texts, batch_docs, existing_ids, stats, failed_ids):
    """Embed one batch and upload one ``<doc id>.json`` blob per document.

    Args:
        texts: Texts to embed, aligned index-by-index with ``batch_docs``.
        batch_docs: Firestore document snapshots the texts were built from.
        existing_ids: IDs already present in the bucket; only used to label a
            document as new ("Nuevo") or updated ("Actualizado").
        stats: Mutable counters, ``{'total': int, 'new': int}``, updated in place.
        failed_ids: Mutable list that receives the ID of every document that
            could not be embedded or uploaded.

    Returns:
        ``False`` when the embedding request failed for the whole batch (all
        IDs are recorded as failed), ``True`` otherwise.
    """
    vectors = get_text_embedding(texts)
    if vectors is None:
        failed_ids.extend(batch_doc.id for batch_doc in batch_docs)
        return False

    for batch_doc, text, vector in zip(batch_docs, texts, vectors):
        try:
            payload = json.dumps({'ID': batch_doc.id, 'embedding': vector}).encode('utf-8')
            blob = bucket.blob(f"{batch_doc.id}.json")
            blob.upload_from_string(payload, content_type='application/json')
        except Exception as e:
            # A single failed upload must not discard the rest of the batch.
            print(f"Error al procesar documento {batch_doc.id}: {e}")
            failed_ids.append(batch_doc.id)
            continue

        stats['total'] += 1
        if batch_doc.id in existing_ids:
            status = "Actualizado"
        else:
            status = "Nuevo"
            stats['new'] += 1
        print(f"Procesado {stats['total']}: {batch_doc.id} ({status}) -> {text[:30]}...")

        if stats['total'] % 1000 == 0:
            print(f"Progreso: {stats['total']} documentos procesados.")

    # If the model returned fewer vectors than texts, the unmatched documents
    # were never stored; record them instead of dropping them silently.
    failed_ids.extend(batch_doc.id for batch_doc in batch_docs[len(vectors):])
    return True


def process_and_store_embeddings(collection_name='Data_EN', batch_size=25,
                                 pause_seconds=2,
                                 failed_ids_path="failed_embeddings.json"):
    """Embed every document of a Firestore collection and store the vectors in the bucket.

    For each document the text ``"<CleanTitle> <Genre> <Synopsis>"`` is built;
    documents whose text is empty are skipped. Texts are embedded in batches via
    ``get_text_embedding`` and each result is uploaded to the module-level
    ``bucket`` as ``<doc id>.json`` containing ``{'ID': ..., 'embedding': ...}``.
    Existing blobs are overwritten. IDs of documents that could not be
    processed are written to ``failed_ids_path`` as a JSON list.

    Each batch is attempted exactly once: after a failed embedding request the
    batch is recorded as failed and discarded, so it can neither grow nor be
    re-sent on the following iterations.

    Args:
        collection_name: Firestore collection to read from the module-level ``db``.
        batch_size: Number of texts sent per embedding request.
        pause_seconds: Seconds to sleep after each full batch.
        failed_ids_path: File that receives the failed document IDs (only
            written when there is at least one failure).

    Returns:
        None. Progress and a final summary are printed.
    """
    print("Iniciando procesamiento de documentos...")

    # Scan the bucket first so the Firestore stream is only created once it is
    # about to be consumed.
    existing_ids = _load_existing_ids()
    print(f"Se encontraron {len(existing_ids)} IDs existentes en el bucket.")

    stats = {'total': 0, 'new': 0}
    failed_embeddings = []
    text_batch = []
    doc_batch = []

    # NOTE(review): a recorded run of this cell ended with
    # "AttributeError: '_UnaryStreamMultiCallable' object has no attribute '_retry'".
    # Presumably it comes from the Firestore stream being held open while each
    # batch is embedded and uploaded — confirm, and consider paginating the query.
    docs = db.collection(collection_name).stream()

    for doc in docs:
        try:
            data = doc.to_dict()
            text = f"{data.get('CleanTitle', '')} {data.get('Genre', '')} {data.get('Synopsis', '')}".strip()
        except Exception as e:
            print(f"Error al procesar documento {doc.id}: {e}")
            failed_embeddings.append(doc.id)
            continue

        if not text:
            continue

        text_batch.append(text)
        doc_batch.append(doc)

        if len(text_batch) >= batch_size:
            if not _store_embedding_batch(text_batch, doc_batch, existing_ids,
                                          stats, failed_embeddings):
                print(f"Error al obtener embeddings para el batch. Se omite.")
            # Always start a fresh batch, whether or not this one succeeded.
            text_batch = []
            doc_batch = []
            time.sleep(pause_seconds)  # throttle requests between batches

    # Process the last, partially filled batch (if any).
    if text_batch:
        if not _store_embedding_batch(text_batch, doc_batch, existing_ids,
                                      stats, failed_embeddings):
            print(f"Error al obtener embeddings para el último batch. Se omite.")

    # Persist the IDs of the documents that failed so they can be retried later.
    if failed_embeddings:
        with open(failed_ids_path, "w") as f:
            json.dump(failed_embeddings, f, indent=4)
        print(f"Se encontraron {len(failed_embeddings)} embeddings fallidos. IDs guardados en failed_embeddings.json")

    print(f"Finalizado. Total de documentos procesados: {stats['total']}. Documentos Nuevos o actualizados: {stats['new']}")

# Run the pipeline: embed the documents of the 'Data_EN' collection and upload
# one JSON file per document to the bucket (network calls; overwrites existing blobs).
process_and_store_embeddings()

Iniciando procesamiento de documentos...
Se encontraron 3200 IDs existentes en el bucket.
Procesado 1: 10 (Actualizado) -> All in Good Faith Comedy This ...
Procesado 2: 100 (Actualizado) -> Lock, Stock and Two Smoking Ba...
Procesado 3: 1000000 (Actualizado) -> Women's toilets Documentary Th...
Procesado 4: 10000017 (Actualizado) -> Her Deadly Boyfriend Drama; Su...
Procesado 5: 10000020 (Actualizado) -> Godspeed, The Poles! Documenta...
Procesado 6: 1000004 (Actualizado) -> Purple Beatz Drama; Romance Sa...
Procesado 7: 10000042 (Actualizado) -> Flying Fortress: History of th...
Procesado 8: 10000064 (Actualizado) -> Bisping Sport After a tumultuo...
Procesado 9: 1000007 (Actualizado) -> Kyle Brownrigg: Introducing Ly...
Procesado 10: 10000071 (Actualizado) -> Life in Long Beach Documentary...
Procesado 11: 1000008 (Actualizado) -> The DeAnne Smith EXperience Co...
Procesado 12: 10000120 (Actualizado) -> Night of the Falling Stars Fan...
Procesado 13: 10000125 (Actualizado) -> The Co

AttributeError: '_UnaryStreamMultiCallable' object has no attribute '_retry'