In [2]:
import os
import weaviate
from weaviate.classes.init import Auth

# Best practice: store your credentials in environment variables
weaviate_url = os.environ["WEAVIATE_URL"]
weaviate_api_key = os.environ["WEAVIATE_API_KEY"]

# Connect to Weaviate Cloud
client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

print(client.is_ready())

UnexpectedStatusCodeError: Meta endpoint! Unexpected status code: 404, with response body: None.

In [4]:
weaviate_url = os.environ.get("WEAVIATE_URL")
weaviate_api_key = os.environ.get("WEAVIATE_API_KEY")
import weaviate
from weaviate import Client
from weaviate.classes.init import Auth
from weaviate.classes.query import Filter

from langchain_weaviate.vectorstores import WeaviateVectorStore

In [5]:
from langchain_core.embeddings import Embeddings
from langchain_huggingface import HuggingFaceEmbeddings
import os
from dotenv import load_dotenv
load_dotenv()
os.environ['HF_TOKEN'] = os.getenv('HF_TOKEN')

def get_embeddings_model() -> Embeddings:
    return HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") 

  from .autonotebook import tqdm as notebook_tqdm


In [10]:
store = WeaviateVectorStore(
            client=client,
            index_name="pdf_index",
            text_key="text",
            embedding=get_embeddings_model(),
            attributes=["source", "title"],
        )

In [11]:
ret = store.as_retriever()


In [12]:
res = ret.invoke("what is transformer")

In [13]:
res

[Document(metadata={'keywords': '', 'creator': 'LaTeX with hyperref', 'file_path': 'C:\\Users\\amins\\AppData\\Local\\Temp\\ingest_74749f49-8a8b-4c2a-bb40-c934519a72f8_1y378oaf\\Attention.pdf', 'trapped': '/False', 'page_label': '3', 'creationdate': datetime.datetime(2024, 4, 10, 21, 11, 43, tzinfo=datetime.timezone.utc), 'file_name': 'Attention.pdf', 'page': 2.0, 'ptex_fullbanner': 'This is pdfTeX, Version 3.141592653-2.6-1.40.25 (TeX Live 2023) kpathsea version 6.3.5', 'subject': '', 'directory': 'C:\\Users\\amins\\AppData\\Local\\Temp\\ingest_74749f49-8a8b-4c2a-bb40-c934519a72f8_1y378oaf', 'source': 'C:/Users/amins/AppData/Local/Temp/ingest_74749f49-8a8b-4c2a-bb40-c934519a72f8_1y378oaf/Attention.pdf', 'total_pages': 15.0, 'producer': 'pdfTeX-1.40.25', 'moddate': datetime.datetime(2024, 4, 10, 21, 11, 43, tzinfo=datetime.timezone.utc), 'title': '', 'author': ''}, page_content='Figure 1: The Transformer - model architecture.\nThe Transformer follows this overall architecture using sta

In [1]:
import os
import uuid
from datetime import datetime, timezone
from typing import cast, Optional, Any
from contextlib import contextmanager
from dataclasses import field

from langchain_core.documents import Document
from langchain_core.messages import BaseMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore
from pydantic import BaseModel

from backend import retrieval
from backend.configuration import Configuration, IndexConfiguration
from backend.state import InputState, State
from backend.utils import format_docs, get_message_text, load_chat_model

# Your ProdDBConfig class with user-specific configurations
class ProdDBConfig:
    @staticmethod
    def _build_uri() -> str:
        """Production URI with pooling, SSL, timeouts."""
        return (
            "postgresql://postgres:123456@localhost:5432/langgraphrag?"
            "sslmode=disable&"
            "connect_timeout=10"
        )
    
    @staticmethod
    def checkpointer() -> PostgresSaver:
        """Get checkpointer instance."""
        uri = ProdDBConfig._build_uri()
        return PostgresSaver.from_conn_string(uri)
    
    @staticmethod
    def store() -> PostgresStore:
        """Get store instance."""
        uri = ProdDBConfig._build_uri()
        return PostgresStore.from_conn_string(uri)

    @staticmethod
    @contextmanager
    def get_store_context():
        """Context manager for store operations."""
        store = ProdDBConfig.store()
        try:
            yield store
        finally:
            if hasattr(store, 'close'):
                store.close()

# Health check helper
def db_health_check():
    """Verify DB connectivity."""
    try:
        with ProdDBConfig.get_store_context() as store:
            print("Store connection successful")
        print("Database health check passed")
        return True
    except Exception as e:
        print(f"Database health check failed: {e}")
        return False

# Define the function that calls the model
class SearchQuery(BaseModel):
    """Search the indexed documents for a query."""
    query: str


  from .autonotebook import tqdm as notebook_tqdm


In [None]:

async def generate_query(
    state: State, *, config: RunnableConfig
) -> dict[str, list[str]]:
    """Generate a search query based on the current state and configuration."""
    messages = state.messages
    
    # Get configuration with user_id
    configuration = Configuration.from_runnable_config(config)
    
    if len(messages) == 1:
        human_input = get_message_text(messages[-1])
        return {"queries": [human_input]}
    else:
        prompt = ChatPromptTemplate.from_messages(
            [
                ("system", configuration.query_system_prompt),
                ("placeholder", "{messages}"),
            ]
        )
        model = load_chat_model(configuration.query_model).with_structured_output(
            SearchQuery
        )

        message_value = await prompt.ainvoke(
            {
                "messages": state.messages,
                "queries": "\n- ".join(state.queries),
                "system_time": datetime.now(tz=timezone.utc).isoformat(),
                "user_id": configuration.user_id,  # Include user_id in context
            },
            config,
        )
        generated = cast(SearchQuery, await model.ainvoke(message_value, config))
        return {
            "queries": [generated.query],
        }

async def retrieve(
    state: State, *, config: RunnableConfig
) -> dict[str, list[Document]]:
    """Retrieve documents based on the latest query in the state."""
    # Get configuration including IndexConfiguration
    configuration = Configuration.from_runnable_config(config)
    user_id = configuration.user_id
    
    # Store user query in database for history
    with ProdDBConfig.get_store_context() as store:
        query_data = {
            "query": state.queries[-1],
            "user_id": user_id,
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
            "metadata": {
                "retriever_provider": configuration.retriever_provider,
                "embedding_model": configuration.embedding_model
            }
        }
        
        # Store query with user-specific key
        query_id = f"query_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
        store.put(query_id, query_data)
    
    # Use the retriever with user-specific filtering
    with retrieval.make_retriever(config) as retriever:
        # If your retriever supports filtering by metadata, apply user filter
        if hasattr(config, 'metadata_filter'):
            config.metadata_filter = {"user_id": user_id}
        
        # Also pass user_id in search kwargs if supported
        search_kwargs = configuration.search_kwargs.copy()
        search_kwargs["metadata_filter"] = {"user_id": user_id}
        
        response = await retriever.ainvoke(
            state.queries[-1], 
            config={**config, "search_kwargs": search_kwargs}
        )
        return {"retrieved_docs": response}

async def respond(
    state: State, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
    """Call the LLM powering our "agent"."""
    configuration = Configuration.from_runnable_config(config)
    user_id = configuration.user_id
    
    # Store conversation in the database with user filtering
    with ProdDBConfig.get_store_context() as store:
        # Store conversation metadata with user_id
        conversation_data = {
            "user_id": user_id,
            "messages": [msg.dict() for msg in state.messages],
            "queries": state.queries,
            "retrieved_docs": [doc.dict() for doc in state.retrieved_docs],
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
            "configuration": {
                "retriever_provider": configuration.retriever_provider,
                "embedding_model": configuration.embedding_model
            }
        }
        
        # Store with user-specific key
        conversation_id = f"conversation_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        store.put(conversation_id, conversation_data)
    
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", configuration.response_system_prompt),
            ("placeholder", "{messages}"),
        ]
    )
    model = load_chat_model(configuration.response_model)

    retrieved_docs = format_docs(state.retrieved_docs)
    message_value = await prompt.ainvoke(
        {
            "messages": state.messages,
            "retrieved_docs": retrieved_docs,
            "system_time": datetime.now(tz=timezone.utc).isoformat(),
            "user_id": user_id,  # Include user_id in prompt context
        },
        config,
    )
    response = await model.ainvoke(message_value, config)
    
    # Store the response separately
    with ProdDBConfig.get_store_context() as store:
        response_data = {
            "user_id": user_id,
            "query": state.queries[-1] if state.queries else "",
            "response": response.content if hasattr(response, 'content') else str(response),
            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
            "conversation_id": conversation_id
        }
        response_key = f"response_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
        store.put(response_key, response_data)
    
    return {"messages": [response]}

# Define a new graph
builder = StateGraph(State, input_schema=InputState, context_schema=Configuration)

builder.add_node(generate_query)
builder.add_node(retrieve)
builder.add_node(respond)
builder.add_edge("__start__", "generate_query")
builder.add_edge("generate_query", "retrieve")
builder.add_edge("retrieve", "respond")

# Finally, we compile it with checkpointing!
checkpointer = ProdDBConfig.checkpointer()

graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=[],
    interrupt_after=[],
)
graph.name = "RetrievalGraph"



TypeError: StateGraph.compile() got an unexpected keyword argument 'config_schema'

In [None]:

# Enhanced functions for user-specific operations
def load_user_conversation(thread_id: str, user_id: str, config: Optional[dict] = None):
    """Load a conversation from checkpoint by thread ID for specific user."""
    configurable = {
        "thread_id": thread_id,
        "user_id": user_id
    }
    if config:
        configurable.update(config.get("configurable", {}))
    
    return graph.get_state({"configurable": configurable})

def list_user_conversations(user_id: str, limit: int = 100):
    """List all stored conversation thread IDs for a specific user."""
    with ProdDBConfig.checkpointer() as cp:
        # This will need adjustment based on your PostgresSaver implementation
        # You might need to implement custom filtering
        all_conversations = cp.list({"configurable": {}}, limit=limit)
        
        # Filter by user_id if stored in checkpoint metadata
        user_conversations = []
        for conv in all_conversations:
            if hasattr(conv, 'metadata') and conv.metadata.get('user_id') == user_id:
                user_conversations.append(conv)
        
        return user_conversations

def get_user_conversation_history(user_id: str, limit: int = 50):
    """Get conversation history from store for a specific user."""
    with ProdDBConfig.get_store_context() as store:
        # This assumes your store supports scanning or querying by prefix
        # Adjust based on your PostgresStore implementation
        conversations = []
        
        # Try to get conversations by pattern
        # Note: Actual implementation depends on your store's query capabilities
        try:
            # If store has scan or query capabilities
            if hasattr(store, 'scan'):
                for key, value in store.scan():
                    if key.startswith(f"conversation_{user_id}_"):
                        conversations.append({
                            "id": key,
                            "data": value,
                            "timestamp": value.get("timestamp", "")
                        })
            
            # Sort by timestamp
            conversations.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
            
            return conversations[:limit]
        except:
            # Fallback - store implementation might differ
            return []

In [None]:

# Example usage:
if __name__ == "__main__":
    # Health check
    if db_health_check():
        print("Database is ready")
        
        # Example: Invoke the graph with user-specific configuration
        user_id = "user_12345"  # Or use UUID
        config = {
            "configurable": {
                "thread_id": f"{user_id}_session_1",
                "user_id": user_id,
                "retriever_provider": "weaviate",
                "embedding_model": "all-MiniLM-L6-v2"
            }
        }
        
        # This will automatically filter and store data for this user
        result = graph.invoke(
            {"messages": [{"role": "user", "content": "Hello, how are you?"}]},
            config=config
        )
        
        print("Response:", result["messages"][-1].content)
        
        # Get user's conversation history
        history = get_user_conversation_history(user_id, limit=10)
        print(f"\nUser {user_id} has {len(history)} conversations")
        
        # Continue conversation with same user
        result2 = graph.invoke(
            {"messages": [{"role": "user", "content": "Tell me more about that"}]},
            config=config  # Same thread_id and user_id will load previous state
        )
        
        print("\nFollow-up response:", result2["messages"][-1].content)

In [None]:
"""Main entrypoint for the conversational retrieval graph with user memory.

This module defines the core structure and functionality of the conversational
retrieval graph with stateful user preferences via langgraph.json store.
Supports cross-thread memory (user prefs) + thread-scoped history.
"""

from datetime import datetime, timezone
from typing import Any, Annotated, TypedDict, cast, Sequence
from operator import add

from langchain_core.documents import Document
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_core.pydantic_v1 import BaseModel, Field
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from pydantic import BaseModel as PydanticBaseModel

from backend import retrieval
from backend.configuration import Configuration
from backend.state import InputState, State  # Assume State updated below
from backend.utils import format_docs, get_message_text, load_chat_model

# User preference schema for tools
class UserInfo(PydanticBaseModel):
    """User preferences schema."""
    name: str = Field(..., description="User's full name")
    favorite_topics: list[str] = Field(default_factory=list, description="Favorite topics")
    preferred_format: str = Field(default="detailed", description="Response style: brief/detailed")

# Memory tools - access store via config["store"] (langgraph.json injection)
@tool
async def get_user_prefs(runtime: Any) -> str:
    """Get stored preferences for current user."""
    store = runtime.config.get("store")
    if not store:
        return "No store available."
    user_id = runtime.config["configurable"]["user_id"]
    namespace = ("users", user_id)
    item = await store.aget(namespace, "profile")
    if item and item.value:
        prefs = item.value
        return f"Name: {prefs.get('name', 'Unknown')}\nTopics: {prefs.get('favorite_topics', [])}\nFormat: {prefs.get('preferred_format', 'detailed')}"
    return "No preferences stored yet."

@tool
async def save_user_prefs(info: UserInfo, runtime: Any) -> str:
    """Save/update user preferences."""
    store = runtime.config.get("store")
    if not store:
        return "No store available."
    user_id = runtime.config["configurable"]["user_id"]
    namespace = ("users", user_id)
    await store.aput(namespace, "profile", info.model_dump())
    return "Preferences saved!"

class SearchQuery(PydanticBaseModel):
    """Search the indexed documents for a query."""
    query: str

async def generate_query(
    state: State, *, config: RunnableConfig
) -> dict[str, list[str]]:
    """Generate a search query based on the current state and configuration."""
    messages = state.messages
    if len(messages) == 1:
        human_input = get_message_text(messages[-1])
        return {"queries": [human_input]}
    else:
        configuration = Configuration.from_runnable_config(config)
        prompt = ChatPromptTemplate.from_messages([
            ("system", configuration.query_system_prompt),
            ("placeholder", "{messages}"),
        ])
        model = load_chat_model(configuration.query_model).with_structured_output(SearchQuery)

        message_value = await prompt.ainvoke({
            "messages": state.messages,
            "queries": "\n- ".join(state.queries),
            "system_time": datetime.now(tz=timezone.utc).isoformat(),
        }, config)
        generated = cast(SearchQuery, await model.ainvoke(message_value, config))
        return {"queries": [generated.query]}

async def retrieve(
    state: State, *, config: RunnableConfig
) -> dict[str, list[Document]]:
    """Retrieve documents based on the latest query in the state."""
    with retrieval.make_retriever(config) as retriever:
        response = await retriever.ainvoke(state.queries[-1], config)
        return {"retrieved_docs": response}

async def respond(
    state: State, *, config: RunnableConfig
) -> dict[str, Sequence[BaseMessage]]:
    """Enhanced respond with user memory injection + tools."""
    store = config.get("store")  # langgraph.json injects AsyncPostgresStore
    configuration = Configuration.from_runnable_config(config)
    
    # Load user prefs from store
    user_id = config["configurable"]["user_id"]
    namespace = ("users", user_id)
    profile_item = await store.aget(namespace, "profile") if store else None
    
    user_context = ""
    if profile_item and profile_item.value:
        prefs = profile_item.value
        user_context = (
            f"User preferences: Name={prefs.get('name', 'Unknown')}, "
            f"Topics={prefs.get('favorite_topics', [])}, "
            f"Format={prefs.get('preferred_format', 'detailed')}.\n"
            f"Adapt responses accordingly."
        )
    
    # Enhanced prompt with user context
    prompt = ChatPromptTemplate.from_messages([
        ("system", f"{configuration.response_system_prompt}\n\n{user_context}"),
        ("placeholder", "{messages}"),
    ])
    
    model = load_chat_model(configuration.response_model)
    # Bind memory tools
    model_with_tools = model.bind_tools([get_user_prefs, save_user_prefs])
    
    retrieved_docs = format_docs(state.retrieved_docs)
    message_value = await prompt.ainvoke({
        "messages": state.messages,
        "retrieved_docs": retrieved_docs,
        "system_time": datetime.now(tz=timezone.utc).isoformat(),
    }, config)
    
    # Invoke with store access for tools
    response = await model_with_tools.ainvoke(message_value, config)
    return {"messages": [response]}

# Updated State (add annotations if needed)
class RetrievalState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], add_messages]
    queries: Annotated[list[str], add]
    retrieved_docs: list[Document]

# Build graph
builder = StateGraph(RetrievalState, input_schema=InputState)  # Use your InputState
builder.add_node("generate_query", generate_query)
builder.add_node("retrieve", retrieve)
builder.add_node("respond", respond)

builder.add_edge("__start__", "generate_query")
builder.add_edge("generate_query", "retrieve")
builder.add_edge("retrieve", "respond")
builder.add_edge("respond", END)

# NO manual store/checkpointer - langgraph.json handles it!
graph = builder.compile(
    interrupt_before=[],
    interrupt_after=[],
)
graph.name = "RetrievalGraph"

# Example usage function
async def chat_example():
    config = {"configurable": {"thread_id": "chat1", "user_id": "alice123"}}
    
    # Session 1: Learns prefs + retrieves
    result1 = await graph.ainvoke({
        "messages": [HumanMessage(content="Hi I'm Alice, love hiking/tech. What's RAG?")]
    }, config)
    print("Response 1:", result1["messages"][-1].content)
    
    # New thread: Uses prefs in retrieval context!
    config2 = {"configurable": {"thread_id": "chat2", "user_id": "alice123"}}
    result2 = await graph.ainvoke({
        "messages": [HumanMessage(content="More on RAG, make it brief")]
    }, config2)
    print("Response 2:", result2["messages"][-1].content)

In [3]:
# config.py
from pydantic import BaseModel, HttpUrl
from typing import Optional, List
from enum import Enum

class IndexingStrategy(str, Enum):
    SITEMAP = "sitemap"
    RECURSIVE = "recursive"
    CUSTOM = "custom"

class WebsiteConfig(BaseModel):
    url: HttpUrl
    name: str
    index_name: str
    strategy: IndexingStrategy = IndexingStrategy.SITEMAP
    filter_urls: Optional[List[str]] = None
    allowed_domains: Optional[List[str]] = None
    max_depth: Optional[int] = 2
    chunk_size: int = 4000
    chunk_overlap: int = 200

In [4]:
!pip install chromadb

Collecting chromadb
  Downloading chromadb-1.4.1-cp39-abi3-win_amd64.whl.metadata (7.3 kB)
Collecting build>=1.0.3 (from chromadb)
  Downloading build-1.4.0-py3-none-any.whl.metadata (5.8 kB)
Collecting pybase64>=1.4.1 (from chromadb)
  Downloading pybase64-1.4.3-cp313-cp313-win_amd64.whl.metadata (9.1 kB)
Collecting posthog<6.0.0,>=2.4.0 (from chromadb)
  Downloading posthog-5.4.0-py3-none-any.whl.metadata (5.7 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.23.2-cp313-cp313-win_amd64.whl.metadata (5.3 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_proto_grpc-1.39.1-py3-none-any.whl.metadata (2.5 kB)
Collecting pypika>=0.48.9 (from chromadb)
  Downloading pypika-0.50.0-py2.py3-none-any.whl.metadata (51 kB)
Collecting overrides>=7.3.1 (from chromadb)
  Downloading overrides-7.7.0-py3-none-any.whl.metadata (5.8 kB)
Collecting importlib-resources (from chromadb)
  Downloading importlib_

  res = process_handler(cmd, _system_body)
  res = process_handler(cmd, _system_body)
  res = process_handler(cmd, _system_body)


In [5]:
import re
from typing import Generator

from bs4 import BeautifulSoup, Doctype, NavigableString, Tag
def langchain_docs_extractor(soup: BeautifulSoup) -> str:
    # Remove all the tags that are not meaningful for the extraction.
    SCAPE_TAGS = ["nav", "footer", "aside", "script", "style"]
    [tag.decompose() for tag in soup.find_all(SCAPE_TAGS)]

    def get_text(tag: Tag) -> Generator[str, None, None]:
        for child in tag.children:
            if isinstance(child, Doctype):
                continue

            if isinstance(child, NavigableString):
                yield child
            elif isinstance(child, Tag):
                if child.name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
                    yield f"{'#' * int(child.name[1:])} {child.get_text()}\n\n"
                elif child.name == "a":
                    yield f"[{child.get_text(strip=False)}]({child.get('href')})"
                elif child.name == "img":
                    yield f"![{child.get('alt', '')}]({child.get('src')})"
                elif child.name in ["strong", "b"]:
                    yield f"**{child.get_text(strip=False)}**"
                elif child.name in ["em", "i"]:
                    yield f"_{child.get_text(strip=False)}_"
                elif child.name == "br":
                    yield "\n"
                elif child.name == "code":
                    parent = child.find_parent()
                    if parent is not None and parent.name == "pre":
                        classes = parent.attrs.get("class", "")

                        language = next(
                            filter(lambda x: re.match(r"language-\w+", x), classes),
                            None,
                        )
                        if language is None:
                            language = ""
                        else:
                            language = language.split("-")[1]

                        lines: list[str] = []
                        for span in child.find_all("span", class_="token-line"):
                            line_content = "".join(
                                token.get_text() for token in span.find_all("span")
                            )
                            lines.append(line_content)

                        code_content = "\n".join(lines)
                        yield f"```{language}\n{code_content}\n```\n\n"
                    else:
                        yield f"`{child.get_text(strip=False)}`"

                elif child.name == "p":
                    yield from get_text(child)
                    yield "\n\n"
                elif child.name == "ul":
                    for li in child.find_all("li", recursive=False):
                        yield "- "
                        yield from get_text(li)
                        yield "\n\n"
                elif child.name == "ol":
                    for i, li in enumerate(child.find_all("li", recursive=False)):
                        yield f"{i + 1}. "
                        yield from get_text(li)
                        yield "\n\n"
                elif child.name == "div" and "tabs-container" in child.attrs.get(
                    "class", [""]
                ):
                    tabs = child.find_all("li", {"role": "tab"})
                    tab_panels = child.find_all("div", {"role": "tabpanel"})
                    for tab, tab_panel in zip(tabs, tab_panels):
                        tab_name = tab.get_text(strip=True)
                        yield f"{tab_name}\n"
                        yield from get_text(tab_panel)
                elif child.name == "table":
                    thead = child.find("thead")
                    header_exists = isinstance(thead, Tag)
                    if header_exists:
                        headers = thead.find_all("th")
                        if headers:
                            yield "| "
                            yield " | ".join(header.get_text() for header in headers)
                            yield " |\n"
                            yield "| "
                            yield " | ".join("----" for _ in headers)
                            yield " |\n"

                    tbody = child.find("tbody")
                    tbody_exists = isinstance(tbody, Tag)
                    if tbody_exists:
                        for row in tbody.find_all("tr"):
                            yield "| "
                            yield " | ".join(
                                cell.get_text(strip=True) for cell in row.find_all("td")
                            )
                            yield " |\n"

                    yield "\n\n"
                elif child.name in ["button"]:
                    continue
                else:
                    yield from get_text(child)

    joined = "".join(get_text(soup))
    return re.sub(r"\n\n+", "\n\n", joined).strip()

In [6]:
# ingestion_service.py
import logging
import os
from typing import Optional, Dict, Any
from urllib.parse import urlparse
from dotenv import load_dotenv
load_dotenv()

import chromadb
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import SitemapLoader, RecursiveUrlLoader
from langchain_community.indexes._sql_record_manager import SQLRecordManager
from langchain_core.indexing import index
from langchain_text_splitters import RecursiveCharacterTextSplitter
from bs4 import BeautifulSoup, SoupStrainer

# from config import WebsiteConfig, IndexingStrategy
# from parser import langchain_docs_extractor
from embeddings import get_embeddings_model


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class IngestionService:
    def __init__(self):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=4000, 
            chunk_overlap=200
        )
        self.embedding = get_embeddings_model()
        self.client = chromadb.PersistentClient(path="./chroma_db")
        
    @staticmethod
    def metadata_extractor(meta: dict, soup: BeautifulSoup) -> dict:
        """Extract metadata from HTML soup"""
        title = soup.find("title")
        description = soup.find("meta", attrs={"name": "description"})
        html = soup.find("html")
        return {
            "source": meta.get("loc", ""),
            "title": title.get_text() if title else "",
            "description": description.get("content", "") if description else "",
            "language": html.get("lang", "") if html else "",
            **meta,
        }
    
    def _extract_domain_from_url(self, url: str) -> str:
        """Extract domain from URL for collection naming"""
        parsed = urlparse(url)
        domain = parsed.netloc.replace("www.", "").replace(".", "_")
        return domain
    
    def _create_index_name(self, config: WebsiteConfig) -> str:
        """Create a unique index name based on config"""
        if config.index_name:
            return config.index_name
        
        domain = self._extract_domain_from_url(str(config.url))
        return f"{domain}_{config.strategy}"
    
    def load_documents(self, config: WebsiteConfig):
        """Load documents based on indexing strategy"""
        if config.strategy == IndexingStrategy.SITEMAP:
            # Try sitemap first
            sitemap_urls = [
                f"{config.url}/sitemap.xml",
                f"{config.url}/sitemap_index.xml",
                f"{config.url}/sitemap",
            ]
            
            for sitemap_url in sitemap_urls:
                try:
                    loader = SitemapLoader(
                        sitemap_url,
                        filter_urls=config.filter_urls or [str(config.url)],
                        parsing_function=langchain_docs_extractor,
                        default_parser="lxml",
                        bs_kwargs={
                            "parse_only": SoupStrainer(
                                name=("article", "title", "html", "lang", "content")
                            ),
                        },
                        meta_function=self.metadata_extractor,
                    )
                    return loader.load()
                except Exception as e:
                    logger.warning(f"Failed to load sitemap from {sitemap_url}: {e}")
                    continue
            
            # Fall back to recursive if sitemap not found
            config.strategy = IndexingStrategy.RECURSIVE
        
        if config.strategy == IndexingStrategy.RECURSIVE:
            loader = RecursiveUrlLoader(
                url=str(config.url),
                max_depth=config.max_depth or 2,
                extractor=langchain_docs_extractor,
                prevent_outside=config.allowed_domains is None,
                allowed_domains=config.allowed_domains,
                use_async=True,
            )
            return loader.load()
        
        raise ValueError(f"Unsupported strategy: {config.strategy}")
    
    def ingest_website(self, config: WebsiteConfig, force_update: bool = False) -> Dict[str, Any]:
        """Main ingestion method"""
        try:
            # Determine collection name
            collection_name = self._create_index_name(config)
            
            # Initialize vector store
            vectorstore = Chroma(
                client=self.client,
                collection_name=collection_name,
                embedding_function=self.embedding,
            )
            
            # Initialize record manager
            record_manager = SQLRecordManager(
                f"chroma/{collection_name}",
                db_url=os.getenv("RECORD_MANAGER_DB_URL")
            )
            record_manager.create_schema()
            
            # Load documents
            logger.info(f"Loading documents from {config.url} using {config.strategy} strategy")
            docs = self.load_documents(config)
            logger.info(f"Loaded {len(docs)} documents")
            
            # Split documents
            docs_transformed = self.text_splitter.split_documents(docs)
            docs_transformed = [
                doc for doc in docs_transformed 
                if len(doc.page_content) > 10
            ]
            
            # Ensure required metadata
            for doc in docs_transformed:
                if "source" not in doc.metadata:
                    doc.metadata["source"] = str(config.url)
                if "title" not in doc.metadata:
                    doc.metadata["title"] = config.name
            
            # Index documents
            indexing_stats = index(
                docs_transformed,
                record_manager,
                vectorstore,
                cleanup="full",
                source_id_key="source",
                force_update=force_update,
            )
            
            # Get collection statistics
            collection = self.client.get_collection(collection_name)
            num_vectors = collection.count()
            
            return {
                "success": True,
                "collection_name": collection_name,
                "indexing_stats": indexing_stats,
                "num_documents": len(docs),
                "num_chunks": len(docs_transformed),
                "num_vectors": num_vectors,
                "strategy_used": config.strategy,
            }
            
        except Exception as e:
            logger.error(f"Error ingesting website {config.url}: {e}")
            return {
                "success": False,
                "error": str(e),
                "collection_name": getattr(config, 'index_name', 'unknown'),
            }

# Singleton instance
ingestion_service = IngestionService()

  from .autonotebook import tqdm as notebook_tqdm
USER_AGENT environment variable not set, consider setting it to identify your requests.
  Base = declarative_base()
INFO:sentence_transformers.SentenceTransformer:Use pytorch device_name: cpu
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-MiniLM-L6-v2
INFO:chromadb.telemetry.product.posthog:Anonymized telemetry enabled. See                     https://docs.trychroma.com/telemetry for more information.


In [None]:
# api.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import uuid
from datetime import datetime

from config import WebsiteConfig, IndexingStrategy
from ingestion_service import ingestion_service

app = FastAPI(title="Document Ingestion API")

# Store job status
ingestion_jobs: Dict[str, Dict[str, Any]] = {}

class IngestionRequest(BaseModel):
    url: str
    name: Optional[str] = None
    index_name: Optional[str] = None
    strategy: Optional[str] = "sitemap"
    filter_urls: Optional[List[str]] = None
    allowed_domains: Optional[List[str]] = None
    max_depth: Optional[int] = 2
    chunk_size: Optional[int] = 4000
    chunk_overlap: Optional[int] = 200
    force_update: Optional[bool] = False

class IngestionResponse(BaseModel):
    job_id: str
    status: str
    message: str
    collection_name: Optional[str] = None

class JobStatusResponse(BaseModel):
    job_id: str
    status: str
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    created_at: datetime
    updated_at: datetime

def process_ingestion_job(job_id: str, config: WebsiteConfig, force_update: bool):
    """Background task to process ingestion"""
    try:
        ingestion_jobs[job_id]["status"] = "processing"
        ingestion_jobs[job_id]["updated_at"] = datetime.now()
        
        result = ingestion_service.ingest_website(config, force_update)
        
        ingestion_jobs[job_id].update({
            "status": "completed",
            "result": result,
            "updated_at": datetime.now()
        })
    except Exception as e:
        ingestion_jobs[job_id].update({
            "status": "failed",
            "error": str(e),
            "updated_at": datetime.now()
        })

@app.post("/ingest", response_model=IngestionResponse)
async def ingest_website(
    request: IngestionRequest,
    background_tasks: BackgroundTasks
):
    """Start website ingestion"""
    try:
        # Generate job ID
        job_id = str(uuid.uuid4())
        
        # Create website name if not provided
        name = request.name or f"Website_{job_id[:8]}"
        
        # Create config
        config = WebsiteConfig(
            url=request.url,
            name=name,
            index_name=request.index_name,
            strategy=IndexingStrategy(request.strategy),
            filter_urls=request.filter_urls,
            allowed_domains=request.allowed_domains,
            max_depth=request.max_depth,
            chunk_size=request.chunk_size,
            chunk_overlap=request.chunk_overlap,
        )
        
        # Store job
        ingestion_jobs[job_id] = {
            "status": "pending",
            "config": config.dict(),
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
        }
        
        # Start background task
        background_tasks.add_task(
            process_ingestion_job,
            job_id,
            config,
            request.force_update
        )
        
        return IngestionResponse(
            job_id=job_id,
            status="pending",
            message="Ingestion job started",
            collection_name=config.index_name
        )
        
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/jobs/{job_id}", response_model=JobStatusResponse)
async def get_job_status(job_id: str):
    """Check ingestion job status"""
    if job_id not in ingestion_jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    
    job = ingestion_jobs[job_id]
    return JobStatusResponse(
        job_id=job_id,
        status=job["status"],
        result=job.get("result"),
        error=job.get("error"),
        created_at=job["created_at"],
        updated_at=job["updated_at"]
    )

@app.get("/jobs")
async def list_jobs():
    """List all ingestion jobs"""
    return {
        "jobs": [
            {
                "job_id": job_id,
                "status": job["status"],
                "url": job["config"]["url"],
                "created_at": job["created_at"],
                "updated_at": job["updated_at"]
            }
            for job_id, job in ingestion_jobs.items()
        ]
    }

@app.get("/collections")
async def list_collections():
    """List all collections in ChromaDB"""
    collections = ingestion_service.client.list_collections()
    return {
        "collections": [
            {
                "name": collection.name,
                "metadata": collection.metadata,
                "count": collection.count()
            }
            for collection in collections
        ]
    }

@app.delete("/collections/{collection_name}")
async def delete_collection(collection_name: str):
    """Delete a collection"""
    try:
        ingestion_service.client.delete_collection(collection_name)
        return {"success": True, "message": f"Collection {collection_name} deleted"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)