In [4]:
from pathlib import Path

def list_folder_names(folder_path: str):
    """
    Returns a list of all file and directory names in the given folder.
    
    Parameters:
    folder_path (str): A raw string path to the folder (e.g., r"C:\path\to\folder").
    
    Returns:
    List[str]: A list of names (not full paths) of the folder's immediate contents.
    """
    path = Path(folder_path)
    if not path.exists():
        raise FileNotFoundError(f"Path '{folder_path}' does not exist.")
    return [item.name for item in path.iterdir()]


# Example usage: print every entry found in the toolkit's `memory` package.
folder_path = r"C:\Users\Ans\Desktop\code\16_agent_services_toolkit\agent-service-toolkit\src\memory"
names = list_folder_names(folder_path)

# One name per line; nothing is printed for an empty folder.
if names:
    print("\n".join(names))


mongodb.py
postgres.py
sqlite.py
__init__.py


# Logging

<img src="./images/levels.png" alt="Levels diagram" width="400"/>


### basics: https://www.youtube.com/watch?v=g8nQ90Hk328
### medium: https://www.youtube.com/watch?v=A3FkYRN9qog
### advanced: https://www.youtube.com/watch?v=9L77QExPmI0

# core/settings.py

## message:2 -> branch:6

### https://chatgpt.com/share/68972751-0b64-8009-a9d0-596765902fad

In [1]:
# from enum import StrEnum, auto
# from typing import TypeAlias


# class Provider(StrEnum):
#     OPENAI            = auto()
#     OPENAI_COMPATIBLE = auto()
#     AZURE_OPENAI      = auto()
#     DEEPSEEK          = auto()
#     ANTHROPIC         = auto()
#     GOOGLE            = auto()
#     VERTEXAI          = auto()
#     GROQ              = auto()
#     AWS               = auto()
#     OLLAMA            = auto()
#     OPENROUTER        = auto()
#     FAKE              = auto()


# class OpenAIModelName(StrEnum):
#     """https://platform.openai.com/docs/models/gpt-4o"""

#     GPT_4O_MINI = "gpt-4o-mini"
#     GPT_4O = "gpt-4o"


# class AzureOpenAIModelName(StrEnum):
#     """Azure OpenAI model names"""

#     AZURE_GPT_4O = "azure-gpt-4o"
#     AZURE_GPT_4O_MINI = "azure-gpt-4o-mini"


# class DeepseekModelName(StrEnum):
#     """https://api-docs.deepseek.com/quick_start/pricing"""

#     DEEPSEEK_CHAT = "deepseek-chat"


# class AnthropicModelName(StrEnum):
#     """https://docs.anthropic.com/en/docs/about-claude/models#model-names"""

#     HAIKU_3 = "claude-3-haiku"
#     HAIKU_35 = "claude-3.5-haiku"
#     SONNET_35 = "claude-3.5-sonnet"


# class GoogleModelName(StrEnum):
#     """https://ai.google.dev/gemini-api/docs/models/gemini"""

#     GEMINI_15_PRO = "gemini-1.5-pro"
#     GEMINI_20_FLASH = "gemini-2.0-flash"
#     GEMINI_20_FLASH_LITE = "gemini-2.0-flash-lite"
#     GEMINI_25_FLASH = "gemini-2.5-flash"
#     GEMINI_25_PRO = "gemini-2.5-pro"


# class VertexAIModelName(StrEnum):
#     """https://cloud.google.com/vertex-ai/generative-ai/docs/models"""

#     GEMINI_15_PRO = "gemini-1.5-pro"
#     GEMINI_20_FLASH = "gemini-2.0-flash"
#     GEMINI_20_FLASH_LITE = "models/gemini-2.0-flash-lite"
#     GEMINI_25_FLASH = "models/gemini-2.5-flash"
#     GEMINI_25_PRO = "gemini-2.5-pro"


# class GroqModelName(StrEnum):
#     """https://console.groq.com/docs/models"""

#     LLAMA_31_8B = "llama-3.1-8b"
#     LLAMA_33_70B = "llama-3.3-70b"

#     LLAMA_GUARD_4_12B = "meta-llama/llama-guard-4-12b"


# class AWSModelName(StrEnum):
#     """https://docs.aws.amazon.com/bedrock/latest/userguide/models-supported.html"""

#     BEDROCK_HAIKU = "bedrock-3.5-haiku"
#     BEDROCK_SONNET = "bedrock-3.5-sonnet"


# class OllamaModelName(StrEnum):
#     """https://ollama.com/search"""

#     OLLAMA_GENERIC = "ollama"


# class OpenRouterModelName(StrEnum):
#     """https://openrouter.ai/models"""

#     GEMINI_25_FLASH = "google/gemini-2.5-flash"


# class OpenAICompatibleName(StrEnum):
#     """https://platform.openai.com/docs/guides/text-generation"""

#     OPENAI_COMPATIBLE = "openai-compatible"


# class FakeModelName(StrEnum):
#     """Fake model for testing."""

#     FAKE = "fake"


# AllModelEnum: TypeAlias = (
#     OpenAIModelName
#     | OpenAICompatibleName
#     | AzureOpenAIModelName
#     | DeepseekModelName
#     | AnthropicModelName
#     | GoogleModelName
#     | VertexAIModelName
#     | GroqModelName
#     | AWSModelName
#     | OllamaModelName
#     | OpenRouterModelName
#     | FakeModelName
# )


# from enum import StrEnum
# from json import loads
# from typing import Annotated, Any

# from dotenv import find_dotenv
# from pydantic import (
#     BeforeValidator,
#     Field,
#     HttpUrl,
#     SecretStr,
#     TypeAdapter,
#     computed_field,
# )
# from pydantic_settings import BaseSettings, SettingsConfigDict

# from schema.models import (
#     AllModelEnum,
#     AnthropicModelName,
#     AWSModelName,
#     AzureOpenAIModelName,
#     DeepseekModelName,
#     FakeModelName,
#     GoogleModelName,
#     GroqModelName,
#     OllamaModelName,
#     OpenAICompatibleName,
#     OpenAIModelName,
#     OpenRouterModelName,
#     Provider,
#     VertexAIModelName,
# )


# class DatabaseType(StrEnum):
#     SQLITE = "sqlite"
#     POSTGRES = "postgres"
#     MONGO = "mongo"


# def check_str_is_http(x: str) -> str:
#     http_url_adapter = TypeAdapter(HttpUrl)
#     return str(http_url_adapter.validate_python(x))


# class Settings(BaseSettings):
#     model_config = SettingsConfigDict(
#                     env_file          = find_dotenv(),
#                     env_file_encoding = "utf-8",
#                     env_ignore_empty  = True,
#                     extra             = "ignore",
#                     validate_default  = False,
#     )
#     MODE: str | None = None

#     HOST: str = "0.0.0.0"
#     PORT: int = 8080

#     AUTH_SECRET: SecretStr | None = None

#     OPENAI_API_KEY: SecretStr | None = None
#     DEEPSEEK_API_KEY: SecretStr | None = None
#     ANTHROPIC_API_KEY: SecretStr | None = None
#     GOOGLE_API_KEY: SecretStr | None = None
#     GOOGLE_APPLICATION_CREDENTIALS: SecretStr | None = None
#     GROQ_API_KEY: SecretStr | None = None
#     USE_AWS_BEDROCK: bool = False
#     OLLAMA_MODEL: str | None = None
#     OLLAMA_BASE_URL: str | None = None
#     USE_FAKE_MODEL: bool = False
#     OPENROUTER_API_KEY: str | None = None



#     # If DEFAULT_MODEL is None, it will be set in model_post_init
#     DEFAULT_MODEL: AllModelEnum | None = None  # type: ignore[assignment]
#     # set() creates an empty set; it is typed as set[AllModelEnum] so that model_post_init can later fill it with members from any of the model-name enums.
#     AVAILABLE_MODELS: set[AllModelEnum] = set()  # type: ignore[assignment]

#     # Set openai compatible api, mainly used for proof of concept
#     COMPATIBLE_MODEL: str | None = None
#     COMPATIBLE_API_KEY: SecretStr | None = None
#     COMPATIBLE_BASE_URL: str | None = None

#     OPENWEATHERMAP_API_KEY: SecretStr | None = None

#     LANGCHAIN_TRACING_V2: bool = False
#     LANGCHAIN_PROJECT: str = "default"
#     LANGCHAIN_ENDPOINT: Annotated[str, BeforeValidator(check_str_is_http)] = (
#         "https://api.smith.langchain.com"
#     )
#     LANGCHAIN_API_KEY: SecretStr | None = None

#     LANGFUSE_TRACING: bool = False
#     LANGFUSE_HOST: Annotated[str, BeforeValidator(check_str_is_http)] = "https://cloud.langfuse.com"
#     LANGFUSE_PUBLIC_KEY: SecretStr | None = None
#     LANGFUSE_SECRET_KEY: SecretStr | None = None

#     # Database Configuration
#     DATABASE_TYPE: DatabaseType = (
#         DatabaseType.SQLITE
#     )  # Options: DatabaseType.SQLITE, DatabaseType.POSTGRES or DatabaseType.MONGO
#     SQLITE_DB_PATH: str = "checkpoints.db"

#     # PostgreSQL Configuration
#     POSTGRES_USER: str | None = None
#     POSTGRES_PASSWORD: SecretStr | None = None
#     POSTGRES_HOST: str | None = None
#     POSTGRES_PORT: int | None = None
#     POSTGRES_DB: str | None = None

#     # MongoDB Configuration
#     MONGO_HOST: str | None = None
#     MONGO_PORT: int | None = None
#     MONGO_DB: str | None = None
#     MONGO_USER: str | None = None
#     MONGO_PASSWORD: SecretStr | None = None
#     MONGO_AUTH_SOURCE: str | None = None

#     # Azure OpenAI Settings
#     AZURE_OPENAI_API_KEY: SecretStr | None = None
#     AZURE_OPENAI_ENDPOINT: str | None = None
#     AZURE_OPENAI_API_VERSION: str = "2024-02-15-preview"
#     AZURE_OPENAI_DEPLOYMENT_MAP: dict[str, str] = Field(
#         default_factory=dict, description="Map of model names to Azure deployment IDs"
#     )

#     def model_post_init(self, __context: Any) -> None:
#         api_keys = {
#             Provider.OPENAI: self.OPENAI_API_KEY,
#             Provider.OPENAI_COMPATIBLE: self.COMPATIBLE_BASE_URL and self.COMPATIBLE_MODEL,
#             Provider.DEEPSEEK: self.DEEPSEEK_API_KEY,
#             Provider.ANTHROPIC: self.ANTHROPIC_API_KEY,
#             Provider.GOOGLE: self.GOOGLE_API_KEY,
#             Provider.VERTEXAI: self.GOOGLE_APPLICATION_CREDENTIALS,
#             Provider.GROQ: self.GROQ_API_KEY,
#             Provider.AWS: self.USE_AWS_BEDROCK,
#             Provider.OLLAMA: self.OLLAMA_MODEL,
#             Provider.FAKE: self.USE_FAKE_MODEL,
#             Provider.AZURE_OPENAI: self.AZURE_OPENAI_API_KEY,
#             Provider.OPENROUTER: self.OPENROUTER_API_KEY,
#         }
#         active_keys = [k for k, v in api_keys.items() if v]
#         if not active_keys:
#             raise ValueError("At least one LLM API key must be provided.")

#         for provider in active_keys:
#             match provider:
#                 case Provider.OPENAI:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = OpenAIModelName.GPT_4O_MINI
#                     self.AVAILABLE_MODELS.update(set(OpenAIModelName))
#                 case Provider.OPENAI_COMPATIBLE:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = OpenAICompatibleName.OPENAI_COMPATIBLE
#                     self.AVAILABLE_MODELS.update(set(OpenAICompatibleName))
#                 case Provider.DEEPSEEK:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = DeepseekModelName.DEEPSEEK_CHAT
#                     self.AVAILABLE_MODELS.update(set(DeepseekModelName))
#                 case Provider.ANTHROPIC:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = AnthropicModelName.HAIKU_3
#                     self.AVAILABLE_MODELS.update(set(AnthropicModelName))
#                 case Provider.GOOGLE:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = GoogleModelName.GEMINI_20_FLASH
#                     self.AVAILABLE_MODELS.update(set(GoogleModelName))
#                 case Provider.VERTEXAI:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = VertexAIModelName.GEMINI_20_FLASH
#                     self.AVAILABLE_MODELS.update(set(VertexAIModelName))
#                 case Provider.GROQ:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = GroqModelName.LLAMA_31_8B
#                     self.AVAILABLE_MODELS.update(set(GroqModelName))
#                 case Provider.AWS:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = AWSModelName.BEDROCK_HAIKU
#                     self.AVAILABLE_MODELS.update(set(AWSModelName))
#                 case Provider.OLLAMA:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = OllamaModelName.OLLAMA_GENERIC
#                     self.AVAILABLE_MODELS.update(set(OllamaModelName))
#                 case Provider.OPENROUTER:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = OpenRouterModelName.GEMINI_25_FLASH
#                     self.AVAILABLE_MODELS.update(set(OpenRouterModelName))
#                 case Provider.FAKE:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = FakeModelName.FAKE
#                     self.AVAILABLE_MODELS.update(set(FakeModelName))
#                 case Provider.AZURE_OPENAI:
#                     if self.DEFAULT_MODEL is None:
#                         self.DEFAULT_MODEL = AzureOpenAIModelName.AZURE_GPT_4O_MINI
#                     self.AVAILABLE_MODELS.update(set(AzureOpenAIModelName))
#                     # Validate Azure OpenAI settings if Azure provider is available
#                     if not self.AZURE_OPENAI_API_KEY:
#                         raise ValueError("AZURE_OPENAI_API_KEY must be set")
#                     if not self.AZURE_OPENAI_ENDPOINT:
#                         raise ValueError("AZURE_OPENAI_ENDPOINT must be set")
#                     if not self.AZURE_OPENAI_DEPLOYMENT_MAP:
#                         raise ValueError("AZURE_OPENAI_DEPLOYMENT_MAP must be set")

#                     # Parse deployment map if it's a string
#                     if isinstance(self.AZURE_OPENAI_DEPLOYMENT_MAP, str):
#                         try:
#                             self.AZURE_OPENAI_DEPLOYMENT_MAP = loads(
#                                 self.AZURE_OPENAI_DEPLOYMENT_MAP
#                             )
#                         except Exception as e:
#                             raise ValueError(f"Invalid AZURE_OPENAI_DEPLOYMENT_MAP JSON: {e}")

#                     # Validate required deployments exist
#                     required_models = {"gpt-4o", "gpt-4o-mini"}
#                     missing_models = required_models - set(self.AZURE_OPENAI_DEPLOYMENT_MAP.keys())
#                     if missing_models:
#                         raise ValueError(f"Missing required Azure deployments: {missing_models}")
#                 case _:
#                     raise ValueError(f"Unknown provider: {provider}")

#     @computed_field  # type: ignore[prop-decorator]
#     @property
#     def BASE_URL(self) -> str:
#         return f"http://{self.HOST}:{self.PORT}"

#     def is_dev(self) -> bool:
#         return self.MODE == "dev"


# settings = Settings()


alright — let’s unpack this “main block” (the settings module) three ways:

1. the global objective in the context of the whole repo
2. a sub-block by sub-block tour
3. a line-by-line “what this does for the whole system”

---

# Global objective (in the whole codebase)

This module is the **single source of truth for configuration**. It:

* Loads env vars (from `.env` if present), validates them, and exposes them as a typed `Settings` object.
* Decides which **LLM providers** are “active” (based on which API keys/flags you set).
* Populates the **default model** and the **set of available models** the whole system will advertise via `/info`.
* Validates provider-specific requirements (notably **Azure OpenAI** deployments).
* Centralizes service, tracing, and DB config that other components read:

  * `service.py` reads `AUTH_SECRET`, `/info` pulls `DEFAULT_MODEL` + `AVAILABLE_MODELS`, `/health` checks `LANGFUSE_*`, the lifespan/db layer uses DB settings, etc.

Net effect: everything else (agents, FastAPI service, streaming, history) can rely on `settings = Settings()` to know **what providers/models exist and how to talk to them**.

---

# Sub-block by sub-block

**Imports (Pydantic, settings, enums, etc.)**
Bring in the types and enum definitions you showed in your context block (providers + model names). Pydantic is used to load/validate from env vars.

**`DatabaseType` enum**
Declares which short-/long-term memory backends are supported (`sqlite`, `postgres`, `mongo`). The **lifespan** code in `service.py` will initialize the appropriate checkpointer/store depending on these values.

**`check_str_is_http`**
Tiny validator that coerces + validates a URL string; used to ensure things like `LANGFUSE_HOST` and `LANGCHAIN_ENDPOINT` are valid HTTP(S) URLs.

**`Settings` class**

* `model_config` tells Pydantic to read `.env`, ignore unknown keys, etc.
* Fields grouped by purpose:

  * **Service/HTTP**: host/port/base URL, auth secret
  * **Provider keys + flags**: OpenAI, Anthropic, Google, Vertex, Groq, AWS Bedrock, Ollama, OpenRouter, OpenAI-compatible, Fake
  * **Model selection**: `DEFAULT_MODEL`, `AVAILABLE_MODELS`
  * **Tracing/analytics**: LangChain, Langfuse
  * **DB choices**: SQLite/Postgres/Mongo, plus per-DB connection bits
  * **Azure OpenAI** specifics (endpoint, version, deployment map)
* **`model_post_init`**: after envs load, it:

  * Ensures **at least one provider** is configured.
  * Picks a **default model** (once) based on the **first active provider** encountered.
  * Populates `AVAILABLE_MODELS` with all enums for each active provider.
  * Validates **Azure OpenAI** (keys, endpoint, deployment map) and parses its map if passed as JSON.
* **`BASE_URL` computed field** builds `http://{HOST}:{PORT}`.
* **`is_dev`** toggles dev behavior based on `MODE`.

**Module-level `settings = Settings()`**
Eagerly loads and validates configuration at import time so the rest of the app can just use `from core.settings import settings`.

---

# Line-by-line (global objective of each line)

I’ll keep each line crisp—what it contributes to the whole system.

### Imports

* `from enum import StrEnum` — Use string-valued enums for stable, readable env/model names system-wide.
* `from json import loads` — Parse JSON strings from env (e.g., Azure deployment map).
* `from typing import Annotated, Any` — Type annotations; `Annotated` attaches validators.
* `from dotenv import find_dotenv` — Auto-locate a `.env` file so the service runs easily locally.
* `from pydantic import ...` — Pydantic tools to validate/coerce env vars into safe settings objects.
* `from pydantic_settings import BaseSettings, SettingsConfigDict` — Base class to read envs + config for how to read `.env`.
* `from schema.models import (...)` — Pull provider + model enums to (a) select defaults, (b) advertise `/info`, (c) constrain settings.

### `DatabaseType` enum

* `class DatabaseType(StrEnum):` — Named DB choices the memory layer can switch on.
* `SQLITE = "sqlite"`, `POSTGRES = "postgres"`, `MONGO = "mongo"` — Canonical strings that the lifespan store/checkpointer code will interpret.

### `check_str_is_http`

* `def check_str_is_http(x: str) -> str:` — Helper validator used by Annotated fields.
* `http_url_adapter = TypeAdapter(HttpUrl)` — Prepare a Pydantic URL adapter.
* `return str(http_url_adapter.validate_python(x))` — Validate & normalize a URL string so downstream components don’t get bad URLs.

### `class Settings(BaseSettings):`

* `model_config = SettingsConfigDict(...)` — Tell Pydantic where/how to load env:

  * `env_file = find_dotenv()` — Read `.env` if present for local/dev.
  * `env_file_encoding = "utf-8"` — Encoding for `.env`.
  * `env_ignore_empty = True` — Empty env vars are treated as missing (prevents false “configured” state).
  * `extra = "ignore"` — Unknown env vars are ignored (keeps deploys resilient).
  * `validate_default = False` — Don’t run validation on field default values; only values actually supplied (env vars, `.env`, or constructor arguments) are validated.
* `MODE: str | None = None` — Optional env to flip dev/other modes (`is_dev()` checks it).
* `HOST: str = "0.0.0.0"` — Bind address for FastAPI server (used in `BASE_URL`).
* `PORT: int = 8080` — Port for FastAPI server (used in `BASE_URL`).
* `AUTH_SECRET: SecretStr | None = None` — If set, enables **Bearer auth** in `service.py` (router dependency).
* Provider creds/flags (all optional, turn on a provider if present):

  * `OPENAI_API_KEY: SecretStr | None = None` — Enables OpenAI provider.
  * `DEEPSEEK_API_KEY: SecretStr | None = None` — Enables DeepSeek.
  * `ANTHROPIC_API_KEY: SecretStr | None = None` — Enables Anthropic.
  * `GOOGLE_API_KEY: SecretStr | None = None` — Enables Gemini via Google.
  * `GOOGLE_APPLICATION_CREDENTIALS: SecretStr | None = None` — Enables Vertex AI auth.
  * `GROQ_API_KEY: SecretStr | None = None` — Enables Groq.
  * `USE_AWS_BEDROCK: bool = False` — Enables Bedrock when `True`.
  * `OLLAMA_MODEL: str | None = None` — Enables Ollama if model name provided.
  * `OLLAMA_BASE_URL: str | None = None` — Optional base URL for local Ollama server.
  * `USE_FAKE_MODEL: bool = False` — Enables Fake model for tests/demos.
  * `OPENROUTER_API_KEY: str | None = None` — Enables OpenRouter.
* Model selection fields:

  * `DEFAULT_MODEL: AllModelEnum | None = None` — Placeholder; set later in `model_post_init` if missing.
  * `AVAILABLE_MODELS: set[AllModelEnum] = set()` — Accumulates every model name across active providers; used by `/info`.
* OpenAI-compatible “shim” (for custom or self-hosted OpenAI-style servers):

  * `COMPATIBLE_MODEL`, `COMPATIBLE_API_KEY`, `COMPATIBLE_BASE_URL` — Turn on a generic OpenAI-compatible backend if present.
* Other API:

  * `OPENWEATHERMAP_API_KEY` — Used by tools that call weather (not by the service core).
* LangChain/LangSmith tracing:

  * `LANGCHAIN_TRACING_V2: bool = False` — Opt-in to v2 tracing.
  * `LANGCHAIN_PROJECT: str = "default"` — Project name for traces.
  * `LANGCHAIN_ENDPOINT: Annotated[str, BeforeValidator(check_str_is_http)] = "https://api.smith.langchain.com"` — Validated URL for tracing API.
  * `LANGCHAIN_API_KEY: SecretStr | None = None` — Auth for traces.
* Langfuse tracing:

  * `LANGFUSE_TRACING: bool = False` — Toggle Langfuse. `service.py:/health` uses this to check connectivity.
  * `LANGFUSE_HOST: Annotated[str, BeforeValidator(check_str_is_http)] = "https://cloud.langfuse.com"` — Validated host.
  * `LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY` — Auth; used when the service emits traces.
* Database config:

  * `DATABASE_TYPE: DatabaseType = DatabaseType.SQLITE` — Default to SQLite; lifespan uses this to pick checkpointer/store.
  * `SQLITE_DB_PATH: str = "checkpoints.db"` — Default SQLite file for short-term memory.
* Postgres:

  * `POSTGRES_*` — Connection params used when `DATABASE_TYPE=postgres`.
* Mongo:

  * `MONGO_*` — Connection params used when `DATABASE_TYPE=mongo`.
* Azure OpenAI:

  * `AZURE_OPENAI_API_KEY`, `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_API_VERSION` — Core connection info.
  * `AZURE_OPENAI_DEPLOYMENT_MAP: dict[str, str]` — Map **model name → deployment ID**; required for routing OpenAI model names to Azure deployments.

### `def model_post_init(self, __context: Any) -> None:`

Runs **after** Pydantic loads env vars. This is where the “active providers → default/available models” logic happens.

* `api_keys = { ... }` — Build a dict of **provider → “truthy if configured”**. It uses:

  * API keys when applicable,
  * flags (`USE_AWS_BEDROCK`, `USE_FAKE_MODEL`),
  * presence of values (`COMPATIBLE_BASE_URL and COMPATIBLE_MODEL`, `OLLAMA_MODEL`).
* `active_keys = [k for k, v in api_keys.items() if v]` — Compute the list of **configured providers**.
* `if not active_keys: raise ...` — Hard fail: the whole service needs at least one LLM to function.
* `for provider in active_keys:` — For each configured provider:

  * `match provider:` — Choose branch per provider.
  * Each branch does two things:

    1. **Set `DEFAULT_MODEL` if still unset** (first active provider wins):

       * e.g., OpenAI → `GPT_4O_MINI`, Anthropic → `HAIKU_3`, Groq → `LLAMA_31_8B`, etc.
    2. **Populate `AVAILABLE_MODELS`** with **the entire enum** for that provider so `/info` can advertise selectable models.
* Special Azure branch:

  * Sets the default to `AzureOpenAIModelName.AZURE_GPT_4O_MINI` — like every other branch, only if `DEFAULT_MODEL` is still unset.
  * Adds all Azure model enum members to `AVAILABLE_MODELS`.
  * **Validates** Azure envs (key, endpoint, deployment map must exist).
  * If `AZURE_OPENAI_DEPLOYMENT_MAP` is a string, `loads` parses it as JSON (supports `AZURE_OPENAI_DEPLOYMENT_MAP='{"gpt-4o":"dep1", ...}'` in `.env`).
  * Ensures deployments for **`gpt-4o` and `gpt-4o-mini`** exist (these two models are required by the toolkit’s Azure path).
* `case _:` — Guard against unknown providers (defensive programming).

### Computed/base helpers

* `@computed_field` + `def BASE_URL(self) -> str:` — Derive `http://{HOST}:{PORT}` so **clients** (and examples) can target the local server without recomputing.
* `def is_dev(self) -> bool:` — Convenience for feature toggles in dev (`MODE=="dev"`).

### Module instantiation

* `settings = Settings()` — **Load and validate** configuration **eagerly** at import time. From here on, every module can `from core.settings import settings` and trust it’s valid.

---

## Why this matters to `service.py` specifically

* **Auth**: `verify_bearer` reads `settings.AUTH_SECRET` to protect routes.
* **/info**: Uses `settings.AVAILABLE_MODELS` and `settings.DEFAULT_MODEL` to advertise models per your env.
* **Health**: `/health` checks `settings.LANGFUSE_TRACING` and keys to report tracing connectivity.
* **Lifespan / memory**: DB-related fields (e.g., `DATABASE_TYPE`, `POSTGRES_*`) drive which checkpointer/store `initialize_database()` / `initialize_store()` choose.

If you want, I can next trace a concrete path: “set these envs → see what `AVAILABLE_MODELS` becomes → what `/info` returns → how an agent invocation uses `DEFAULT_MODEL`” — just say the provider(s) you care about.


# How imports are working

Here’s the summary in 10–15 lines:

1. Your project uses a **`src/` layout**, where `service` and `memory` are **sibling packages** under `src`.
2. Both folders have an `__init__.py`, making them **top-level Python packages**.
3. Python searches for packages along `sys.path`; when you run from the repo root with `src` on the path, it can see `memory`.
4. **Absolute import** `from memory import …` works because Python finds `src/memory/__init__.py`.
5. **Relative import** like `from .memory import …` would fail because there is no `service/memory` subfolder.
6. `initialize_database()` and `initialize_store()` are exposed by `memory/__init__.py`.
7. They **decide at runtime** which backend to use: SQLite (default), Postgres, or Mongo.
8. These functions return **async context managers** for short-term (checkpointer) and long-term (store) memory.
9. `service.py` can stay backend-agnostic because it just calls these two initializers.
10. FastAPI’s `lifespan` uses them to set up memory for all agents on app startup.
11. Python will resolve the import as long as `src` is on `sys.path` (via `-m`, `PYTHONPATH`, or editable install).
12. Sibling packages under the same path can always import each other via **absolute imports**.
13. That’s why `memory` doesn’t need to be inside the `service` folder.
14. The current structure is correct and Pythonic for multi-package projects.
15. Relative imports (`from .x import …`) only resolve within a single package, so they would apply only if `memory` were a subpackage of `service`.


## **How to make changes in modules and make them globally available**

Here’s how you would **add a simple function** in the `memory` module to print `"server starting"` and make it importable cleanly from `memory` in your `service.py` (or anywhere else in the project):

---

### **1. Add the function**

Open `C:\Users\Ans\Desktop\code\16_agent_services_toolkit\agent-service-toolkit\src\memory\__init__.py` and add:

```python
def announce_start():
    """Print a message indicating the server is starting."""
    print("server starting")
```

---

### **2. Update `__all__`**

`__all__` defines which symbols are exposed when you do:

```python
from memory import *
```

Currently it’s:

```python
__all__ = ["initialize_database", "initialize_store"]
```

Update to include your new function:

```python
__all__ = ["initialize_database", "initialize_store", "announce_start"]
```

This is **optional** unless you rely on `*` imports, but it’s a good practice for clarity.

---

### **3. Import and use in `service.py`**

At the top of `service.py`:

```python
from memory import initialize_database, initialize_store, announce_start
```

Then, wherever you want to announce (e.g., in your `lifespan` startup before `yield`):

```python
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    try:
        announce_start()  # <-- This will print "server starting"
        async with initialize_database() as saver, initialize_store() as store:
            ...
            yield
    except Exception as e:
        logger.error(f"Error during database/store initialization: {e}")
        raise
```

---

### ✅ Now when the server starts

* FastAPI triggers `lifespan` → `announce_start()` runs
* You’ll see:

```
server starting
```

in your console.

---

### **4. Verify import works**

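You can test this in a Python shell started from the `src` directory. First activate the virtual environment:

`poetry env activate`

This prints the activation command for the virtual environment; copy and run it, then start Python: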


```bash
python
>>> from memory import announce_start
>>> announce_start()
server starting
```

---

If you want, I can also **show a clean pattern** for “startup hooks” so you don’t just `print` but also use **FastAPI’s logging** correctly for better production output.

Do you want that? It’s a small change that makes it look professional.


# Authentication

In [None]:
# # `verify_bearer` is a **FastAPI dependency** that enforces optional Bearer token authentication:

# # 1. Uses `HTTPBearer(auto_error=False)` to parse `Authorization: Bearer <token>` headers.
# # 2. If no header is present, `http_auth` is `None`.
# # 3. If `settings.AUTH_SECRET` is **unset**, auth is skipped.
# # 4. If set, retrieves the real secret via `.get_secret_value()`.
# # 5. Compares provided token (`http_auth.credentials`) to the secret.
# # 6. If missing or incorrect → raises `HTTPException(401)`.
# # 7. Returning `None` means the request is allowed.
# # 8. Applied at the router level → all endpoints in that router are protected.
# # 9. Provides simple, single-secret, service-wide authentication.
# # 10. Acts as a **gatekeeper** without modifying the request object.

# def verify_bearer(
#     http_auth: Annotated[
#         HTTPAuthorizationCredentials | None,
#         Depends(HTTPBearer(description="Please provide AUTH_SECRET api key.", auto_error=False)),
#     ],
# ) -> None:
#     if not settings.AUTH_SECRET:
#         return
#     auth_secret = settings.AUTH_SECRET.get_secret_value()
#     if not http_auth or http_auth.credentials != auth_secret:
#         raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)

This block is the **authentication dependency** for the FastAPI routes.
Here’s what it does step by step:

---

### **Function signature**

```python
def verify_bearer(
    http_auth: Annotated[
        HTTPAuthorizationCredentials | None,
        Depends(HTTPBearer(description="Please provide AUTH_SECRET api key.", auto_error=False)),
    ],
) -> None:
```

* **`http_auth`**

  * Annotated type:

    ```python
    HTTPAuthorizationCredentials | None
    ```

    * If the client provides a Bearer token, FastAPI parses it into an `HTTPAuthorizationCredentials` object.
    * If no token is provided, it may be `None` because `auto_error=False`.

  * Dependency:

    ```python
    Depends(HTTPBearer(...))
    ```

    * `HTTPBearer` is a FastAPI security dependency that:

      1. Looks for an `Authorization` header like:

         ```
         Authorization: Bearer <token>
         ```
      2. Extracts the `<token>` as `http_auth.credentials`.
      3. If `auto_error=False` → it **won’t raise automatically** if no header is present; instead, it returns `None`.

* **Return type: `None`**

  * This dependency either **returns successfully** (allowing the request to proceed) or **raises an HTTPException** (stopping the request).

---

### **Function logic**

```python
if not settings.AUTH_SECRET:
    return
```

* If no `AUTH_SECRET` is set in your settings, the function returns immediately → **auth is disabled**.

```python
auth_secret = settings.AUTH_SECRET.get_secret_value()
```

* Retrieves the **actual secret string** from a Pydantic `SecretStr` or similar secret type.

```python
if not http_auth or http_auth.credentials != auth_secret:
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
```

* If:

  1. No `Authorization` header was provided (`http_auth is None`), **or**
  2. The provided token doesn’t match the secret,

  → raise **401 Unauthorized**.

---

### **How it is used**

In your script:

```python
router = APIRouter(dependencies=[Depends(verify_bearer)])
```

* All routes in this `router` automatically require passing `verify_bearer`.
* If the function raises `HTTPException`, FastAPI immediately stops and returns 401.

---

### **Example behavior**

1. **Auth disabled (`AUTH_SECRET=None`)**

   * Any request passes, even without a token.

2. **Auth enabled**

   ```http
   Authorization: Bearer correct_secret
   ```

   ✅ Request passes

   ```http
   Authorization: Bearer wrong_secret
   ```

   ❌ Returns 401

   No header:
   ❌ Returns 401

---

If you want, I can also **show the exact shape** of `http_auth` and explain why `Annotated` is used here instead of a plain type.

Do you want that?


# lifespan

In [None]:
# @asynccontextmanager
# async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
#     """
#     Configurable lifespan that initializes the appropriate database checkpointer and store
#     based on settings.
#     """
#     try:
#         # Initialize both checkpointer (for short-term memory) and store (for long-term memory)
#         async with initialize_database() as saver, initialize_store() as store:
#             # Set up both components
#             if hasattr(saver, "setup"):  # ignore: union-attr
#                 await saver.setup()
#             # Only setup store for Postgres as InMemoryStore doesn't need setup
#             if hasattr(store, "setup"):  # ignore: union-attr
#                 await store.setup()

#             # Configure agents with both memory components
#             agents = get_all_agent_info()
#             for a in agents:
#                 agent = get_agent(a.key)
#                 # Set checkpointer for thread-scoped memory (conversation history)
#                 agent.checkpointer = saver
#                 # Set store for long-term memory (cross-conversation knowledge)
#                 agent.store = store
#             yield
#     except Exception as e:
#         logger.error(f"Error during database/store initialization: {e}")
#         raise


# app = FastAPI(lifespan=lifespan)
# router = APIRouter(dependencies=[Depends(verify_bearer)])

               FastAPI App Startup
                       │
                       ▼
        ┌─────────────────────────────────┐
        │  asynccontextmanager: lifespan   │
        └─────────────────────────────────┘
                       │
                       │ 1️⃣ Enter context
                       │
                       ▼
        ┌─────────────────────────────────┐
        │ Initialize short-term memory     │
        │  saver = initialize_database()   │
        │--------------------------------- │
        │ Initialize long-term memory      │
        │  store = initialize_store()      │
        │--------------------------------- │
        │ Optional setup: saver.setup()    │
        │ Optional setup: store.setup()    │
        │--------------------------------- │
        │ Attach memory to all agents:     │
        │   agent.checkpointer = saver     │
        │   agent.store = store            │
        └─────────────────────────────────┘
                       │
                       │ 2️⃣ yield  (hand control to FastAPI)
                       ▼
               FastAPI Starts Serving
                       │
                       │
                       ▼
       ┌─────────────────────────────────┐
       │         Handle Requests          │
       │     /invoke, /stream, /info ...  │
       └─────────────────────────────────┘
                       │
                       │
                       ▼
              FastAPI Shutdown Event
                       │
                       │ 3️⃣ Exit context
                       ▼
        ┌─────────────────────────────────┐
        │     (Teardown would happen)      │
        │    e.g., close DB connections    │
        │          release memory          │
        └─────────────────────────────────┘
                       │
                       ▼
                 Application Stops


Thanks for sharing the `agents.py` context — this explains how `agents` are managed and why the `lifespan` block behaves the way it does. Let me explain **why older agents might appear to be replaced** and how that block interacts with your `agents` dictionary.

---

### **1. What your `agents` structure looks like**

```python
agents: dict[str, Agent] = {
    "chatbot": Agent(..., graph=chatbot),
    "research-assistant": Agent(..., graph=research_assistant),
    ...
}
```

* Keys: `agent_id` strings
* Values: `Agent` dataclass instances with:

  * `description: str`
  * `graph: AgentGraph` (either `CompiledStateGraph` or `Pregel`)

---

### **2. `get_agent()` and `get_all_agent_info()`**

* **`get_agent(agent_id)`** → returns **`Agent.graph`**, not the `Agent` object itself.

  * E.g., returns a `CompiledStateGraph` instance like `chatbot` or `research_assistant`.
* **`get_all_agent_info()`** → returns lightweight `AgentInfo` dataclasses for API use:

  ```python
  [
      AgentInfo(key="chatbot", description="A simple chatbot."),
      AgentInfo(key="research-assistant", description="A research assistant with web search...")
      ...
  ]
  ```

---

### **3. What `lifespan` is doing to agents**

In your startup:

```python
agents = get_all_agent_info()
for a in agents:
    agent = get_agent(a.key)
    agent.checkpointer = saver
    agent.store = store
```

Step by step:

1. `get_all_agent_info()` → returns **metadata objects**, not the agents themselves.
2. Loop through them, then `get_agent(a.key)` → fetches the **actual `graph` instance**.
3. Adds two attributes **at runtime** to that instance:

   * `checkpointer` (short-term memory)
   * `store` (long-term memory)

**Effect:**
Each **LangGraph agent** instance is dynamically augmented with these memory components at startup.

---

### **4. Why “older agents are replaced” feeling happens**

A few key points:

* **Your `agents` dict is global and static.**
  When the service starts, these are the *only* agent objects in memory.

* **`lifespan` modifies the same instances in place.**
  They are not replaced; they are **mutated** to attach memory:

  ```python
  agent.checkpointer = saver
  agent.store = store
  ```

* **No new instances are created.**
  This means the same `chatbot` or `research_assistant` object is reused across requests.

* **If you hot-reload or restart**, the entire dictionary is rebuilt from scratch because Python reloads the module.
  That might feel like “old agents are replaced by new ones,” but it’s just module re-import + new memory objects each time.

---

### **5. Important subtlety**

Your `agents` dictionary holds **graph objects** imported from submodules:

```python
from agents.chatbot import chatbot  # this is likely a CompiledStateGraph instance
```

When `lifespan` does:

```python
agent = get_agent(a.key)  # returns that same imported object
agent.checkpointer = saver
agent.store = store
```

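* These attributes (`checkpointer`, `store`) are regular attributes of a compiled LangGraph graph; they are simply left unset (`None`) when the graph is compiled without them.
* Python is dynamic, so `lifespan` can assign the real saver/store to the object at runtime.
* If your LangGraph agent is a `CompiledStateGraph` or `Pregel` instance, this is legal but **non-persistent** — the assignment only lives in process memory.
* On restart → they’re “gone” until `lifespan` runs again.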

---

### **6. Why you might feel they are replaced**

* **Restart or reload** → new `agents` dictionary created.
* **Memory objects (`saver` / `store`) are new each startup.**
* **Dynamic attributes** don’t persist anywhere (not saved to DB).

So if your app is **auto-reloading** (e.g., `uvicorn --reload`), every reload rebuilds the entire `agents` dictionary and reattaches memory objects.

---

### **7. Visualization**

Here’s what happens on startup:

```
1. Import agents.py
   agents = {
      "chatbot": Agent(graph=<chatbot CompiledStateGraph>),
      ...
   }

2. FastAPI lifespan runs:
   saver, store = initialize_database(), initialize_store()
   for each agent:
       agent.graph.checkpointer = saver
       agent.graph.store = store

3. Requests start coming in:
   get_agent("chatbot") -> returns graph with memory attached
```

On reload, step 1 runs again → “old” objects are discarded → “new” ones are built.

---

If your concern is **persisting memory or state across restarts**, you’d need to either:

1. Serialize state to the database/store (which seems to be the intended approach using `saver` + `store`), or
2. Maintain agent instances externally (not reloaded each time).

---

I can also show you a **safe way to inspect and confirm** whether your agents are being replaced at runtime by adding a diagnostic log.

Do you want me to show that next? It will help you confirm if agents are *mutated in place* or *recreated*.


# Async postgres store

In [None]:
# osi trah hi use hota hay, (a)sync support bhi hay!

from langgraph.store.postgres import AsyncPostgresStore

conn_string = "postgresql://postgres:1538879@localhost:5430/long_term_memory"

async with AsyncPostgresStore.from_conn_string(conn_string) as store:
    # Run migrations (creates tables & indexes). Only needed once.
    await store.setup()
    # ... use the store for put/get/search operations
    # Put
    await store.aput(("users","123"), "prefs", {"theme":"dark"})
    
    # Get
    prefs = await store.aget(("users","123"), "prefs")
    print(prefs)  # {"theme":"dark"}
    
    # # Delete
    # await store.adelete(("users","123"), "prefs")
    
    # Search textual matches
    matches = await store.asearch(("docs",), query="guide", limit=5)
    
    # # List sub-namespaces
    # namespaces = await store.alist_namespaces(("docs",), max_depth=1)

# @router.get("/info")

## 🧠 What You're Looking At

You're looking at the **schema models and enums** that define the response for the `GET /info` endpoint.

These models do two key things:

1. Structure the **JSON response** returned by FastAPI.
2. Automatically generate documentation (via OpenAPI/Swagger) for API consumers.

---

## 🔁 Let's Connect It Back to This:

```python
@router.get("/info")
async def info() -> ServiceMetadata:
    models = list(settings.AVAILABLE_MODELS)
    models.sort()
    return ServiceMetadata(
        agents=get_all_agent_info(),
        models=models,
        default_agent=DEFAULT_AGENT,
        default_model=settings.DEFAULT_MODEL,
    )
```

It returns an instance of `ServiceMetadata`. So let’s unpack that model in full context.

---

## 🔹 1. `ServiceMetadata` (Pydantic Response Model)

```python
class ServiceMetadata(BaseModel):
    """Metadata about the service including available agents and models."""

    agents: list[AgentInfo]
    models: list[AllModelEnum]
    default_agent: str
    default_model: AllModelEnum
```

This defines the **full JSON structure** returned to the client.

### ➕ Field-by-field:

| Field           | Type                 | Description                                       |
| --------------- | -------------------- | ------------------------------------------------- |
| `agents`        | `list[AgentInfo]`    | All agents available in the service.              |
| `models`        | `list[AllModelEnum]` | All LLM model names this service supports.        |
| `default_agent` | `str`                | The fallback agent used when one isn't specified. |
| `default_model` | `AllModelEnum`       | The fallback LLM model name.                      |

---

## 🔹 2. `AgentInfo`

```python
class AgentInfo(BaseModel):
    """Info about an available agent."""

    key: str
    description: str
```

This is the shape of each item in the `agents` list.

### Example:

```json
{
  "key": "research-assistant",
  "description": "A research assistant for generating research papers."
}
```

Where does this come from?

* Likely from `get_all_agent_info()`, which returns a list of `AgentInfo` instances — one per agent.

---

## 🔹 3. `AllModelEnum` — (a Type Alias for *many* model types)

This is a **type union** alias (not a class):

```python
AllModelEnum: TypeAlias = (
    OpenAIModelName
    | OpenAICompatibleName
    | AzureOpenAIModelName
    | DeepseekModelName
    | AnthropicModelName
    | GoogleModelName
    | VertexAIModelName
    | GroqModelName
    | AWSModelName
    | OllamaModelName
    | OpenRouterModelName
    | FakeModelName
)
```

It combines many individual enum classes — one per provider or platform.

This means any string returned as a model must match one of the valid values from one of these enums.

---

## 🔹 4. Model Enums

Let’s look at a couple of them:

### ✅ `OpenAIModelName`

```python
class OpenAIModelName(StrEnum):
    GPT_4O_MINI = "gpt-4o-mini"
    GPT_4O = "gpt-4o"
```

Valid model values from OpenAI. The keys are internal names, the values are what get serialized to JSON.

---

### ✅ `AzureOpenAIModelName`

```python
class AzureOpenAIModelName(StrEnum):
    AZURE_GPT_4O = "azure-gpt-4o"
    AZURE_GPT_4O_MINI = "azure-gpt-4o-mini"
```

Same thing, for Azure-hosted OpenAI models.

---

### ✅ `AnthropicModelName`

```python
class AnthropicModelName(StrEnum):
    HAIKU_3 = "claude-3-haiku"
    HAIKU_35 = "claude-3.5-haiku"
    SONNET_35 = "claude-3.5-sonnet"
```

Models from Anthropic (Claude 3 family).

---

## 🔹 5. `AVAILABLE_MODELS`

```python
AVAILABLE_MODELS: set[AllModelEnum] = set()
```

* This is a `set` containing all supported models.
* It gets converted to a list in `GET /info`:

  ```python
  models = list(settings.AVAILABLE_MODELS)
  models.sort()
  ```

This is how you define **which specific models are enabled** in this deployment.

> 🔍 Somewhere in your code or config (not shown here), you must be populating `AVAILABLE_MODELS` like:
>
> ```python
> AVAILABLE_MODELS = {
>     OpenAIModelName.GPT_4O,
>     AnthropicModelName.HAIKU_3,
>     DeepseekModelName.DEEPSEEK_CHAT,
> }
> ```

---


## ✅ Why This Design Is Good

* **Strong typing with enums**: prevents typos or unsupported models from sneaking into API responses.
* **Clear OpenAPI docs**: Pydantic models generate schema docs, so developers using your API know what agents and models they can pick from.
* **Flexible & extensible**: You can add new providers (e.g., `Cohere`, `Mistral`) by just extending `AllModelEnum`.

---

## 👇 In Summary

| Thing                            | Purpose                                         |
| -------------------------------- | ----------------------------------------------- |
| `ServiceMetadata`                | Pydantic model defining the `/info` response    |
| `AgentInfo`                      | Describes a single agent (key + description)    |
| `AllModelEnum`                   | Union of all possible model enums               |
| `*ModelName` enums               | Valid model names from providers                |
| `AVAILABLE_MODELS`               | Set of currently active models in this app      |
| `default_agent`, `default_model` | Fallbacks used when not specified by the client |

---

If you'd like to explore how `get_all_agent_info()` works next — or see how these enums get populated at runtime — let me know and I’ll trace it with you.


An **Enum** (short for *enumeration*) in Python is a special class that defines a **fixed set of named values**.
It’s useful when you want to represent a set of constants that are **meaningful** and **type-safe**, instead of using raw strings or numbers everywhere.

---

## 1️⃣ Basic idea

Without enums, you might do this:

```python
# Bad practice: loose strings everywhere
status = "success"
if status == "success":
    ...
```

The risk? You could mistype `"succes"` and Python wouldn’t complain — it’s just a string.

With an Enum:

```python
from enum import Enum

class Status(Enum):
    SUCCESS = "success"
    ERROR = "error"
    PENDING = "pending"

status = Status.SUCCESS

if status == Status.SUCCESS:
    ...
```

Now:

* Values are **namespaced** (you can’t accidentally mix them with random strings)
* Typos are caught early by type checkers
* Autocompletion in your IDE helps you choose valid options

---

## 2️⃣ How they work

Each Enum member:

* Has a **name** (e.g., `SUCCESS`)
* Has a **value** (e.g., `"success"`)

Example:

```python
print(Status.SUCCESS.name)   # SUCCESS
print(Status.SUCCESS.value)  # success
```

---

## 3️⃣ `StrEnum` vs normal `Enum`

In your code, you’re using:

```python
from enum import StrEnum
```

This is like `Enum`, but each member automatically behaves like a **string**.
That means:

```python
class Color(StrEnum):
    RED = "red"

print(Color.RED)        # red
print(isinstance(Color.RED, str))  # True
```

This is handy when your values need to be sent as plain strings to APIs (like `"gpt-4o"`), but you still want enum safety in your code.

---

## 4️⃣ Why use them in your snippet?

* **Prevent invalid values**: You can’t set a model to `"gpt4"` if only `"gpt-4o"` is valid.
* **Make code self-documenting**: `OpenAIModelName.GPT_4O` is clearer than `"gpt-4o"`.
* **IDE autocompletion**: Suggests only valid models/providers.
* **Easy refactoring**: Change the value in one place, update everywhere.

---

In short:
An **Enum** is Python’s way of saying,
*"Here’s a list of allowed options. You can only pick from these, and each one has a clear name and value."*


What you’ve got here is essentially a **typed, enum-based catalog** of all the LLM providers and their model names, wrapped up so that they can be stored in a `ServiceMetadata` Pydantic model for use in code that selects or configures an AI service.

Let me walk you through it step-by-step.

---

## 1. **The `Provider` enum**

```python
class Provider(StrEnum):
    OPENAI            = auto()
    OPENAI_COMPATIBLE = auto()
    AZURE_OPENAI      = auto()
    ...
```

* **`StrEnum`** is like a regular Python `Enum`, but its members are also strings.
* **`auto()`** automatically assigns values; in a `StrEnum`, each member’s value is its **lower-cased name** (e.g., `Provider.OPENAI` has the value `"openai"`).
* This enum lists **service providers** — companies or APIs that serve LLMs.

---

## 2. **Model name enums**

Each provider has its own `StrEnum` of **model identifiers**, e.g.:

```python
class OpenAIModelName(StrEnum):
    GPT_4O_MINI = "gpt-4o-mini"
    GPT_4O = "gpt-4o"
```

These:

* Use **string values** exactly as they are expected in API calls.
* Are grouped by provider for clarity.
* Include docstrings with links to official documentation.

So you have:

* `OpenAIModelName` for OpenAI
* `AzureOpenAIModelName` for Azure’s OpenAI service
* `DeepseekModelName` for Deepseek
* `AnthropicModelName` for Claude models
* … and so on.

---

## 3. **`AllModelEnum` type alias**

```python
AllModelEnum: TypeAlias = (
    OpenAIModelName
    | OpenAICompatibleName
    | AzureOpenAIModelName
    | DeepseekModelName
    | AnthropicModelName
    | GoogleModelName
    | VertexAIModelName
    | GroqModelName
    | AWSModelName
    | OllamaModelName
    | OpenRouterModelName
    | FakeModelName
)
```

This means:

> "`AllModelEnum` can be **any one of these enums**."

It lets you type things like:

```python
def use_model(model: AllModelEnum):
    ...
```

and still get **type checking & autocompletion** for all model names, even though they come from different providers.

---

## 4. **`ServiceMetadata` model**

```python
class ServiceMetadata(BaseModel):
    agents: list[AgentInfo]
    models: list[AllModelEnum]
    default_agent: str
    default_model: AllModelEnum
```

This is a **Pydantic model** that:

* Stores:

  * Which agents are available (`agents`)
  * Which models are available (`models`)
  * Defaults for both (`default_agent`, `default_model`)
* Uses your `AllModelEnum` type to ensure the model names are **valid**.
* Adds `description` and `examples` metadata for API docs or schema generation.

---

### **In short**

* **Enums** define *who* (providers) and *what* (model names) in a structured, type-safe way.
* **`AllModelEnum`** is a **union type** so functions can accept *any valid model name* without losing IntelliSense/type hints.
* **`ServiceMetadata`** is a **Pydantic schema** for storing and validating what’s available in your AI service setup.

If you were to load this from a config or an API, you could do:

```python
service_meta = ServiceMetadata(
    agents=[AgentInfo(key="research-assistant", description="A research assistant.")],
    models=[OpenAIModelName.GPT_4O, AnthropicModelName.SONNET_35],
    default_agent="research-assistant",
    default_model=OpenAIModelName.GPT_4O
)
```

…and Pydantic would enforce that you don’t accidentally type `"gpt4o"` or `"Claude_Sonnet"` incorrectly.

---

If you want, I can **diagram how all these enums and the Pydantic model connect** so you can see the relationship visually.


Great questions — you’ve bumped into the “Enums are objects but can also be strings” wrinkle. Let’s untangle it.

### What is `AllModelEnum`, really?

`AllModelEnum` is a **type alias** for a **union of several `StrEnum` classes** (each class holds model names for one provider). So a value of type `AllModelEnum` is **an enum member** from *one of those classes*, e.g. `OpenAIModelName.GPT_4O` or `AnthropicModelName.SONNET_35`.

### Are these “strings” or “objects”?

Both, kind of:

* Each member (e.g. `OpenAIModelName.GPT_4O`) is an **enum object**.
* Because they inherit from **`StrEnum`**, they also **subclass `str`**. That means they behave like strings and **serialize as strings**, but they still carry their enum type.

```py
from enum import StrEnum

class OpenAIModelName(StrEnum):
    GPT_4O = "gpt-4o"

m = OpenAIModelName.GPT_4O
assert isinstance(m, OpenAIModelName)  # True (enum member)
assert isinstance(m, str)              # True (because StrEnum)
m == "gpt-4o"                          # True
m.value                                # "gpt-4o"
```

### So is the `Field` description wrong?

No. This line:

```python
default_model: AllModelEnum = Field(
    description="Default model used when none is specified.",
)
```

…means **the field holds an enum member** from any of the listed `StrEnum`s (the union). It is **not** typed as a plain `str`, but:

* **Pydantic will accept a string** like `"gpt-4o"` and coerce it into the correct enum member.
* **When exporting to JSON**, Pydantic will emit the **string value** (e.g. `"gpt-4o"`).

So: *type-wise*, it’s an enum member; *at runtime and on the wire*, it behaves like (and becomes) a string.

### “Providers” vs “Models”

* `Provider` is a separate `StrEnum` listing providers (OpenAI, Anthropic, etc.). It uses `auto()`, so each member’s value is its **lower-cased name as a string** (e.g. `"openai"`).
* `AllModelEnum` does **not** include `Provider`. It’s only about **model names** (like `"gpt-4o"`, `"claude-3.5-sonnet"`, etc.), grouped per provider.

### One important caveat (you do have it!)

You’ve got **duplicate string values** across different enums (e.g. `gemini-1.5-pro` exists in both `GoogleModelName` and `VertexAIModelName`, with slightly different prefixes in one case). When you validate a **Union of Enums**, Pydantic picks the **first matching enum** in the union order. That can be surprising:

```py
# If both enums contain "gemini-1.5-pro"
ServiceMetadata(default_model="gemini-1.5-pro")
# will be parsed as whichever enum appears first in AllModelEnum.
```

If you want to avoid ambiguity, consider one of these designs:

1. **Single flat `StrEnum`** with all unique model values.
2. **Structured ID**: store `{ provider: Provider, model: str }` (or `{provider, model_enum}`) so collisions can’t happen.
3. **Namespace the values** (e.g. `"google/gemini-1.5-pro"` vs `"vertexai/gemini-1.5-pro"`).

### TL;DR

* `default_model` is typed as **an enum member** (not plain `str`), but it **behaves like a string** and **serializes to a string** because it’s a `StrEnum`.
* `Provider` is unrelated to `AllModelEnum` and simply lists providers.
* Your description is fine; just be aware of **value collisions** across different model enums in the union.

If you want, I can show a quick tweak to make your schema unambiguous (e.g., a tiny `ModelRef` Pydantic model with `{provider, name}`) and how to migrate your current code in a couple of lines.


# @router.post("/{agent_id}/invoke")
# @router.post("/invoke")

In [None]:
# async def _handle_input(user_input: UserInput, agent: AgentGraph) -> tuple[dict[str, Any], UUID]:
#     """
#     Parse user input and handle any required interrupt resumption.
#     Returns kwargs for agent invocation and the run_id.
#     """
#     run_id = uuid4()
#     thread_id = user_input.thread_id or str(uuid4())
#     user_id = user_input.user_id or str(uuid4())

#     configurable = {"thread_id": thread_id, "model": user_input.model, "user_id": user_id}

#     callbacks = []
#     if settings.LANGFUSE_TRACING:
#         # Initialize Langfuse CallbackHandler for Langchain (tracing)
#         langfuse_handler = CallbackHandler()

#         callbacks.append(langfuse_handler)

#     if user_input.agent_config:
#         if overlap := configurable.keys() & user_input.agent_config.keys():
#             raise HTTPException(
#                 status_code=422,
#                 detail=f"agent_config contains reserved keys: {overlap}",
#             )
#         configurable.update(user_input.agent_config)

#     config = RunnableConfig(
#         configurable=configurable,
#         run_id=run_id,
#         callbacks=callbacks,
#     )

#     # Check for interrupts that need to be resumed
#     state = await agent.aget_state(config=config)
#     interrupted_tasks = [
#         task for task in state.tasks if hasattr(task, "interrupts") and task.interrupts
#     ]

#     input: Command | dict[str, Any]
#     if interrupted_tasks:
#         # assume user input is response to resume agent execution from interrupt
#         input = Command(resume=user_input.message)
#     else:
#         input = {"messages": [HumanMessage(content=user_input.message)]}

#     kwargs = {
#         "input": input,
#         "config": config,
#     }

#     return kwargs, run_id


# @router.post("/{agent_id}/invoke")
# @router.post("/invoke")
# async def invoke(user_input: UserInput, agent_id: str = DEFAULT_AGENT) -> ChatMessage:
#     """
#     Invoke an agent with user input to retrieve a final response.

#     If agent_id is not provided, the default agent will be used.
#     Use thread_id to persist and continue a multi-turn conversation. run_id kwarg
#     is also attached to messages for recording feedback.
#     Use user_id to persist and continue a conversation across multiple threads.
#     """
#     # NOTE: Currently this only returns the last message or interrupt.
#     # In the case of an agent outputting multiple AIMessages (such as the background step
#     # in interrupt-agent, or a tool step in research-assistant), it's omitted. Arguably,
#     # you'd want to include it. You could update the API to return a list of ChatMessages
#     # in that case.
#     agent: AgentGraph = get_agent(agent_id)
#     kwargs, run_id = await _handle_input(user_input, agent)

#     try:
#         response_events: list[tuple[str, Any]] = await agent.ainvoke(**kwargs, stream_mode=["updates", "values"])  # type: ignore # fmt: skip
#         response_type, response = response_events[-1]
#         if response_type == "values":
#             # Normal response, the agent completed successfully
#             output = langchain_to_chat_message(response["messages"][-1])
#         elif response_type == "updates" and "__interrupt__" in response:
#             # The last thing to occur was an interrupt
#             # Return the value of the first interrupt as an AIMessage
#             output = langchain_to_chat_message(
#                 AIMessage(content=response["__interrupt__"][0].value)
#             )
#         else:
#             raise ValueError(f"Unexpected response type: {response_type}")

#         output.run_id = str(run_id)
#         return output
#     except Exception as e:
#         logger.error(f"An exception occurred: {e}")
#         raise HTTPException(status_code=500, detail="Unexpected error")




A concise summary of what happens in `_handle_input(...)` and the two routes `/invoke` and `/stream`:

---

### 🔹 `_handle_input(user_input, agent)`

* Generates a unique `run_id`, and fills in `thread_id` and `user_id` from input (or random UUIDs).
* Builds a `configurable` dict with `{thread_id, user_id, model}`, and merges `agent_config` if provided (ensuring no key conflicts).
* Optionally adds a **Langfuse callback** if tracing is enabled.
* Constructs a `RunnableConfig` with the above data.
* Checks agent’s state to see if it's **awaiting input (interrupted)**.

  * If yes → wraps user message in `Command(resume=...)`
  * If no → wraps it as a `HumanMessage(...)`
* Returns a tuple of `{input, config}` and `run_id`.

---

### 🔹 `POST /invoke`

* Gets the agent (by ID or default).
* Calls `_handle_input(...)` to prepare config + input.
* Runs the agent using `agent.ainvoke(...)` with `stream_mode=["updates", "values"]`.
* Gets the **last event** in the response:

  * If `"values"` → extracts last message and returns it.
  * If `"updates"` with `"__interrupt__"` → returns first interrupt as an AI message.
* Converts the result into a `ChatMessage`, attaches `run_id`, and returns it.

---

### 🔹 `POST /stream`

* Also calls `_handle_input(...)`.
* Streams agent output using `agent.astream(...)` (async generator).
* Handles:

  * `updates`: intermediate messages and interrupts
  * `messages`: token-by-token chunks (optional)
  * `custom`: metadata messages
* Converts stream events to `ChatMessage` or tokens, emits as **SSE (`data: {...}`)**.
* Skips echoed human input and tool-use chunks.
* Ends with `data: [DONE]`.

---


![image](images/handle_input_and_two_routes.png)


## 🔹 PART 1: `_handle_input(...)`

This function **prepares the input and config** for the agent.

```python
async def _handle_input(user_input: UserInput, agent: AgentGraph) -> tuple[dict[str, Any], UUID]:
```

---

### 🧠 What it does — at a high level:

* Generates unique IDs for tracking
* Builds the configuration for the LangChain agent (`RunnableConfig`)
* Sets up **Langfuse** tracing (optional)
* Handles **agent interruptions** (resuming an agent mid-task)
* Returns:

  * `kwargs` → to be passed into `agent.ainvoke(...)`
  * `run_id` → for tracking this request

---

### 🔍 Line-by-line breakdown

---

#### `run_id = uuid4()`

* A new unique run ID — used to identify this specific conversation turn
* Later attached to responses for feedback or tracing

---

#### `thread_id = user_input.thread_id or str(uuid4())`

* Reuses provided thread ID if available; otherwise creates a new one
* A thread represents one conversation (e.g. a chat session)

---

#### `user_id = user_input.user_id or str(uuid4())`

* Same idea: each user gets a unique identifier unless one is already provided

---

#### `configurable = {"thread_id": ..., "model": ..., "user_id": ...}`

* This dictionary will be used in `RunnableConfig`
* These keys are **reserved** and must not be overwritten by user-supplied `agent_config`

---

#### `callbacks = []`

* A list of LangChain callbacks (observers)
* Will include a Langfuse callback if tracing is enabled

---

#### Langfuse tracing (optional):

```python
if settings.LANGFUSE_TRACING:
    langfuse_handler = CallbackHandler()
    callbacks.append(langfuse_handler)
```

* Hooks into LangChain’s run system to log traces in Langfuse

---

#### `if user_input.agent_config:`

* Users can pass extra config to the agent (e.g., temperature, max\_tokens)
* First, it checks if any keys in `agent_config` **conflict with reserved keys**

```python
if overlap := configurable.keys() & user_input.agent_config.keys():
    raise HTTPException(...)
```

* If no conflicts, user config is merged in:

```python
configurable.update(user_input.agent_config)
```

---

#### `config = RunnableConfig(...)`

* Final config object passed to the LangChain agent
* Includes:

  * `configurable`: all thread/user/model + agent config
  * `run_id`: for tracing/feedback
  * `callbacks`: observers (Langfuse etc.)

---

#### Interrupt handling:

```python
state = await agent.aget_state(config=config)
interrupted_tasks = [task for task in state.tasks if hasattr(task, "interrupts") and task.interrupts]
```

* Checks if the agent has any **interrupted tasks** in its task state (e.g., awaiting a user response)
* If so, the user’s message is treated as a **resume signal**:

```python
input = Command(resume=user_input.message)
```

* Otherwise, it's treated as a **new HumanMessage**:

```python
input = {"messages": [HumanMessage(content=user_input.message)]}
```

---

#### Return value:

```python
return {"input": ..., "config": ...}, run_id
```

---

## 🔹 PART 2: `@router.post("/invoke")`

```python
async def invoke(user_input: UserInput, agent_id: str = DEFAULT_AGENT) -> ChatMessage:
```

---

### 🧠 What it does:

* Accepts a user input (message, thread ID, user ID, etc.)
* Uses `_handle_input(...)` to prepare for execution
* Invokes the selected agent via `agent.ainvoke(...)`
* Extracts the **final result or interrupt**
* Converts it into a `ChatMessage` object
* Returns it to the caller

---

### 🔍 Breakdown

---

#### `agent: AgentGraph = get_agent(agent_id)`

* Grabs the `AgentGraph` instance matching the provided ID
* This object handles the actual LangGraph execution

---

#### `kwargs, run_id = await _handle_input(...)`

* Prepares the configuration and formatted input
* Handles Langfuse, interrupt state, agent\_config, etc.

---

#### Main agent execution:

```python
response_events = await agent.ainvoke(..., stream_mode=["updates", "values"])
response_type, response = response_events[-1]
```

* Because `stream_mode` is given as a list, `agent.ainvoke(...)` returns a list of `(mode, payload)` event tuples (the same events a stream would yield, collected into a list)
* The **last** event is assumed to contain the final result

---

#### Handling different response types:

##### ✅ Case 1: Agent completed successfully

```python
if response_type == "values":
    output = langchain_to_chat_message(response["messages"][-1])
```

* Converts the **last message** from LangChain into your custom `ChatMessage` format

---

##### 🟨 Case 2: Agent was interrupted

```python
elif response_type == "updates" and "__interrupt__" in response:
    output = langchain_to_chat_message(AIMessage(content=response["__interrupt__"][0].value))
```

* The agent paused mid-process and requested input from the user
* You take the first interrupt's content and return it as an `AIMessage`

---

##### ❌ Unexpected:

```python
else:
    raise ValueError(...)
```

---

#### Add run\_id to the message:

```python
output.run_id = str(run_id)
```

* This helps the frontend associate the output with the request, and enables **feedback logging** later

---

#### Final response

* Returns a `ChatMessage` (your app’s wrapper class around LangChain messages)
* If anything fails, logs the error and returns HTTP 500

---

## 🧩 Summary Flow

1. **Frontend sends**: message, model, user ID, thread ID, and agent config (optional)
2. **\_handle\_input()**:

   * Prepares identifiers
   * Adds config
   * Checks if this is a resume or new input
3. **ainvoke()** is called on the agent
4. **Last response is returned**:

   * If successful → last message
   * If interrupted → interrupt message
5. Output is wrapped with run\_id and returned as a `ChatMessage`

---

## ✅ Example Output

If the model finishes:

```json
{
  "type": "ai",
  "content": "Sure! Here's a summary...",
  "run_id": "70e4a9f7-...."
}
```

If interrupted:

```json
{
  "type": "ai",
  "content": "Can you clarify what you mean by 'this'?",
  "run_id": "70e4a9f7-...."
}
```

---

Let me know if you want to:

* Look into `agent.ainvoke(...)` and what it returns
* Unpack what `RunnableConfig` does behind the scenes
* Dive into `langchain_to_chat_message(...)`
* Or trace how interrupts are handled within the LangGraph agent

Where to next?


Great — let’s look at this **specific syntax** from a Python syntax point of view (not the functionality, since you already understand what it does).

Here’s the block again for context:

```python
if user_input.agent_config:
    if overlap := configurable.keys() & user_input.agent_config.keys():
        raise HTTPException(
            status_code=422,
            detail=f"agent_config contains reserved keys: {overlap}",
        )
    configurable.update(user_input.agent_config)
```

---

## 🔹 1. What does `:=` mean?

### It's called the **walrus operator** (introduced in Python 3.8)

#### ✅ Purpose:

It lets you **assign a value to a variable as part of an expression** — especially useful in `if` or `while` statements.

### 📌 Example:

```python
if (n := len(my_list)) > 10:
    print(f"List is too long! ({n} items)")
```

⬆️ Without the walrus operator, you’d have to do:

```python
n = len(my_list)
if n > 10:
    print(f"List is too long! ({n} items)")
```

### 🧠 So in your code:

```python
if overlap := configurable.keys() & user_input.agent_config.keys():
```

* `overlap` is **assigned** the result of the expression on the right
* If it's **truthy** (i.e., the set isn’t empty), the `if` block runs

✅ This is equivalent to:

```python
overlap = configurable.keys() & user_input.agent_config.keys()
if overlap:
    raise ...
```

---

## 🔹 2. What does `&` mean here?

### It's the **set intersection operator**

* `&` between two sets returns a new set with elements that are **common to both**

#### Example:

```python
set1 = {"a", "b", "c"}
set2 = {"b", "c", "d"}
overlap = set1 & set2  # => {"b", "c"}
```

🧠 In your code:

```python
configurable.keys() & user_input.agent_config.keys()
```

* Both `configurable.keys()` and `agent_config.keys()` return **dict views** that act like sets.
* This returns the set of **keys that appear in both**.

---

## ✅ Final simplified interpretation:

```python
if user_input.agent_config:
    overlap = set(configurable.keys()) & set(user_input.agent_config.keys())
    if overlap:
        raise HTTPException(...)
    configurable.update(user_input.agent_config)
```

---

## 🔁 TL;DR

| Symbol | Meaning                                    | Usage in your code                                 |
| ------ | ------------------------------------------ | -------------------------------------------------- |
| `:=`   | Walrus operator (assign inside expression) | Assigns `overlap` while checking its value in `if` |
| `&`    | Set intersection                           | Finds conflicting keys in two dictionaries         |

Let me know if you want to practice writing some short examples using `:=` or `&`.


Great question — and you’re not alone. This part is a bit obscure, especially if you haven’t worked with `LangchainChatMessage(role="custom", ...)` or custom agents before.

Let’s break it down *clearly* and visually, starting with context, what `role == "custom"` means, and what’s being returned.

---

## 🔍 Context

This block is inside:

```python
def langchain_to_chat_message(message: BaseMessage) -> ChatMessage:
```

That function converts **LangChain message types** into your app’s unified `ChatMessage` format for API responses.

---

## 🔸 First, what is `LangchainChatMessage`?

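LangChain defines a flexible message class, `ChatMessage` (called `LangchainChatMessage` here to distinguish it from your app’s own `ChatMessage` schema):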

```python
class ChatMessage(BaseMessage):
    role: str  # can be "system", "user", "assistant", "custom", etc.
    content: Union[str, list[dict[str, Any]]]
```

This lets you represent arbitrary messages like:

```python
ChatMessage(role="system", content="You are a helpful assistant.")
ChatMessage(role="user", content="What's the weather?")
ChatMessage(role="custom", content=[{"mydata": "abc"}])
```

It's **role-based**, not type-based like `AIMessage`, `HumanMessage`, etc.

---

## 🔸 Why `role == "custom"` is handled separately

LangChain allows developers to define **custom message roles** beyond the defaults (like `"system"`, `"user"`, `"assistant"`). If you're building agents or supervisors that carry their own metadata, you might emit a message like:

```python
LangchainChatMessage(role="custom", content=[{"agent_name": "researcher", "summary": "..."}])
```

This is usually **not meant to be shown as raw user/AI text**, but as metadata.

---

## 🧠 So what's this doing?

```python
if message.role == "custom":
    custom_message = ChatMessage(
        type="custom",
        content="",
        custom_data=message.content[0],  # first item in the content list
    )
    return custom_message
```

### Explanation:

* It **ignores the normal `content` field**, assuming it's **structured metadata**
* It pulls the **first item** (a `dict`) from the content list
* That dict is stored in `custom_data`, not `content`
* The `type` is set to `"custom"` so your frontend can treat it differently

---

## 💡 Is this a `text` message or a `chat` message?

Neither, directly.

This is a **custom metadata message**, **not intended as assistant/user dialogue**.

### So:

* ❌ It's **not** meant to be converted to plain text (`str`)
* ✅ It's wrapped as a `ChatMessage` with `type="custom"` and no readable `content`
* ✅ Its metadata is available in `custom_data` for the frontend or tools to process

---

## 🧪 What would a full example look like?

```python
# Raw LangChain message
LangchainChatMessage(
    role="custom",
    content=[{
        "agent_name": "supervisor",
        "task_id": "abc123",
        "summary": "Research phase complete."
    }]
)
```

Would be converted to:

```json
{
  "type": "custom",
  "content": "",
  "custom_data": {
    "agent_name": "supervisor",
    "task_id": "abc123",
    "summary": "Research phase complete."
  }
}
```

Your frontend could then say:

> “The supervisor agent finished a research task.”

---

## 🧩 Summary

| Feature                          | Value                                              |
| -------------------------------- | -------------------------------------------------- |
| Is it an AI/user/system message? | ❌ No                                               |
| Is it a text message?            | ❌ No                                               |
| What is it?                      | ✅ A special metadata message with custom structure |
| How is it returned?              | As a `ChatMessage` with `type="custom"`            |
| Where is the data?               | In `custom_data`, extracted from `content[0]`      |

---

## ✅ Key takeaways:

* `role="custom"` is a LangChain escape hatch for arbitrary metadata
* These messages are **not meant to be rendered like chat** — they're informational
* Your app converts them into `ChatMessage(type="custom")` with the first dict inside `custom_data`

---

Would you like me to show:

* Where in LangGraph/agents these custom messages may originate?
* Or how to safely generalize this to support multiple items, not just `content[0]`?

Let me know what you're curious about next.


## Thinking Mode with whole codebase access

Below is a line-by-line “exploded view” of the two helpers so you can see:

* **where every symbol comes from,**
* **why each step exists,** and
* **how they fit into LangGraph’s execution model.**

---

## 1. `_handle_input(...)`

| Step | Code | Purpose / Notes | Origin of types |
| ---- | ---- | --------------- | --------------- |
| 1 | `run_id = uuid4()` | Generates a unique ID for *this* call. The ID is later copied onto every outgoing `ChatMessage` so the front-end can send feedback to the right run. | stdlib `uuid4()` |
| 2 | `thread_id = user_input.thread_id or str(uuid4())`<br>`user_id   = user_input.user_id   or str(uuid4())` | Guarantees a **thread scope** (conversation) and a **user scope** (cross-thread identity) even if the client doesn’t supply them. | `schema.UserInput` |
| 3 | `configurable = {"thread_id": …, "model": user_input.model, "user_id": …}` | LangGraph lets you pass arbitrary **configurable** keys through `RunnableConfig`; they’re available to every node via `config.get("configurable")`. The toolkit standardises on these three. | LangGraph `RunnableConfig` |
| 4 | *Langfuse tracing* | If `settings.LANGFUSE_TRACING` is on, a `CallbackHandler` from **Langfuse** is appended so every node is traced automatically. | `langfuse.CallbackHandler` |
| 5 | *Merge* `user_input.agent_config` | The REST API lets callers add extra model parameters (e.g. `{"temperature":0.3}`). Collision with the reserved keys above triggers `HTTP 422`. | user supplied |
| 6 | `config = RunnableConfig( … )` | Bundles<br>• `run_id` (needed so feedback goes to the right trace)<br>• `callbacks` (Langfuse list)<br>• `configurable` (dict) | LangGraph core |
| 7 | `state = await agent.aget_state(config=config)` | Pulls the *latest saved* state for this thread from the **checkpointer** that was injected in `lifespan()`. | `AgentGraph = CompiledStateGraph \| Pregel` |
| 8 | `interrupted_tasks = [t for t in state.tasks if t.interrupts]` | Looks for any node that raised `Interrupt`. That’s how LangGraph signals “pause and wait for the human”. | LangGraph `Interrupt` |
| 9 | `input = Command(resume=...)` **or** `{"messages":[HumanMessage(...) ]}` | • If we’re resuming, wrap the human’s reply in `Command(resume=…)`.<br>• Otherwise provide a normal list of `HumanMessage`s. | `langgraph.types.Command`, `langchain_core.messages.HumanMessage` |
| 10 | `return {"input": input, "config": config}, run_id` | These kwargs are exactly what `agent.ainvoke()` / `agent.astream()` expect. | — |

---

### Why the `Command(resume=…)` dance?

LangGraph provides `interrupt()` (in `langgraph.types`): a node calls `interrupt(…)` and the graph pauses, surfacing the payload to the caller as an `Interrupt`. To *resume* you call the graph again with `Command(resume=<human-reply>)`; LangGraph re-enters the interrupted node and continues. `_handle_input` detects that pattern so the REST layer can stay totally stateless.

---

## 2. `invoke(...)`

```python
agent: AgentGraph = get_agent(agent_id)
kwargs, run_id   = await _handle_input(...)
response_events  = await agent.ainvoke(**kwargs, stream_mode=["updates", "values"])
```

| Phase                   | Detail                                                                                                                                                                                                              |
| ----------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Dispatch**            | `get_agent()` returns the compiled graph (or a Pregel function) registered under that name.                                                                                                                         |
| **Execution**           | We call `.ainvoke()` **once** and ask LangGraph to give us two stream channels:<br>• `"updates"` – intermediate node payloads (used for interrupt detection)<br>• `"values"`  – the final return value of the graph |
| **Pick the last event** | LangGraph yields a list like `[( "updates", {...} ), ( "values", {"messages":[...]} )]`. We only care about the tail.                                                                                               |
| **Normal completion**   | If the tail is `"values"` → convert the last message (`AIMessage`) to your public `ChatMessage` DTO.                                                                                                                |
| **Interrupted**         | If the tail is `"updates"` and the dict contains `"__interrupt__"` → take the *first* interrupt object, wrap it in an `AIMessage`.                                                                                  |
| **Attach run\_id**      | `output.run_id = str(run_id)` so any `/feedback` call can map stars → run.                                                                                                                                          |
| **Return**              | A single `ChatMessage` (type `"ai"`) is sent back.                                                                                                                                                                  |

> **⚠️ Note**
> If a graph emits *several* AI messages before finishing (e.g., tool-call patterns), only the final one is returned. The comment in the code points this out—you could change the REST contract to return a list if you need the full chain.

---

## External pieces you might want to open next

| Symbol                        | Defined in             | Why it matters                                                                       |
| ----------------------------- | ---------------------- | ------------------------------------------------------------------------------------ |
| `UserInput`, `ChatMessage`    | `src/schema/schema.py` | Pydantic models for request/response.                                                |
| `AgentGraph`, `get_agent()`   | `src/agents/agents.py` | Registry + type alias for compiled LangGraph graphs.                                 |
| `RunnableConfig`              | `langchain_core.runnables` | Carries `run_id`, callbacks, and your custom `configurable` dict through every node. |
| `Command`, `Interrupt`        | `langgraph.types`      | Mechanism for pause/resume.                                                          |
| `langchain_to_chat_message()` | `src/service/utils.py` | Converts LangChain message objects → your façade model.                              |

Just let me know which of those (or any other) you’d like to dive into—whether that’s viewing the raw source, walking through control-flow, or discussing how to extend/modify it.


## What Is a LangGraph Supervisor?

**LangGraph Supervisor** is a simple Python library created by the LangChain team to help developers build **hierarchical multi-agent systems**—that is, systems where a **central "supervisor" agent** coordinates several **specialist "worker" agents** ([LangChain Changelog][1]).

### How It Works — Explained Simply:

* **Supervisor Agent (like a manager):** Acts as the brain of the system. It reads what the user wants and decides which worker agent should do the job—whether that’s researching information, doing math, or something else.
* **Worker Agents (like specialists):** Each one focuses on a specific task. For example, one might be a math expert, another a research assistant. They do the work and then report back to the supervisor ([GitHub][2]).

### Why It’s Useful:

* **Keeps things organized.** The supervisor controls which agent works when and keeps all communication in order.
* **Flexible structure.** You can build multiple levels of supervisors—one supervisor can manage others—creating complex, layered workflows ([GitHub][2], [LangChain Changelog][1]).
* **Efficient memory handling.** You can choose whether to store the full conversation history or just the last message from each agent ([GitHub][2]).

---

### Real-World Analogy:

Imagine a **restaurant**:

* The **head chef** (supervisor) delegates tasks—“Sous-chef, prep the vegetables”; “Grill-chef, cook the steak”.
* Each cook (worker agent) does their part and then lets the head chef know they’re done.
* The head chef then plates the dish and serves the customer.

That’s exactly how a LangGraph Supervisor works in an AI system. It delegates, coordinates, and manages communication so that tasks get done in an orderly way.

---

### In Summary:

* **Supervisor = Manager**: Decides who does what.
* **Workers = Specialists**: Do the actual tasks.
* **Hierarchy possible**: Supervisors can manage other supervisors.
* **Flexible memory**: You choose how much of the conversation to keep.

---

[1]: https://changelog.langchain.com/announcements/langgraph-supervisor-a-library-for-hierarchical-multi-agent-systems?utm_source=chatgpt.com "LangGraph Supervisor: A Library for Hierarchical Multi-Agent Systems"

[2]: https://github.com/langchain-ai/langgraph-supervisor-py?utm_source=chatgpt.com "langchain-ai/langgraph-supervisor-py"


## some prompts


"""
current file's folder:
C:\Users\Ans\Desktop\code\16_agent_services_toolkit\agent-service-toolkit\src\service

it's contents:
.ipynb_checkpoints
explanantions.ipynb
service.py
utils.py
__init__.py

ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo

parent folder address:
C:\Users\Ans\Desktop\code\16_agent_services_toolkit\agent-service-toolkit\src
contents:
agents
client
core
memory
run_agent.py
run_client.py
run_service.py
schema
service
streamlit_app.py

ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo

memory folder path:
C:\Users\Ans\Desktop\code\16_agent_services_toolkit\agent-service-toolkit\src\memory
contents:
mongodb.py
postgres.py
sqlite.py
__init__.py

ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo

C:\Users\Ans\Desktop\code\16_agent_services_toolkit\agent-service-toolkit\src\memory\__init__.py
contents:
from contextlib import AbstractAsyncContextManager

from langgraph.checkpoint.mongodb.aio import AsyncMongoDBSaver
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

from core.settings import DatabaseType, settings
from memory.mongodb import get_mongo_saver
from memory.postgres import get_postgres_saver, get_postgres_store
from memory.sqlite import get_sqlite_saver, get_sqlite_store


def initialize_database() -> AbstractAsyncContextManager[
    AsyncSqliteSaver | AsyncPostgresSaver | AsyncMongoDBSaver
]:
    """
    Initialize the appropriate database checkpointer based on configuration.
    Returns an initialized AsyncCheckpointer instance.
    """
    if settings.DATABASE_TYPE == DatabaseType.POSTGRES:
        return get_postgres_saver()
    if settings.DATABASE_TYPE == DatabaseType.MONGO:
        return get_mongo_saver()
    else:  # Default to SQLite
        return get_sqlite_saver()


def initialize_store():
    """
    Initialize the appropriate store based on configuration.
    Returns an async context manager for the initialized store.
    """
    if settings.DATABASE_TYPE == DatabaseType.POSTGRES:
        return get_postgres_store()
    # TODO: Add Mongo store - https://pypi.org/project/langgraph-store-mongodb/
    else:  # Default to SQLite
        return get_sqlite_store()


__all__ = ["initialize_database", "initialize_store"]

ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo

How is this working? Shouldn't the memory module have to be in the same folder as the service.py file?
"""

# authentication

In [12]:
import jwt
import base64

from sqlmodel import SQLModel

class TokenPayload(SQLModel):
    """Claims kept from a validated JWT.

    Only ``sub`` (the subject claim) is declared. The token decoded below
    also carries other claims, but the recorded output of this cell shows
    that only ``sub`` ends up in the dumped model.
    """

    sub: str | None = None


# Demo signing configuration.
# TODO: read these from core.settings (populated from .env) instead of
# hard-coding them in the notebook.
ALGORITHM: str = 'HS256'

# Same Base64 value used by the Node issuer.
AUTH_SECRET: str = "Ma9m1zmnlDcHY0XBgozXR5g4bP16mcYRnOgHXjzsLMw="

# Convert Base64 -> raw bytes. The previous version decoded an undefined
# name (`SECRET_B64`), which only worked through leftover kernel state and
# raised NameError on a fresh kernel.
AUTH_SECRET_BYTES = base64.b64decode(AUTH_SECRET.strip())

# Token created by the external issuer (shared-secret HS256 in this demo).
# NOTE(review): the token carries an `exp` claim, which PyJWT checks by
# default; once it lapses jwt.decode raises ExpiredSignatureError and a
# fresh token has to be pasted here.
token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJyb2xlIjoidXNlciIsImlhdCI6MTc1NzE2NTM0MSwiaXNzIjoiaHR0cHM6Ly9leGFtcGxlLmNvbSIsImF1ZCI6Im15LWF1ZGllbmNlIiwic3ViIjoiMTIzNDU2Nzg5MCIsImV4cCI6MTc1NzQyNDU0MX0.WnwIsuMeF2Sf4t3ZbP_yzR6ZeeiSrwV6XwlgnQOGZgM'

# The numbered prints are progress markers: they show which step was
# reached if decoding or model construction raises.
print('1')

# Verify the signature and the issuer/audience claims in one call.
payload = jwt.decode(
    token,
    AUTH_SECRET_BYTES,
    algorithms=[ALGORITHM],
    issuer="https://example.com",
    audience="my-audience",
)

print('2')
token_data = TokenPayload(**payload)
print('3')
print(token_data.model_dump())


1
2
3
{'sub': '1234567890'}
