Skip to content

Commit

Permalink
Decode response in f-string (#34518)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis committed Sep 21, 2023
1 parent 642b4ee commit 966c2bc
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 27 deletions.
17 changes: 9 additions & 8 deletions airflow/providers/databricks/hooks/databricks_base.py
Expand Up @@ -245,7 +245,8 @@ def _get_sp_token(self, resource: str) -> str:
except RetryError:
raise AirflowException(f"API requests to Databricks failed {self.retry_limit} times. Giving up.")
except requests_exceptions.HTTPError as e:
raise AirflowException(f"Response: {e.response.content}, Status Code: {e.response.status_code}")
msg = f"Response: {e.response.content.decode()}, Status Code: {e.response.status_code}"
raise AirflowException(msg)

return jsn["access_token"]

Expand Down Expand Up @@ -279,7 +280,8 @@ async def _a_get_sp_token(self, resource: str) -> str:
except RetryError:
raise AirflowException(f"API requests to Databricks failed {self.retry_limit} times. Giving up.")
except requests_exceptions.HTTPError as e:
raise AirflowException(f"Response: {e.response.content}, Status Code: {e.response.status_code}")
msg = f"Response: {e.response.content.decode()}, Status Code: {e.response.status_code}"
raise AirflowException(msg)

return jsn["access_token"]

Expand Down Expand Up @@ -340,7 +342,8 @@ def _get_aad_token(self, resource: str) -> str:
except RetryError:
raise AirflowException(f"API requests to Azure failed {self.retry_limit} times. Giving up.")
except requests_exceptions.HTTPError as e:
raise AirflowException(f"Response: {e.response.content}, Status Code: {e.response.status_code}")
msg = f"Response: {e.response.content.decode()}, Status Code: {e.response.status_code}"
raise AirflowException(msg)

return jsn["access_token"]

Expand Down Expand Up @@ -606,11 +609,9 @@ def _do_api_call(
raise AirflowException(f"API requests to Databricks failed {self.retry_limit} times. Giving up.")
except requests_exceptions.HTTPError as e:
if wrap_http_errors:
raise AirflowException(
f"Response: {e.response.content}, Status Code: {e.response.status_code}"
)
else:
raise e
msg = f"Response: {e.response.content.decode()}, Status Code: {e.response.status_code}"
raise AirflowException(msg)
raise

async def _a_do_api_call(self, endpoint_info: tuple[str, str], json: dict[str, Any] | None = None):
"""
Expand Down
8 changes: 4 additions & 4 deletions airflow/providers/snowflake/hooks/snowflake_sql_api.py
Expand Up @@ -162,7 +162,8 @@ def execute_query(
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e: # pragma: no cover
raise AirflowException(f"Response: {e.response.content} Status Code: {e.response.status_code}")
msg = f"Response: {e.response.content.decode()} Status Code: {e.response.status_code}"
raise AirflowException(msg)
json_response = response.json()
self.log.info("Snowflake SQL POST API response: %s", json_response)
if "statementHandles" in json_response:
Expand Down Expand Up @@ -226,9 +227,8 @@ def check_query_output(self, query_ids: list[str]) -> None:
response.raise_for_status()
self.log.info(response.json())
except requests.exceptions.HTTPError as e:
raise AirflowException(
f"Response: {e.response.content}, Status Code: {e.response.status_code}"
)
msg = f"Response: {e.response.content.decode()}, Status Code: {e.response.status_code}"
raise AirflowException(msg)

def _process_response(self, status_code, resp):
self.log.info("Snowflake SQL GET statements status API response: %s", resp)
Expand Down
22 changes: 7 additions & 15 deletions tests/providers/snowflake/hooks/test_snowflake_sql_api.py
Expand Up @@ -27,6 +27,7 @@
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from responses import RequestsMock

from airflow.exceptions import AirflowException
from airflow.models import Connection
Expand Down Expand Up @@ -216,33 +217,24 @@ def test_check_query_output(self, mock_geturl_header_params, mock_requests, quer
hook.check_query_output(query_ids)
mock_log_info.assert_called_with(GET_RESPONSE)

@pytest.mark.parametrize(
"query_ids",
[
(["uuid", "uuid1"]),
],
)
@mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.requests.get")
@pytest.mark.parametrize("query_ids", [(["uuid", "uuid1"])])
@mock.patch(
"airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook."
"get_request_url_header_params"
)
def test_check_query_output_exception(self, mock_geturl_header_params, mock_requests, query_ids):
def test_check_query_output_exception(self, mock_geturl_header_params, query_ids):
"""
Test check_query_output by passing query ids as params and mock get_request_url_header_params
to raise airflow exception and mock with http error
"""
req_id = uuid.uuid4()
params = {"requestId": str(req_id), "page": 2, "pageSize": 10}
mock_geturl_header_params.return_value = HEADERS, params, "/test/airflow/"
mock_resp = requests.models.Response()
mock_resp.status_code = 404
mock_requests.return_value = mock_resp
mock_geturl_header_params.return_value = HEADERS, params, "https://test/airflow/"
hook = SnowflakeSqlApiHook("mock_conn_id")
with mock.patch.object(hook.log, "error"):
with pytest.raises(AirflowException) as airflow_exception:
with mock.patch.object(hook.log, "error"), RequestsMock() as requests_mock:
requests_mock.get(url="https://test/airflow/", json={"foo": "bar"}, status=500)
with pytest.raises(AirflowException, match='Response: {"foo": "bar"}, Status Code: 500'):
hook.check_query_output(query_ids)
assert airflow_exception

@mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook._get_conn_params")
@mock.patch("airflow.providers.snowflake.hooks.snowflake_sql_api.SnowflakeSqlApiHook.get_headers")
Expand Down

0 comments on commit 966c2bc

Please sign in to comment.