diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5ac3aed..56d811b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -15,7 +15,7 @@ repos: rev: 23.1.0 hooks: - id: black - language_version: python3.11 + language_version: python3.8 args: [--line-length=120, --skip-string-normalization] - repo: https://github.com/pycqa/flake8 @@ -24,7 +24,7 @@ repos: - id: flake8 - repo: https://github.com/pycqa/isort - rev: 5.10.1 + rev: 5.11.5 hooks: - id: isort stages: [commit] diff --git a/CHANGES.md b/CHANGES.md index f486228..951d64f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changelog +## 1.4.0 +* Refactored ``async_customerio.client_base.AsyncClientBase`` to take advantage of connection pool. So the HTTP client will be created once during class ``async_customerio.track.AsyncCustomerIO`` and ``async_customerio.api.AsyncAPIClient`` instantiation. + + ## 1.3.0 - Support of Python 3.12 added. diff --git a/async_customerio/_config.py b/async_customerio/_config.py new file mode 100644 index 0000000..25b5aca --- /dev/null +++ b/async_customerio/_config.py @@ -0,0 +1,39 @@ +import typing as t +from dataclasses import dataclass + + +@dataclass +class RequestTimeout: + """ + Request timeout configuration. + + Arguments: + timeout: Timeout on all operations eg, read, write, connect. + + Examples: + RequestTimeout(None) # No timeouts. + RequestTimeout(5.0) # 5s timeout on all operations. + """ + + timeout: t.Optional[float] = None + + +@dataclass +class RequestLimits: + """ + Configuration for request limits. + + Attributes: + max_connections: The maximum number of concurrent connections that may be established. + max_keepalive_connections: Allow the connection pool to maintain keep-alive connections + below this point. Should be less than or equal to `max_connections`. + keepalive_expiry: Time limit on idle keep-alive connections in seconds. + """ + + max_connections: t.Optional[int] = None + max_keepalive_connections: t.Optional[int] = None + keepalive_expiry: t.Optional[int] = None + + +DEFAULT_REQUEST_TIMEOUT = RequestTimeout(timeout=10.0) +DEFAULT_REQUEST_LIMITS = RequestLimits(max_connections=100, max_keepalive_connections=20, keepalive_expiry=5) diff --git a/async_customerio/api.py b/async_customerio/api.py index 1b0b406..8f77805 100644 --- a/async_customerio/api.py +++ b/async_customerio/api.py @@ -3,6 +3,8 @@ """ import base64 +from async_customerio._config import DEFAULT_REQUEST_TIMEOUT, RequestTimeout + try: from typing import Dict, Literal, Optional, TypedDict, Union @@ -210,14 +212,19 @@ class AsyncAPIClient(AsyncClientBase): SEND_PUSH_NOTIFICATION_ENDPOINT = "/send/push" def __init__( - self, key: str, url: Optional[str] = None, region: Region = Regions.US, retries: int = 3, timeout: int = 10 + self, + key: str, + url: Optional[str] = None, + region: Region = Regions.US, + retries: int = 3, + request_timeout: RequestTimeout = DEFAULT_REQUEST_TIMEOUT, ): if not isinstance(region, Region): raise AsyncCustomerIOError("invalid region provided") self.key = key self.base_url = url or "https://{host}".format(host=region.api_host) - super().__init__(retries=retries, timeout=timeout) + super().__init__(retries=retries, request_timeout=request_timeout) async def send_email(self, request: SendEmailRequest) -> dict: if not isinstance(request, SendEmailRequest): diff --git a/async_customerio/client_base.py b/async_customerio/client_base.py index e60eeeb..4dee67a 100644 --- a/async_customerio/client_base.py +++ b/async_customerio/client_base.py @@ -10,6 +10,7 @@ import httpx +from async_customerio._config import DEFAULT_REQUEST_LIMITS, DEFAULT_REQUEST_TIMEOUT, RequestLimits, RequestTimeout from async_customerio.errors import AsyncCustomerIOError from async_customerio.utils import sanitize @@ -21,9 +22,35 @@ class AsyncClientBase: - def __init__(self, retries: int = 3, timeout: int = 10): - self.timeout = timeout - self.retries = retries + def __init__( + self, + retries: int = 3, + *, + request_timeout: RequestTimeout = DEFAULT_REQUEST_TIMEOUT, + request_limits: RequestLimits = DEFAULT_REQUEST_LIMITS, + ): + """ + + :param retries: set number of retries before give up + :param request_timeout: advanced feature that allows to change request timeout. + :param request_limits: advanced feature that allows to control the connection pool size. + """ + + self._retries = retries + self._request_timeout = request_timeout + self._request_transport = httpx.AsyncHTTPTransport( + limits=httpx.Limits(**request_limits.__dict__), retries=retries + ) + self._http_client: t.Optional[httpx.AsyncClient] = None + + @property + def _client(self) -> httpx.AsyncClient: + if self._http_client is None or self._http_client.is_closed: + self._http_client = httpx.AsyncClient( + timeout=httpx.Timeout(**self._request_timeout.__dict__), + transport=self._request_transport, + ) + return self._http_client @staticmethod def _get_request_id(): @@ -64,29 +91,27 @@ async def send_request( if headers: merged_headers.update(headers) - transport = httpx.AsyncHTTPTransport(retries=self.retries) - async with httpx.AsyncClient(timeout=self.timeout, transport=transport, auth=auth) as client: - logging.debug( - "Requesting method: %s, URL: %s, payload: %s, headers: %s", - method, - url, - json_payload, - headers, + logging.debug( + "Requesting method: %s, URL: %s, payload: %s, headers: %s", + method, + url, + json_payload, + headers, + ) + try: + raw_cio_response: httpx.Response = await self._client.request( + method, url, json=json_payload and sanitize(json_payload), headers=merged_headers ) - try: - raw_cio_response: httpx.Response = await client.request( - method, url, json=json_payload and sanitize(json_payload), headers=merged_headers - ) - result_status = raw_cio_response.status_code - if result_status != 200: - raise AsyncCustomerIOError(f"{result_status}: {url} {json_payload} {raw_cio_response.text}") - except Exception as err: - # Raise exception alerting user that the system might be - # experiencing an outage and refer them to system status page. - raise AsyncCustomerIOError( - CUSTOMERIO_UNAVAILABLE_MESSAGE.format(klass=type(err), message=err, count=self.retries) - ) - + result_status = raw_cio_response.status_code + if result_status != 200: + raise AsyncCustomerIOError(f"{result_status}: {url} {json_payload} {raw_cio_response.text}") + except Exception as err: + # Raise exception alerting user that the system might be + # experiencing an outage and refer them to system status page. + raise AsyncCustomerIOError( + CUSTOMERIO_UNAVAILABLE_MESSAGE.format(klass=type(err), message=err, count=self._retries) + ) + else: logging.debug( "Response Code: %s, Time spent to make a request: %s", raw_cio_response.status_code, diff --git a/async_customerio/track.py b/async_customerio/track.py index c9cea72..eff3ef5 100644 --- a/async_customerio/track.py +++ b/async_customerio/track.py @@ -6,6 +6,7 @@ from typing import Optional from urllib.parse import quote +from async_customerio._config import DEFAULT_REQUEST_TIMEOUT, RequestTimeout from async_customerio.client_base import AsyncClientBase from async_customerio.constants import CIOID, EMAIL, ID from async_customerio.errors import AsyncCustomerIOError @@ -38,7 +39,7 @@ def __init__( port: Optional[int] = None, url_prefix: Optional[str] = None, retries: int = 3, - timeout: int = 10, + request_timeout: RequestTimeout = DEFAULT_REQUEST_TIMEOUT, ): if not isinstance(region, Region): raise AsyncCustomerIOError("invalid region provided") @@ -49,7 +50,7 @@ def __init__( self.base_url = self.setup_base_url( host=host or self.DEFAULT_API_HOST, port=port or self.DEFAULT_API_PORT, prefix=url_prefix or self.API_PREFIX ) - super().__init__(retries=retries, timeout=timeout) + super().__init__(retries=retries, request_timeout=request_timeout) @staticmethod def _url_encode(id_: t.Union[str, int]) -> str: diff --git a/pyproject.toml b/pyproject.toml index 25ce226..d245a81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "async-customerio" -version = "1.3.0" +version = "1.4.0" description = "Async CustomerIO Client - a Python client to interact with CustomerIO in an async fashion." license = "MIT" authors = [