Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Changelog
## v1.2.1 7/25/24
- Add retry for fetching Avro schemas

## v1.2.0 7/17/24
- Generalized Avro functions and separated encoding/decoding behavior.
- Generalized Avro functions and separated encoding/decoding behavior

## v1.1.6 7/12/24
- Add put functionality to Oauth2 Client
Expand Down
19 changes: 14 additions & 5 deletions src/nypl_py_utils/classes/avro_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from io import BytesIO
from nypl_py_utils.functions.log_helper import create_log
from requests.exceptions import JSONDecodeError, RequestException
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import JSONDecodeError


class AvroClient:
Expand All @@ -15,7 +16,13 @@ class AvroClient:
"""

def __init__(self, platform_schema_url):
self.logger = create_log("avro_encoder")
self.logger = create_log("avro_client")
retry_policy = Retry(total=3, backoff_factor=45,
status_forcelist=[500, 502, 503, 504],
allowed_methods=frozenset(['GET']))
self.session = requests.Session()
self.session.mount("https://",
HTTPAdapter(max_retries=retry_policy))
self.schema = avro.schema.parse(
self.get_json_schema(platform_schema_url))

Expand All @@ -27,9 +34,11 @@ def get_json_schema(self, platform_schema_url):
self.logger.info(
"Fetching Avro schema from {}".format(platform_schema_url))
try:
response = requests.get(platform_schema_url)

response = self.session.get(url=platform_schema_url,
timeout=60)
response.raise_for_status()
except RequestException as e:
except Exception as e:
self.logger.error(
"Failed to retrieve schema from {url}: {error}".format(
url=platform_schema_url, error=e
Expand All @@ -39,7 +48,7 @@ def get_json_schema(self, platform_schema_url):
"Failed to retrieve schema from {url}: {error}".format(
url=platform_schema_url, error=e
)
) from None
)

try:
json_response = response.json()
Expand Down
30 changes: 15 additions & 15 deletions tests/test_avro_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,26 @@ class TestAvroClient:
@pytest.fixture
def test_avro_encoder_instance(self, requests_mock):
requests_mock.get(
'https://test_schema_url', text=json.dumps(_TEST_SCHEMA))
return AvroEncoder('https://test_schema_url')
"https://test_schema_url", text=json.dumps(_TEST_SCHEMA))
return AvroEncoder("https://test_schema_url")

@pytest.fixture
def test_avro_decoder_instance(self, requests_mock):
requests_mock.get(
'https://test_schema_url', text=json.dumps(_TEST_SCHEMA))
return AvroDecoder('https://test_schema_url')

def test_get_json_schema(self, test_avro_encoder_instance,
test_avro_decoder_instance):
assert test_avro_encoder_instance.schema == _TEST_SCHEMA['data'][
'schema']
assert test_avro_decoder_instance.schema == _TEST_SCHEMA['data'][
'schema']

def test_request_error(self, requests_mock):
requests_mock.get('https://test_schema_url', exc=ConnectTimeout)
"https://test_schema_url", text=json.dumps(_TEST_SCHEMA))
return AvroDecoder("https://test_schema_url")

def test_get_json_schema_success(self, test_avro_encoder_instance,
Comment thread
fatimarahman marked this conversation as resolved.
test_avro_decoder_instance):
assert test_avro_encoder_instance.schema == _TEST_SCHEMA["data"][
"schema"]
assert test_avro_decoder_instance.schema == _TEST_SCHEMA["data"][
"schema"]

def test_get_json_schema_error(self, requests_mock):
requests_mock.get("https://test_schema_url", exc=ConnectTimeout)
with pytest.raises(AvroClientError):
AvroEncoder('https://test_schema_url')
AvroEncoder("https://test_schema_url")

def test_bad_json_error(self, requests_mock):
requests_mock.get(
Expand Down