In [4]:
import pymongo
import logging
import os
import torch
import time
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack import Pipeline, Document
from haystack.document_stores.types import DuplicatePolicy
from haystack.components.writers import DocumentWriter
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack_integrations.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore
from haystack_integrations.components.retrievers.mongodb_atlas import MongoDBAtlasEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from pymongo import MongoClient, errors
from typing import List, Dict, Any
from getpass import getpass
from haystack.components.generators import OpenAIGenerator
from haystack.document_stores.types import DuplicatePolicy
import traceback

In [5]:
# Credentials are expected to come from the process environment; never paste
# real values into the notebook. setdefault() keeps a value that is already
# exported instead of overwriting it with an empty placeholder, while still
# guaranteeing that both keys exist after this cell has run.
os.environ.setdefault('MONGO_CONNECTION_STRING', '')
os.environ.setdefault('OPENAI_API_KEY', '')

In [8]:
# Root logging setup: INFO and above, each record prefixed with a timestamp.
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)

# Logger shared by the helper functions defined below.
logger = logging.getLogger(__name__)
def extract_body_paragraphs(json_body: List[Dict]) -> List[str]:
    """
    Collect the text of every body paragraph found in a JSON body.

    An entry contributes its ``content`` value only when it is a dict whose
    ``__typename`` equals ``'BodyParagraph'`` and whose ``content`` is truthy;
    every other entry is skipped. Order of the input is preserved.

    :param json_body: List of JSON body objects
    :return: List of paragraph contents; an empty list (after a logged
        warning) when ``json_body`` is not a list
    """
    if not isinstance(json_body, list):
        logger.warning(f"Expected list, got {type(json_body)}")
        return []

    paragraphs = []
    for entry in json_body:
        if isinstance(entry, dict) and entry.get('__typename') == 'BodyParagraph':
            text = entry.get('content')
            if text:
                paragraphs.append(text)
    return paragraphs

def _embed_and_store(doc_embedder, documents, store):
    """
    Embed one batch of documents and record the results in ``store``.

    :param doc_embedder: Embedder whose ``run`` returns a dict with a "documents" list
    :param documents: Documents to embed; each carries ``meta['original_id']``
    :param store: Dictionary updated in place, keyed by ``original_id``
    """
    embedded = doc_embedder.run(documents)
    for embedded_doc in embedded["documents"]:
        store[embedded_doc.meta['original_id']] = {
            'content': embedded_doc.content,
            'embedding': embedded_doc.embedding,
            'meta': embedded_doc.meta
        }


def vectorize_mongodb_documents(
    embedding_model: str = "intfloat/e5-base-v2",
    database_name: str = "contentDeliveryApi",
    collection_name: str = "Article",
    max_documents: int = None  # Process all documents by default
) -> Dict[str, Dict]:
    """
    Retrieve documents from MongoDB, extract body paragraphs,
    and create vector embeddings stored in a dictionary.

    The connection string is read from the ``MONGO_CONNECTION_STRING``
    environment variable. Only documents with a non-empty ``jsonBody`` are
    read. Every extracted paragraph becomes one entry keyed
    ``"<document _id>_<paragraph index>"``.

    Args:
        embedding_model: Model name passed to SentenceTransformersDocumentEmbedder.
        database_name: MongoDB database to read from.
        collection_name: Collection inside that database.
        max_documents: Upper bound on the number of documents read; a falsy
            value (``None`` or ``0``) applies no limit.

    Returns:
        Dict[str, Dict]: Dictionary with document IDs as keys and values containing:
            - content: str (paragraph text)
            - embedding: List[float] (vector embedding)
            - meta: Dict (additional metadata: ``original_id`` and ``headline``)

    Raises:
        ValueError: If ``MONGO_CONNECTION_STRING`` is unset or empty.
        pymongo.errors.ConnectionFailure: Logged and re-raised when MongoDB
            cannot be reached.
        Exception: Any other failure outside the per-document handling is
            logged with its traceback and re-raised.
    """
    start_time = time.time()

    # Get MongoDB connection string from environment variable
    mongodb_uri = os.getenv('MONGO_CONNECTION_STRING')
    if not mongodb_uri:
        raise ValueError("MONGO_CONNECTION_STRING environment variable is not set")

    # Result accumulator. Earlier notebook cells pre-create a module-level
    # ``embeddings_dict`` and read it after the call instead of using the
    # return value, so that dict is reused when it exists. Without one, start
    # from a fresh dict rather than failing with NameError on the first batch.
    embeddings_dict = globals().get('embeddings_dict')
    if not isinstance(embeddings_dict, dict):
        embeddings_dict = {}

    # Same filter for counting and for reading, so the progress total matches.
    query = {"jsonBody": {"$exists": True, "$ne": []}}
    # ``headline`` must be projected, otherwise the metadata below is always ''.
    projection = {"_id": 1, "jsonBody": 1, "headline": 1}

    client = None
    try:
        # Connect to MongoDB
        connect_start = time.time()
        client = MongoClient(mongodb_uri)

        # Test connection
        client.admin.command('ping')
        logger.info("Successfully connected to MongoDB")
        logger.info(f"Connection time: {time.time() - connect_start:.2f} seconds")

        db = client[database_name]
        collection = db[collection_name]

        # Log collection details
        logger.info(f"Database: {database_name}, Collection: {collection_name}")
        total_docs = collection.count_documents(query)
        logger.info(f"Total documents to process: {total_docs}")

        if max_documents and max_documents < total_docs:
            total_docs = max_documents

        # Initialize embedder
        init_start = time.time()
        doc_embedder = SentenceTransformersDocumentEmbedder(model=embedding_model)
        doc_embedder.warm_up()
        logger.info(f"Initialization time: {time.time() - init_start:.2f} seconds")

        # Track processing
        documents = []
        processed_docs = 0
        error_docs = 0

        # Use cursor with optional limit
        cursor = collection.find(query, projection).batch_size(1000)
        if max_documents:
            cursor = cursor.limit(max_documents)

        # Start processing
        process_start = time.time()
        for doc in cursor:
            try:
                # Extract body paragraphs
                paragraphs = extract_body_paragraphs(doc.get('jsonBody', []))

                # Create Document objects for each paragraph
                for idx, paragraph in enumerate(paragraphs):
                    document_id = f"{doc['_id']}_{idx}"
                    documents.append(Document(
                        content=paragraph,
                        meta={
                            'original_id': document_id,
                            'headline': doc.get('headline', '')
                        }
                    ))

                processed_docs += 1

                # Batch process to prevent memory issues
                if len(documents) >= 100:
                    batch_start = time.time()
                    logger.info(f"Batch processing {len(documents)} documents")
                    _embed_and_store(doc_embedder, documents, embeddings_dict)

                    # Only reset after a successful embed: a batch that raised
                    # stays queued and is retried with the next article.
                    documents = []
                    logger.info(f"Batch processed in {time.time() - batch_start:.2f} seconds")

            except Exception as doc_error:
                error_docs += 1
                logger.error(f"Error processing document {doc.get('_id')}: {str(doc_error)}")
                logger.error(traceback.format_exc())

            # Log progress every 100 processed documents
            elapsed_time = time.time() - process_start
            if processed_docs % 100 == 0:
                logger.info(f"Processed {processed_docs}/{total_docs} documents "
                          f"in {elapsed_time:.2f} seconds")
                if processed_docs > 0:
                    # Linear extrapolation from the average time per document so far
                    estimated_total_time = (elapsed_time / processed_docs) * total_docs
                    if estimated_total_time:
                        time_remaining = estimated_total_time - elapsed_time
                        logger.info(f"Estimated time remaining: {time_remaining / 60:.2f} minutes")

        # Process any remaining documents
        if documents:
            logger.info(f"Processing final batch of {len(documents)} documents")
            final_batch_start = time.time()
            _embed_and_store(doc_embedder, documents, embeddings_dict)
            logger.info(f"Final batch processed in {time.time() - final_batch_start:.2f} seconds")

        total_time = time.time() - start_time
        logger.info(f"Completed processing. Total time: {total_time/60:.2f} minutes")
        logger.info(f"Total docs processed: {processed_docs}, Errors: {error_docs}")
        logger.info(f"Total embeddings stored: {len(embeddings_dict)}")

        return embeddings_dict

    except errors.ConnectionFailure as e:
        logger.error(f"Failed to connect to MongoDB: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        logger.error(traceback.format_exc())
        raise
    finally:
        # Release the connection on success and on failure alike.
        if client is not None:
            client.close()

In [1]:
# Start from an empty mapping so a re-run of this cell never mixes in entries
# left over from an earlier call.
embeddings_dict = {}

# Bind the returned mapping instead of leaving the bare call as the cell's last
# expression, which would render every stored embedding vector as cell output.
embeddings_dict = vectorize_mongodb_documents()

# Show only how many paragraph embeddings were produced.
len(embeddings_dict)

NameError: name 'vectorize_mongodb_documents' is not defined

In [14]:
import pickle


# Serialize the embeddings mapping to 'filename.pickle' using the newest pickle protocol
with open('filename.pickle', 'wb') as pickle_out:
    pickle.dump(embeddings_dict, pickle_out, protocol=pickle.HIGHEST_PROTOCOL)

In [5]:
import pickle
# Read the pickled object back from 'filename.pickle'.
# NOTE: pickle.load can execute arbitrary code while deserializing, so only
# open a file that came from a trusted source.
with open('filename.pickle', 'rb') as pickle_in:
    unserialized_data = pickle.load(pickle_in)

#unserialized_data

In [7]:
import pandas as pd

In [3]:
# Tabulate the reloaded mapping: each top-level key becomes a row label and the
# three per-entry fields become the columns.
pd.DataFrame.from_dict(
    unserialized_data,
    orient='index',
    columns=['content', 'embedding', 'meta'],
)

IndentationError: unexpected indent (1373527156.py, line 2)