Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH]: Retry Strategy for HttpClient #2183

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions chromadb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
"QueryResult",
"GetResult",
"TokenTransportHeader",
"RetryStrategy",
]

from chromadb.utils.net import RetryStrategy

logger = logging.getLogger(__name__)

__settings = Settings()
Expand Down Expand Up @@ -160,6 +163,7 @@ def HttpClient(
settings: Optional[Settings] = None,
tenant: str = DEFAULT_TENANT,
database: str = DEFAULT_DATABASE,
retry: Optional[RetryStrategy] = None,
) -> ClientAPI:
"""
Creates a client that connects to a remote Chroma server. This supports
Expand All @@ -174,6 +178,7 @@ def HttpClient(
settings: A dictionary of settings to communicate with the chroma server.
tenant: The tenant to use for this client. Defaults to the default tenant.
database: The database to use for this client. Defaults to the default database.
retry: The retry strategy to use for this client. Defaults to None.
"""

if settings is None:
Expand All @@ -185,6 +190,7 @@ def HttpClient(
ssl = bool(ssl)
tenant = str(tenant)
database = str(database)
settings.chroma_client_retry_strategy = retry

settings.chroma_api_impl = "chromadb.api.fastapi.FastAPI"
if settings.chroma_server_host and settings.chroma_server_host != host:
Expand All @@ -208,6 +214,7 @@ def CloudClient(
database: str,
api_key: Optional[str] = None,
settings: Optional[Settings] = None,
retry: Optional[RetryStrategy] = None,
*, # Following arguments are keyword-only, intended for testing only.
cloud_host: str = "api.trychroma.com",
cloud_port: int = 8000,
Expand All @@ -220,6 +227,8 @@ def CloudClient(
tenant: The tenant to use for this client.
database: The database to use for this client.
api_key: The api key to use for this client.
settings: A dictionary of settings to communicate with the chroma server.
retry: The retry strategy to use for this client.
"""

# If no API key is provided, try to load it from the environment variable
Expand All @@ -238,6 +247,10 @@ def CloudClient(
if settings is None:
settings = Settings()

settings.chroma_client_retry_strategy = (
retry # for hosted: default to a cloud-specific retry strategy
)

# Make sure paramaters are the correct types -- users can pass anything.
tenant = str(tenant)
database = str(database)
Expand Down
12 changes: 11 additions & 1 deletion chromadb/api/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from uuid import UUID
import requests
from overrides import override
from requests.adapters import HTTPAdapter

import chromadb.errors as errors
from chromadb.types import Database, Tenant
Expand Down Expand Up @@ -42,6 +43,8 @@
from urllib.parse import urlparse, urlunparse, quote
from chromadb.types import Collection as CollectionModel

from chromadb.utils.net import RetryStrategy

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -112,8 +115,15 @@ def __init__(self, system: System):
default_api_path=system.settings.chroma_server_api_default_path,
)

self._session = requests.Session()
if system.settings.chroma_client_retry_strategy:
self._retry = system.settings.chroma_client_retry_strategy.to_retry()
else:
self._retry = RetryStrategy().to_retry()

adapter = HTTPAdapter(max_retries=self._retry)
self._session = requests.Session()
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
self._header = system.settings.chroma_server_headers
if self._header is not None:
self._session.headers.update(self._header)
Expand Down
3 changes: 3 additions & 0 deletions chromadb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing_extensions import Literal
import platform

from chromadb.utils.net import RetryStrategy

in_pydantic_v2 = False
try:
Expand Down Expand Up @@ -126,6 +127,8 @@ def empty_str_to_none(cls, v: str) -> Optional[str]:
# eg ["http://localhost:3000"]
chroma_server_cors_allow_origins: List[str] = []

chroma_client_retry_strategy: Optional[RetryStrategy] = None

# ==================
# Server config
# ==================
Expand Down
203 changes: 202 additions & 1 deletion chromadb/test/test_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
from typing import Generator
import json
import socket
import time
import uuid
from typing import Generator, Any, Optional
from unittest.mock import patch

from pytest_httpserver import HTTPServer

import chromadb
from chromadb.config import Settings
from chromadb.api import ClientAPI
import chromadb.server.fastapi
import pytest
import tempfile
from chromadb.utils.net import RetryStrategy


@pytest.fixture
Expand Down Expand Up @@ -70,3 +78,196 @@ def test_http_client_with_inconsistent_port_settings() -> None:
str(e)
== "Chroma server http port provided in settings[8001] is different to the one provided in HttpClient: [8002]"
)


@pytest.fixture
def retry_session(httpserver: HTTPServer) -> chromadb.api.ClientAPI:
httpserver.expect_request("/api/v1/tenants/default_tenant").respond_with_data(
json.dumps({"name": "default_tenant"})
)
httpserver.expect_request(
"/api/v1/databases/default_database",
query_string="tenant=default_tenant",
).respond_with_data(
json.dumps(
{
"id": f"{uuid.uuid4()}",
"name": "default_database",
"tenant": "default_tenant",
}
)
)
return chromadb.HttpClient(host=httpserver.host, port=httpserver.port)


@pytest.fixture
def retry_session_with_custom_retry(httpserver: HTTPServer) -> chromadb.api.ClientAPI:
httpserver.expect_request("/api/v1/tenants/default_tenant").respond_with_data(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe

Suggested change
def retry_session_with_custom_retry(httpserver: HTTPServer) -> chromadb.api.ClientAPI:
def retry_session_on_504(httpserver: HTTPServer) -> chromadb.api.ClientAPI:

json.dumps({"name": "default_tenant"})
)
httpserver.expect_request(
"/api/v1/databases/default_database",
query_string="tenant=default_tenant",
).respond_with_data(
json.dumps(
{
"id": f"{uuid.uuid4()}",
"name": "default_database",
"tenant": "default_tenant",
}
)
)
return chromadb.HttpClient(
host=httpserver.host,
port=httpserver.port,
retry=RetryStrategy(total=2, status_codes=(504,)),
)


def test_retry_on_429(
httpserver: HTTPServer, retry_session: chromadb.api.ClientAPI
) -> None:
retry_after_header = {"Retry-After": "2"} # wait for 2 sec before trying again

httpserver.expect_oneshot_request("/api/v1").respond_with_data(
"Too Many Requests", status=429, headers=retry_after_header
)
httpserver.expect_oneshot_request("/api/v1").respond_with_data(
"Too Many Requests", status=429, headers=retry_after_header
)
httpserver.expect_oneshot_request("/api/v1").respond_with_data(
json.dumps({"nanosecond heartbeat": 1715365335819568533}), status=200
)
start_time = time.time()
retry_session.heartbeat()
assert (
time.time() - start_time > 2 * 2
) # ensure that we respect the Retry-After header


def test_retry_on_504(
httpserver: HTTPServer, retry_session: chromadb.api.ClientAPI
) -> None:
httpserver.expect_oneshot_request("/api/v1").respond_with_data(
"Gateway Timeout", status=504
)

httpserver.expect_oneshot_request("/api/v1").respond_with_data(
"Gateway Timeout", status=504
)
httpserver.expect_oneshot_request("/api/v1").respond_with_data(
json.dumps({"nanosecond heartbeat": 1715365335819568533}), status=200
)
retry_session.heartbeat()


def test_retry_on_503(
httpserver: HTTPServer, retry_session: chromadb.api.ClientAPI
) -> None:
httpserver.expect_oneshot_request("/api/v1").respond_with_data(
"Service Unavailable", status=503
)

httpserver.expect_oneshot_request("/api/v1").respond_with_data(
"Service Unavailable", status=503
)
httpserver.expect_oneshot_request("/api/v1").respond_with_data(
json.dumps({"nanosecond heartbeat": 1715365335819568533}), status=200
)
retry_session.heartbeat()


def test_retry_on_503_exceeding_max(
httpserver: HTTPServer, retry_session: chromadb.api.ClientAPI
) -> None:
httpserver.expect_request("/api/v1").respond_with_data(
"Service Unavailable", status=503
)

with pytest.raises(Exception, match="Max retries exceeded with url"):
retry_session.heartbeat()


def test_no_retry_on_400(
httpserver: HTTPServer, retry_session: chromadb.api.ClientAPI
) -> None:
httpserver.expect_request("/api/v1").respond_with_data("Bad Request", status=400)

with pytest.raises(Exception, match="Bad Request"):
retry_session.heartbeat()


def test_no_retry_on_400_with_custom_retry(
httpserver: HTTPServer, retry_session_with_custom_retry: chromadb.api.ClientAPI
) -> None:
httpserver.expect_request("/api/v1").respond_with_data("Bad Request", status=400)

with pytest.raises(Exception, match="Bad Request"):
retry_session_with_custom_retry.heartbeat()


def test_no_retry_on_429_with_custom_retry(
httpserver: HTTPServer, retry_session_with_custom_retry: chromadb.api.ClientAPI
) -> None:
httpserver.expect_request("/api/v1").respond_with_data(
"Too Many Requests", status=429
)

with pytest.raises(Exception, match="Too Many Requests"):
retry_session_with_custom_retry.heartbeat()


def test_no_retry_on_504_with_custom_retry(
httpserver: HTTPServer, retry_session_with_custom_retry: chromadb.api.ClientAPI
) -> None:
httpserver.expect_oneshot_request("/api/v1").respond_with_data(
"Gateway Timeout", status=504
)
httpserver.expect_oneshot_request("/api/v1").respond_with_data(
"Gateway Timeout", status=504
)
httpserver.expect_oneshot_request("/api/v1").respond_with_data(
json.dumps({"nanosecond heartbeat": 1715365335819568533}), status=200
)
retry_session_with_custom_retry.heartbeat()


def server(max_retries: int = 1) -> None:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("localhost", 9999))
s.listen(1)
retries = 0
while retries < max_retries:
conn, _ = s.accept()
conn.close() # Close the connection immediately
retries += 1


@pytest.fixture
def connect_retries() -> int:
return 3


@pytest.fixture
def local_tcp_server(connect_retries: int) -> Generator[None, None, None]:
import threading

t = threading.Thread(target=server, args=(connect_retries,))
t.start()
yield
t.join()


def test_with_connection_error(
local_tcp_server: Optional[Any], connect_retries: int
) -> None:
start_time = time.time()
with pytest.raises(Exception):
chromadb.HttpClient(
host="localhost",
port=9999,
retry=RetryStrategy(connect=connect_retries - 1, backoff_factor=1),
)
assert (
time.time() - start_time > 4
) # ensure that we have attempted to connect twice
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, not super clear to me, does this work because .heartbeat() is called when initing the client?

Loading
Loading