Skip to content

Commit

Permalink
feat: Refactor base HTTP client to enable request limits to work (#18)
Browse files Browse the repository at this point in the history
* feat: Refactor base HTTP client to enable request limits to work
  • Loading branch information
akalex committed Mar 11, 2024
1 parent 6953322 commit 4904840
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 32 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ repos:
rev: 23.1.0
hooks:
- id: black
language_version: python3.11
language_version: python3.8
args: [--line-length=120, --skip-string-normalization]

- repo: https://github.com/pycqa/flake8
Expand All @@ -24,7 +24,7 @@ repos:
- id: flake8

- repo: https://github.com/pycqa/isort
rev: 5.10.1
rev: 5.11.5
hooks:
- id: isort
stages: [commit]
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.4.0
* Refactored ``async_customerio.client_base.AsyncClientBase`` to take advantage of connection pool. So the HTTP client will be created once during class ``async_customerio.track.AsyncCustomerIO`` and ``async_customerio.api.AsyncAPIClient`` instantiation.


## 1.3.0

- Support of Python 3.12 added.
Expand Down
39 changes: 39 additions & 0 deletions async_customerio/_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import typing as t
from dataclasses import dataclass


@dataclass
class RequestTimeout:
"""
Request timeout configuration.
Arguments:
timeout: Timeout on all operations eg, read, write, connect.
Examples:
RequestTimeout(None) # No timeouts.
RequestTimeout(5.0) # 5s timeout on all operations.
"""

timeout: t.Optional[float] = None


@dataclass
class RequestLimits:
"""
Configuration for request limits.
Attributes:
max_connections: The maximum number of concurrent connections that may be established.
max_keepalive_connections: Allow the connection pool to maintain keep-alive connections
below this point. Should be less than or equal to `max_connections`.
keepalive_expiry: Time limit on idle keep-alive connections in seconds.
"""

max_connections: t.Optional[int] = None
max_keepalive_connections: t.Optional[int] = None
keepalive_expiry: t.Optional[int] = None


DEFAULT_REQUEST_TIMEOUT = RequestTimeout(timeout=10.0)
DEFAULT_REQUEST_LIMITS = RequestLimits(max_connections=100, max_keepalive_connections=20, keepalive_expiry=5)
11 changes: 9 additions & 2 deletions async_customerio/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
"""
import base64

from async_customerio._config import DEFAULT_REQUEST_TIMEOUT, RequestTimeout


try:
from typing import Dict, Literal, Optional, TypedDict, Union
Expand Down Expand Up @@ -210,14 +212,19 @@ class AsyncAPIClient(AsyncClientBase):
SEND_PUSH_NOTIFICATION_ENDPOINT = "/send/push"

def __init__(
self, key: str, url: Optional[str] = None, region: Region = Regions.US, retries: int = 3, timeout: int = 10
self,
key: str,
url: Optional[str] = None,
region: Region = Regions.US,
retries: int = 3,
request_timeout: RequestTimeout = DEFAULT_REQUEST_TIMEOUT,
):
if not isinstance(region, Region):
raise AsyncCustomerIOError("invalid region provided")

self.key = key
self.base_url = url or "https://{host}".format(host=region.api_host)
super().__init__(retries=retries, timeout=timeout)
super().__init__(retries=retries, request_timeout=request_timeout)

async def send_email(self, request: SendEmailRequest) -> dict:
if not isinstance(request, SendEmailRequest):
Expand Down
75 changes: 50 additions & 25 deletions async_customerio/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import httpx

from async_customerio._config import DEFAULT_REQUEST_LIMITS, DEFAULT_REQUEST_TIMEOUT, RequestLimits, RequestTimeout
from async_customerio.errors import AsyncCustomerIOError
from async_customerio.utils import sanitize

Expand All @@ -21,9 +22,35 @@


class AsyncClientBase:
def __init__(self, retries: int = 3, timeout: int = 10):
self.timeout = timeout
self.retries = retries
def __init__(
self,
retries: int = 3,
*,
request_timeout: RequestTimeout = DEFAULT_REQUEST_TIMEOUT,
request_limits: RequestLimits = DEFAULT_REQUEST_LIMITS,
):
"""
:param retries: set number of retries before give up
:param request_timeout: advanced feature that allows to change request timeout.
:param request_limits: advanced feature that allows to control the connection pool size.
"""

self._retries = retries
self._request_timeout = request_timeout
self._request_transport = httpx.AsyncHTTPTransport(
limits=httpx.Limits(**request_limits.__dict__), retries=retries
)
self._http_client: t.Optional[httpx.AsyncClient] = None

@property
def _client(self) -> httpx.AsyncClient:
if self._http_client is None or self._http_client.is_closed:
self._http_client = httpx.AsyncClient(
timeout=httpx.Timeout(**self._request_timeout.__dict__),
transport=self._request_transport,
)
return self._http_client

@staticmethod
def _get_request_id():
Expand Down Expand Up @@ -64,29 +91,27 @@ async def send_request(
if headers:
merged_headers.update(headers)

transport = httpx.AsyncHTTPTransport(retries=self.retries)
async with httpx.AsyncClient(timeout=self.timeout, transport=transport, auth=auth) as client:
logging.debug(
"Requesting method: %s, URL: %s, payload: %s, headers: %s",
method,
url,
json_payload,
headers,
logging.debug(
"Requesting method: %s, URL: %s, payload: %s, headers: %s",
method,
url,
json_payload,
headers,
)
try:
raw_cio_response: httpx.Response = await self._client.request(
method, url, json=json_payload and sanitize(json_payload), headers=merged_headers
)
try:
raw_cio_response: httpx.Response = await client.request(
method, url, json=json_payload and sanitize(json_payload), headers=merged_headers
)
result_status = raw_cio_response.status_code
if result_status != 200:
raise AsyncCustomerIOError(f"{result_status}: {url} {json_payload} {raw_cio_response.text}")
except Exception as err:
# Raise exception alerting user that the system might be
# experiencing an outage and refer them to system status page.
raise AsyncCustomerIOError(
CUSTOMERIO_UNAVAILABLE_MESSAGE.format(klass=type(err), message=err, count=self.retries)
)

result_status = raw_cio_response.status_code
if result_status != 200:
raise AsyncCustomerIOError(f"{result_status}: {url} {json_payload} {raw_cio_response.text}")
except Exception as err:
# Raise exception alerting user that the system might be
# experiencing an outage and refer them to system status page.
raise AsyncCustomerIOError(
CUSTOMERIO_UNAVAILABLE_MESSAGE.format(klass=type(err), message=err, count=self._retries)
)
else:
logging.debug(
"Response Code: %s, Time spent to make a request: %s",
raw_cio_response.status_code,
Expand Down
5 changes: 3 additions & 2 deletions async_customerio/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Optional
from urllib.parse import quote

from async_customerio._config import DEFAULT_REQUEST_TIMEOUT, RequestTimeout
from async_customerio.client_base import AsyncClientBase
from async_customerio.constants import CIOID, EMAIL, ID
from async_customerio.errors import AsyncCustomerIOError
Expand Down Expand Up @@ -38,7 +39,7 @@ def __init__(
port: Optional[int] = None,
url_prefix: Optional[str] = None,
retries: int = 3,
timeout: int = 10,
request_timeout: RequestTimeout = DEFAULT_REQUEST_TIMEOUT,
):
if not isinstance(region, Region):
raise AsyncCustomerIOError("invalid region provided")
Expand All @@ -49,7 +50,7 @@ def __init__(
self.base_url = self.setup_base_url(
host=host or self.DEFAULT_API_HOST, port=port or self.DEFAULT_API_PORT, prefix=url_prefix or self.API_PREFIX
)
super().__init__(retries=retries, timeout=timeout)
super().__init__(retries=retries, request_timeout=request_timeout)

@staticmethod
def _url_encode(id_: t.Union[str, int]) -> str:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "async-customerio"
version = "1.3.0"
version = "1.4.0"
description = "Async CustomerIO Client - a Python client to interact with CustomerIO in an async fashion."
license = "MIT"
authors = [
Expand Down

0 comments on commit 4904840

Please sign in to comment.