Skip to content
This repository was archived by the owner on Nov 8, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
include requirements.txt
include README.md
include requirements-test.txt
include VERSION.txt
include LICENSE
1 change: 0 additions & 1 deletion VERSION.txt

This file was deleted.

71 changes: 71 additions & 0 deletions eppo_client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import Optional
from eppo_client.client import EppoClient
from eppo_client.config import Config
from eppo_client.configuration_requestor import (
ExperimentConfigurationDto,
ExperimentConfigurationRequestor,
)
from eppo_client.configuration_store import ConfigurationStore
from eppo_client.constants import MAX_CACHE_ENTRIES
from eppo_client.http_client import HttpClient, SdkParams
from eppo_client.read_write_lock import ReadWriteLock

__version__ = "0.0.1"

__client: Optional[EppoClient] = None
__lock = ReadWriteLock()


def init(config: Config) -> EppoClient:
"""Initializes a global Eppo client instance

This method should be called once on application startup.
If invoked more than once, it will re-initialize the global client instance.
Use the :func:`eppo_client.get_instance()` method to access the client instance.

:param config: client configuration containing the API Key
:type config: Config
"""
config._validate()
sdk_params = SdkParams(
apiKey=config.api_key, sdkName="python", sdkVersion=__version__
)
http_client = HttpClient(base_url=config.base_url, sdk_params=sdk_params)
config_store: ConfigurationStore[ExperimentConfigurationDto] = ConfigurationStore(
max_size=MAX_CACHE_ENTRIES
)
config_requestor = ExperimentConfigurationRequestor(
http_client=http_client, config_store=config_store
)
global __client
global __lock
try:
__lock.acquire_write()
if __client:
# if a client was already initialized, stop the background processes of the old client
__client._shutdown()
__client = EppoClient(config_requestor=config_requestor)
return __client
finally:
__lock.release_write()


def get_instance() -> EppoClient:
"""Used to access an initialized client instance

Use this method to get a client instance for assigning variants.
This method may only be called after invocation of :func:`eppo_client.init()`, otherwise it throws an exception.

:return: a shared client instance
:rtype: EppoClient
"""
global __client
global __lock
try:
__lock.acquire_read()
if __client:
return __client
else:
raise Exception("init() must be called before get_instance()")
finally:
__lock.release_read()
20 changes: 17 additions & 3 deletions eppo_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@
ExperimentConfigurationDto,
ExperimentConfigurationRequestor,
)
from eppo_client.constants import POLL_INTERVAL_MILLIS, POLL_JITTER_MILLIS
from eppo_client.poller import Poller
from eppo_client.shard import get_shard, is_in_shard_range
from eppo_client.validation import validate_not_blank


class EppoClient:
def __init__(self, config_requestor: ExperimentConfigurationRequestor) -> None:
def __init__(self, config_requestor: ExperimentConfigurationRequestor):
self.__config_requestor = config_requestor
self.__poller = Poller(
interval_millis=POLL_INTERVAL_MILLIS,
jitter_millis=POLL_JITTER_MILLIS,
callback=config_requestor.fetch_and_store_configurations,
)
self.__poller.start()

def assign(self, subject: str, experiment_key: str) -> Optional[str]:
"""Maps a subject to a variation for a given experiment
Expand All @@ -24,7 +32,7 @@ def assign(self, subject: str, experiment_key: str) -> Optional[str]:
if (
experiment_config is None
or not experiment_config.enabled
or not self.is_in_experiment_sample(
or not self._is_in_experiment_sample(
subject, experiment_key, experiment_config
)
):
Expand All @@ -42,7 +50,13 @@ def assign(self, subject: str, experiment_key: str) -> Optional[str]:
None,
)

def is_in_experiment_sample(
def _shutdown(self):
"""Stops all background processes used by the client
Do not use the client after calling this method.
"""
self.__poller.stop()

def _is_in_experiment_sample(
self,
subject: str,
experiment_key: str,
Expand Down
41 changes: 37 additions & 4 deletions eppo_client/configuration_requestor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from typing import List, Optional
import logging
from typing import Dict, List, Optional, cast
from eppo_client.base_model import SdkBaseModel
from eppo_client.configuration_store import ConfigurationStore
from eppo_client.http_client import HttpClient, HttpRequestError

from eppo_client.shard import ShardRange

logger = logging.getLogger(__name__)


class VariationDto(SdkBaseModel):
name: str
Expand All @@ -14,12 +19,40 @@ class ExperimentConfigurationDto(SdkBaseModel):
percent_exposure: float
enabled: bool
variations: List[VariationDto]
name: str
name: Optional[str]


RAC_ENDPOINT = "/randomized_assignment/config"


class ExperimentConfigurationRequestor:
def __init__(
self,
http_client: HttpClient,
config_store: ConfigurationStore[ExperimentConfigurationDto],
):
self.__http_client = http_client
self.__config_store = config_store

def get_configuration(
self, experiment_key: str
) -> Optional[ExperimentConfigurationDto]:
# TODO: implement this method
return None
if self.__http_client.is_unauthorized():
raise ValueError("Unauthorized: please check your API key")
return self.__config_store.get_configuration(experiment_key)

def fetch_and_store_configurations(self) -> Dict[str, ExperimentConfigurationDto]:
try:
configs = cast(
dict, self.__http_client.get(RAC_ENDPOINT).get("experiments", {})
)
for exp_key, exp_config in configs.items():
configs[exp_key] = ExperimentConfigurationDto(**exp_config)
self.__config_store.set_configurations(configs)
return configs
except HttpRequestError as e:
logger.error("Error retrieving assignment configurations: " + str(e))
if e.is_recoverable():
return {}
else:
raise e # caught by the polling task; causes assignment polling to stop
29 changes: 29 additions & 0 deletions eppo_client/configuration_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import Dict, Optional, TypeVar, Generic
from cachetools import LRUCache

from eppo_client.read_write_lock import ReadWriteLock

T = TypeVar("T")


class ConfigurationStore(Generic[T]):
def __init__(self, max_size: int):
self.__cache: LRUCache = LRUCache(maxsize=max_size)
self.__lock = ReadWriteLock()

def get_configuration(self, key: str) -> Optional[T]:
try:
self.__lock.acquire_read()
return self.__cache[key]
except KeyError:
return None # key does not exist
finally:
self.__lock.release_read()

def set_configurations(self, configs: Dict[str, T]):
try:
self.__lock.acquire_write()
for key, config in configs.items():
self.__cache[key] = config
finally:
self.__lock.release_write()
8 changes: 8 additions & 0 deletions eppo_client/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# configuration cache
MAX_CACHE_ENTRIES = 1000 # arbitrary; the caching library requires a max limit

# poller
SECOND_MILLIS = 1000
MINUTE_MILLIS = 60 * SECOND_MILLIS
POLL_JITTER_MILLIS = 30 * SECOND_MILLIS
POLL_INTERVAL_MILLIS = 5 * MINUTE_MILLIS
68 changes: 68 additions & 0 deletions eppo_client/http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from typing import Any
from requests.exceptions import Timeout
from requests.adapters import HTTPAdapter, Retry
from http import HTTPStatus

import requests

from eppo_client.base_model import SdkBaseModel


class SdkParams(SdkBaseModel):
# attributes are camelCase because that's what the backend endpoint expects
apiKey: str
sdkName: str
sdkVersion: str


class HttpRequestError(Exception):
def __init__(self, message: str, status_code: int):
self.status_code = status_code
super().__init__(message)

def is_recoverable(self) -> bool:
if self.status_code >= 400 and self.status_code < 500:
return (
self.status_code == HTTPStatus.TOO_MANY_REQUESTS
or self.status_code == HTTPStatus.REQUEST_TIMEOUT
)
return True


REQUEST_TIMEOUT_SECONDS = 2
# Retry reference: https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#module-urllib3.util.retry
# This applies only to failed DNS lookups and connection timeouts,
# never to requests where data has made it to the server.
MAX_RETRIES = Retry(total=3, backoff_factor=1)


class HttpClient:
def __init__(self, base_url: str, sdk_params: SdkParams):
self.__base_url = base_url
self.__sdk_params = sdk_params
self.__session = requests.Session()
self.__session.mount("https://", HTTPAdapter(max_retries=MAX_RETRIES))
self.__is_unauthorized = False

def is_unauthorized(self) -> bool:
return self.__is_unauthorized

def get(self, resource: str) -> Any:
try:
response = self.__session.get(
self.__base_url + resource,
params=self.__sdk_params.dict(),
timeout=REQUEST_TIMEOUT_SECONDS,
)
self.__is_unauthorized = response.status_code == HTTPStatus.UNAUTHORIZED
if response.status_code != HTTPStatus.OK:
raise self._get_http_error(response.status_code, resource)
return response.json()
except Timeout:
raise self._get_http_error(HTTPStatus.REQUEST_TIMEOUT, resource)

def _get_http_error(self, status_code: int, resource: str) -> HttpRequestError:
return HttpRequestError(
"HTTP {} error while requesting resource {}".format(status_code, resource),
status_code=status_code,
)
38 changes: 38 additions & 0 deletions eppo_client/poller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import logging
from multiprocessing import Event
from random import randrange
from threading import Thread
from typing import Callable

logger = logging.getLogger(__name__)


class Poller:
def __init__(self, interval_millis: int, jitter_millis: int, callback: Callable):
self.__jitter_millis = jitter_millis
self.__interval = interval_millis
self.__stop_event = Event()
self.__callback = callback
self.__thread = Thread(target=self.poll, daemon=True)

def start(self):
self.__thread.start()

def stop(self):
self.__stop_event.set()

def is_stopped(self):
return self.__stop_event.is_set()

def poll(self):
while not self.is_stopped():
try:
self.__callback()
except Exception as e:
logger.error("Unexpected error running poll task: " + str(e))
break
self._wait_for_interval()

def _wait_for_interval(self):
interval_with_jitter = self.__interval - randrange(0, self.__jitter_millis)
self.__stop_event.wait(interval_with_jitter / 1000)
42 changes: 42 additions & 0 deletions eppo_client/read_write_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import threading

# Copied from: https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s04.html


class ReadWriteLock:
"""A lock object that allows many simultaneous "read locks", but
only one "write lock." """

def __init__(self):
self._read_ready = threading.Condition(threading.Lock())
self._readers = 0

def acquire_read(self):
"""Acquire a read lock. Blocks only if a thread has
acquired the write lock."""
self._read_ready.acquire()
try:
self._readers += 1
finally:
self._read_ready.release()

def release_read(self):
"""Release a read lock."""
self._read_ready.acquire()
try:
self._readers -= 1
if not self._readers:
self._read_ready.notifyAll()
finally:
self._read_ready.release()

def acquire_write(self):
"""Acquire a write lock. Blocks until there are no
acquired read or write locks."""
self._read_ready.acquire()
while self._readers > 0:
self._read_ready.wait()

def release_write(self):
"""Release a write lock."""
self._read_ready.release()
3 changes: 2 additions & 1 deletion requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
tox
pytest
mypy
google-cloud-storage
google-cloud-storage
httpretty
6 changes: 5 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
pydantic
pydantic
requests
cachetools
types-cachetools
types-requests
Loading