/
retry_session.py
43 lines (34 loc) · 1.17 KB
/
retry_session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from requests import Response, Session
from requests.adapters import HTTPAdapter
from urllib3 import Retry
_HEADERS = {"Content-Type": "application/json"}
# Inspired by https://www.peterbe.com/plog/best-practice-with-retries-with-requests
def _requests_retry_session(
retries=6,
backoff_factor=1,
status_forcelist=(500, 502, 504),
session=None,
) -> Session:
"""Instantiate a session with Retry backoff."""
session = session or Session()
retry = Retry(
raise_on_redirect=False,
raise_on_status=False,
method_whitelist=["POST", "GET"],
status=retries,
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def retry_get(*args, **kwargs) -> Response:
with _requests_retry_session() as s:
return s.get(headers=_HEADERS, *args, **kwargs)
def retry_post(*args, **kwargs) -> Response:
with _requests_retry_session() as s:
return s.post(headers=_HEADERS, *args, **kwargs)