From 2f2e530f408281536c21be2692cceb0ee578926f Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 3 Jan 2023 09:45:36 +0000 Subject: [PATCH] [low-code] convert request.body to a dict when converting to AirbyteLogMessage (#20557) * convert request body * fix tests * test body data * more tests * more tests * _ * return stacktrace * pretty print * Revert "pretty print" This reverts commit 091253803c55230ca95c38d0f1d37307c3c6e6c5. * Revert "Revert "pretty print"" This reverts commit b6f62d645b8b2a6a8f027e3c19fc9e5847c6d0ed. * replace \n * missing type hint --- .../retrievers/simple_retriever.py | 50 +++-- .../retrievers/test_simple_retriever.py | 200 +++++++++++++++++- .../connector_builder/impl/default_api.py | 22 +- .../impl/test_default_api.py | 24 ++- 4 files changed, 270 insertions(+), 26 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 8caf904fc469..65128a349483 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -5,6 +5,7 @@ import json import logging from dataclasses import InitVar, dataclass, field +from json import JSONDecodeError from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union import requests @@ -367,7 +368,7 @@ def read_records( stream_slice = stream_slice or {} # None-check self.paginator.reset() records_generator = self._read_pages( - self.parse_records_and_emit_request_and_responses, + self._parse_records_and_emit_request_and_responses, stream_slice, stream_state, ) @@ -405,23 +406,42 @@ def state(self, value: StreamState): """State setter, accept state serialized by state getter.""" self.stream_slicer.update_cursor(value) - def parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state) -> Iterable[StreamData]: + def _parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state) -> Iterable[StreamData]: # Only emit requests and responses when running in debug mode if self.logger.isEnabledFor(logging.DEBUG): - yield self._create_trace_message_from_request(request) - yield self._create_trace_message_from_response(response) + yield _prepared_request_to_airbyte_message(request) + yield _response_to_airbyte_message(response) # Not great to need to call _read_pages which is a private method # A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester yield from self.parse_response(response, stream_slice=stream_slice, stream_state=stream_state) - def _create_trace_message_from_request(self, request: requests.PreparedRequest): - # FIXME: this should return some sort of trace message - request_dict = {"url": request.url, "http_method": request.method, "headers": dict(request.headers), "body": request.body} - log_message = filter_secrets(f"request:{json.dumps(request_dict)}") - return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) - - def _create_trace_message_from_response(self, response: requests.Response): - # FIXME: this should return some sort of trace message - response_dict = {"body": response.text, "headers": dict(response.headers), "status_code": response.status_code} - log_message = filter_secrets(f"response:{json.dumps(response_dict)}") - return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) + +def _prepared_request_to_airbyte_message(request: requests.PreparedRequest) -> AirbyteMessage: + # FIXME: this should return some sort of trace message + request_dict = { + "url": request.url, + "http_method": request.method, + "headers": dict(request.headers), + "body": _body_binary_string_to_dict(request.body), + } + log_message = filter_secrets(f"request:{json.dumps(request_dict)}") + return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) + + +def _body_binary_string_to_dict(body_str: str) -> Optional[Mapping[str, str]]: + if body_str: + if isinstance(body_str, (bytes, bytearray)): + body_str = body_str.decode() + try: + return json.loads(body_str) + except JSONDecodeError: + return {k: v for k, v in [s.split("=") for s in body_str.split("&")]} + else: + return None + + +def _response_to_airbyte_message(response: requests.Response) -> AirbyteMessage: + # FIXME: this should return some sort of trace message + response_dict = {"body": response.text, "headers": dict(response.headers), "status_code": response.status_code} + log_message = filter_secrets(f"response:{json.dumps(response_dict)}") + return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index bcc2c5e736ce..5a4f4b45fd9f 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -7,13 +7,17 @@ import airbyte_cdk.sources.declarative.requesters.error_handlers.response_status as response_status import pytest import requests -from airbyte_cdk.models import AirbyteLogMessage, Level, SyncMode +from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode, Type from airbyte_cdk.sources.declarative.exceptions import ReadException from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod -from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever +from airbyte_cdk.sources.declarative.retrievers.simple_retriever import ( + SimpleRetriever, + _prepared_request_to_airbyte_message, + _response_to_airbyte_message, +) from airbyte_cdk.sources.declarative.stream_slicers import DatetimeStreamSlicer from airbyte_cdk.sources.streams.http.auth import NoAuth from airbyte_cdk.sources.streams.http.http import HttpStream @@ -433,3 +437,195 @@ def test_path(test_name, requester_path, paginator_path, expected_path): actual_path = retriever.path(stream_state=None, stream_slice=None, next_page_token=None) assert expected_path == actual_path + + +@pytest.mark.parametrize( + "test_name, http_method, url, headers, params, body_json, body_data, expected_airbyte_message", + [ + ( + "test_basic_get_request", + HttpMethod.GET, + "https://airbyte.io", + {}, + {}, + {}, + {}, + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage( + level=Level.INFO, message='request:{"url": "https://airbyte.io/", "http_method": "GET", "headers": {}, "body": null}' + ), + ), + ), + ( + "test_get_request_with_headers", + HttpMethod.GET, + "https://airbyte.io", + {"h1": "v1", "h2": "v2"}, + {}, + {}, + {}, + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage( + level=Level.INFO, + message='request:{"url": "https://airbyte.io/", "http_method": "GET", "headers": {"h1": "v1", "h2": "v2"}, "body": null}', + ), + ), + ), + ( + "test_get_request_with_request_params", + HttpMethod.GET, + "https://airbyte.io", + {}, + {"p1": "v1", "p2": "v2"}, + {}, + {}, + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage( + level=Level.INFO, + message='request:{"url": "https://airbyte.io/?p1=v1&p2=v2", "http_method": "GET", "headers": {}, "body": null}', + ), + ), + ), + ( + "test_get_request_with_request_body_json", + HttpMethod.GET, + "https://airbyte.io", + {"Content-Type": "application/json"}, + {}, + {"b1": "v1", "b2": "v2"}, + {}, + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage( + level=Level.INFO, + message='request:{"url": "https://airbyte.io/", "http_method": "GET", "headers": {"Content-Type": "application/json", "Content-Length": "24"}, "body": {"b1": "v1", "b2": "v2"}}', + ), + ), + ), + ( + "test_get_request_with_headers_params_and_body", + HttpMethod.GET, + "https://airbyte.io", + {"Content-Type": "application/json", "h1": "v1"}, + {"p1": "v1", "p2": "v2"}, + {"b1": "v1", "b2": "v2"}, + {}, + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage( + level=Level.INFO, + message='request:{"url": "https://airbyte.io/?p1=v1&p2=v2", "http_method": "GET", "headers": {"Content-Type": "application/json", "h1": "v1", "Content-Length": "24"}, "body": {"b1": "v1", "b2": "v2"}}', + ), + ), + ), + ( + "test_get_request_with_request_body_data", + HttpMethod.GET, + "https://airbyte.io", + {"Content-Type": "application/json"}, + {}, + {}, + {"b1": "v1", "b2": "v2"}, + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage( + level=Level.INFO, + message='request:{"url": "https://airbyte.io/", "http_method": "GET", "headers": {"Content-Type": "application/json", "Content-Length": "11"}, "body": {"b1": "v1", "b2": "v2"}}', + ), + ), + ), + ( + "test_basic_post_request", + HttpMethod.POST, + "https://airbyte.io", + {}, + {}, + {}, + {}, + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage( + level=Level.INFO, + message='request:{"url": "https://airbyte.io/", "http_method": "POST", "headers": {"Content-Length": "0"}, "body": null}', + ), + ), + ), + ], +) +def test_prepared_request_to_airbyte_message(test_name, http_method, url, headers, params, body_json, body_data, expected_airbyte_message): + request = requests.Request(method=http_method.name, url=url, headers=headers, params=params) + if body_json: + request.json = body_json + if body_data: + request.data = body_data + prepared_request = request.prepare() + + actual_airbyte_message = _prepared_request_to_airbyte_message(prepared_request) + + assert expected_airbyte_message == actual_airbyte_message + + +@pytest.mark.parametrize( + "test_name, response_body, response_headers, status_code, expected_airbyte_message", + [ + ( + "test_response_no_body_no_headers", + b"", + {}, + 200, + AirbyteMessage( + type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message='response:{"body": "", "headers": {}, "status_code": 200}') + ), + ), + ( + "test_response_no_body_with_headers", + b"", + {"h1": "v1", "h2": "v2"}, + 200, + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage( + level=Level.INFO, message='response:{"body": "", "headers": {"h1": "v1", "h2": "v2"}, "status_code": 200}' + ), + ), + ), + ( + "test_response_with_body_no_headers", + b'{"b1": "v1", "b2": "v2"}', + {}, + 200, + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage( + level=Level.INFO, + message='response:{"body": "{\\"b1\\": \\"v1\\", \\"b2\\": \\"v2\\"}", "headers": {}, "status_code": 200}', + ), + ), + ), + ( + "test_response_with_body_and_headers", + b'{"b1": "v1", "b2": "v2"}', + {"h1": "v1", "h2": "v2"}, + 200, + AirbyteMessage( + type=Type.LOG, + log=AirbyteLogMessage( + level=Level.INFO, + message='response:{"body": "{\\"b1\\": \\"v1\\", \\"b2\\": \\"v2\\"}", "headers": {"h1": "v1", "h2": "v2"}, "status_code": 200}', + ), + ), + ), + ], +) +def test_response_to_airbyte_message(test_name, response_body, response_headers, status_code, expected_airbyte_message): + response = requests.Response() + response.status_code = status_code + response.headers = response_headers + response._content = response_body + + actual_airbyte_message = _response_to_airbyte_message(response) + + assert expected_airbyte_message == actual_airbyte_message diff --git a/airbyte-connector-builder-server/connector_builder/impl/default_api.py b/airbyte-connector-builder-server/connector_builder/impl/default_api.py index 4516dc79791b..cd0db29fb723 100644 --- a/airbyte-connector-builder-server/connector_builder/impl/default_api.py +++ b/airbyte-connector-builder-server/connector_builder/impl/default_api.py @@ -4,14 +4,12 @@ import json import logging +import traceback from json import JSONDecodeError from typing import Any, Dict, Iterable, Optional, Union from urllib.parse import parse_qs, urljoin, urlparse from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type -from fastapi import Body, HTTPException -from jsonschema import ValidationError - from connector_builder.generated.apis.default_api_interface import DefaultApi from connector_builder.generated.models.http_request import HttpRequest from connector_builder.generated.models.http_response import HttpResponse @@ -23,6 +21,8 @@ from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter +from fastapi import Body, HTTPException +from jsonschema import ValidationError class DefaultApiImpl(DefaultApi): @@ -113,7 +113,7 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo log_messages = [] try: for message_group in self._get_message_groups( - adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config) + adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config) ): if isinstance(message_group, AirbyteLogMessage): log_messages.append({"message": message_group.message}) @@ -121,7 +121,10 @@ async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Bo single_slice.pages.append(message_group) except Exception as error: # TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec - raise HTTPException(status_code=400, detail=f"Could not perform read with with error: {error.args[0]}") + raise HTTPException( + status_code=400, + detail=f"Could not perform read with with error: {error.args[0]} - {self._get_stacktrace_as_string(error)}", + ) return StreamRead(logs=log_messages, slices=[single_slice]) @@ -207,4 +210,11 @@ def _create_low_code_adapter(manifest: Dict[str, Any]) -> LowCodeSourceAdapter: return LowCodeSourceAdapter(manifest=manifest) except ValidationError as error: # TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec - raise HTTPException(status_code=400, detail=f"Invalid connector manifest with error: {error.message}") + raise HTTPException( + status_code=400, + detail=f"Invalid connector manifest with error: {error.message} - {DefaultApiImpl._get_stacktrace_as_string(error)}", + ) + + @staticmethod + def _get_stacktrace_as_string(error) -> str: + return "".join(traceback.TracebackException.from_exception(error).format()) diff --git a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py index c92497cc0cae..8ab0030f3ec7 100644 --- a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py +++ b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py @@ -155,6 +155,7 @@ def test_read_stream(): request = { "url": "https://demonslayers.com/api/v1/hashiras?era=taisho", "headers": {"Content-Type": "application/json"}, + "http_method": "GET", "body": {"custom": "field"}, } response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'} @@ -165,6 +166,7 @@ def test_read_stream(): parameters={"era": ["taisho"]}, headers={"Content-Type": "application/json"}, body={"custom": "field"}, + http_method="GET", ), response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), records=[{"name": "Shinobu Kocho"}, {"name": "Muichiro Tokito"}], @@ -175,6 +177,7 @@ def test_read_stream(): parameters={"era": ["taisho"]}, headers={"Content-Type": "application/json"}, body={"custom": "field"}, + http_method="GET", ), response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), records=[{"name": "Mitsuri Kanroji"}], @@ -210,6 +213,7 @@ def test_read_stream_with_logs(): "url": "https://demonslayers.com/api/v1/hashiras?era=taisho", "headers": {"Content-Type": "application/json"}, "body": {"custom": "field"}, + "http_method": "GET", } response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'} expected_pages = [ @@ -219,6 +223,7 @@ def test_read_stream_with_logs(): parameters={"era": ["taisho"]}, headers={"Content-Type": "application/json"}, body={"custom": "field"}, + http_method="GET", ), response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), records=[{"name": "Shinobu Kocho"}, {"name": "Muichiro Tokito"}], @@ -229,6 +234,7 @@ def test_read_stream_with_logs(): parameters={"era": ["taisho"]}, headers={"Content-Type": "application/json"}, body={"custom": "field"}, + http_method="GET", ), response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), records=[{"name": "Mitsuri Kanroji"}], @@ -272,6 +278,7 @@ def test_read_stream_no_records(): "url": "https://demonslayers.com/api/v1/hashiras?era=taisho", "headers": {"Content-Type": "application/json"}, "body": {"custom": "field"}, + "http_method": "GET", } response = {"status_code": 200, "headers": {"field": "value"}, "body": '{"name": "field"}'} expected_pages = [ @@ -281,6 +288,7 @@ def test_read_stream_no_records(): parameters={"era": ["taisho"]}, headers={"Content-Type": "application/json"}, body={"custom": "field"}, + http_method="GET", ), response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), records=[], @@ -291,6 +299,7 @@ def test_read_stream_no_records(): parameters={"era": ["taisho"]}, headers={"Content-Type": "application/json"}, body={"custom": "field"}, + http_method="GET", ), response=HttpResponse(status=200, headers={"field": "value"}, body={"name": "field"}), records=[], @@ -388,15 +397,19 @@ def test_read_stream_returns_error_if_stream_does_not_exist(): pytest.param( 'request:{"url": "https://nichirin.com/v1/swords?color=orange", "http_method": "PUT", "headers": {"field": "name"}, "body":{"key": "value"}}', HttpRequest( - url="https://nichirin.com/v1/swords", parameters={"color": ["orange"]}, headers={"field": "name"}, body={"key": "value"}, + url="https://nichirin.com/v1/swords", + parameters={"color": ["orange"]}, + headers={"field": "name"}, + body={"key": "value"}, http_method="PUT", ), id="test_create_request_with_all_fields", ), pytest.param( 'request:{"url": "https://nichirin.com/v1/swords?color=orange", "http_method": "GET", "headers": {"field": "name"}}', - HttpRequest(url="https://nichirin.com/v1/swords", parameters={"color": ["orange"]}, headers={"field": "name"}, - http_method="GET"), + HttpRequest( + url="https://nichirin.com/v1/swords", parameters={"color": ["orange"]}, headers={"field": "name"}, http_method="GET" + ), id="test_create_request_with_no_body", ), pytest.param( @@ -409,6 +422,11 @@ def test_read_stream_returns_error_if_stream_does_not_exist(): HttpRequest(url="https://nichirin.com/v1/swords", headers={"field": "name"}, body={"key": "value"}, http_method="PUT"), id="test_create_request_with_no_parameters", ), + pytest.param( + 'request:{"url": "https://nichirin.com/v1/swords", "http_method": "POST", "headers": {"field": "name"}, "body":null}', + HttpRequest(url="https://nichirin.com/v1/swords", headers={"field": "name"}, body=None, http_method="POST"), + id="test_create_request_with_null_body", + ), pytest.param("request:{invalid_json: }", None, id="test_invalid_json_still_does_not_crash"), pytest.param("just a regular log message", None, id="test_no_request:_prefix_does_not_crash"), ],