From 7bff12aea514bfbce1545d060fde2311f764503c Mon Sep 17 00:00:00 2001 From: Brian Lai <51336873+brianjlai@users.noreply.github.com> Date: Wed, 13 Jul 2022 18:01:07 -0400 Subject: [PATCH] [#3078] [CDK] Add support for enabling debug from command line and some basic general debug logs (#14521) * allow for command line debug option and basic debug statements + declarative * feedback from pr comments * fix some tests w/ req/res mixed up and fixing logging tests * formatting * pr feedback: cleaning up traces in logger.py and update docs with debug configuration * remove unneeded trace logger test * remove extra print statement --- airbyte-cdk/python/airbyte_cdk/entrypoint.py | 7 ++++ airbyte-cdk/python/airbyte_cdk/logger.py | 30 +++++++++++------ .../airbyte_cdk/sources/abstract_source.py | 32 ++++++++++++++++--- .../declarative/yaml_declarative_source.py | 11 ++++++- .../airbyte_cdk/sources/streams/http/http.py | 5 ++- .../sources/streams/http/test_http.py | 14 ++++---- .../python/unit_tests/sources/test_source.py | 13 +++++--- .../python/unit_tests/test_entrypoint.py | 27 ++++++++++------ airbyte-cdk/python/unit_tests/test_logger.py | 18 +++++------ .../cdk-python/README.md | 12 +++++++ .../tutorials/building-a-python-source.md | 18 +++++++++++ 11 files changed, 143 insertions(+), 44 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/entrypoint.py b/airbyte-cdk/python/airbyte_cdk/entrypoint.py index 74044291ff017..ad7ade90a931e 100644 --- a/airbyte-cdk/python/airbyte_cdk/entrypoint.py +++ b/airbyte-cdk/python/airbyte_cdk/entrypoint.py @@ -32,6 +32,7 @@ def __init__(self, source: Source): def parse_args(args: List[str]) -> argparse.Namespace: # set up parent parsers parent_parser = argparse.ArgumentParser(add_help=False) + parent_parser.add_argument("--debug", action="store_true", help="enables detailed debug logs related to the sync") main_parser = argparse.ArgumentParser() subparsers = main_parser.add_subparsers(title="commands", dest="command") @@ -67,6 +68,12 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: if not cmd: raise Exception("No command passed") + if hasattr(parsed_args, "debug") and parsed_args.debug: + self.logger.setLevel(logging.DEBUG) + self.logger.debug("Debug logs enabled") + else: + self.logger.setLevel(logging.INFO) + # todo: add try catch for exceptions with different exit codes source_spec: ConnectorSpecification = self.source.spec(self.logger) with tempfile.TemporaryDirectory() as temp_dir: diff --git a/airbyte-cdk/python/airbyte_cdk/logger.py b/airbyte-cdk/python/airbyte_cdk/logger.py index 4d3cdbb62766c..26882fdd4d4f6 100644 --- a/airbyte-cdk/python/airbyte_cdk/logger.py +++ b/airbyte-cdk/python/airbyte_cdk/logger.py @@ -2,6 +2,7 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import json import logging import logging.config import traceback @@ -11,8 +12,6 @@ from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets from deprecated import deprecated -TRACE_LEVEL_NUM = 5 - LOGGING_CONFIG = { "version": 1, "disable_existing_loggers": False, @@ -34,9 +33,8 @@ def init_logger(name: str = None): """Initial set up of logger""" - logging.addLevelName(TRACE_LEVEL_NUM, "TRACE") logger = logging.getLogger(name) - logger.setLevel(TRACE_LEVEL_NUM) + logger.setLevel(logging.INFO) logging.config.dictConfig(LOGGING_CONFIG) return logger @@ -51,16 +49,30 @@ class AirbyteLogFormatter(logging.Formatter): logging.WARNING: "WARN", logging.INFO: "INFO", logging.DEBUG: "DEBUG", - TRACE_LEVEL_NUM: "TRACE", } def format(self, record: logging.LogRecord) -> str: """Return a JSON representation of the log message""" - message = super().format(record) airbyte_level = self.level_mapping.get(record.levelno, "INFO") - message = filter_secrets(message) - log_message = AirbyteMessage(type="LOG", log=AirbyteLogMessage(level=airbyte_level, message=message)) - return log_message.json(exclude_unset=True) + if airbyte_level == "DEBUG": + extras = self.extract_extra_args_from_record(record) + debug_dict = {"type": "DEBUG", "message": record.getMessage(), "data": extras} + return filter_secrets(json.dumps(debug_dict)) + else: + message = super().format(record) + message = filter_secrets(message) + log_message = AirbyteMessage(type="LOG", log=AirbyteLogMessage(level=airbyte_level, message=message)) + return log_message.json(exclude_unset=True) + + @staticmethod + def extract_extra_args_from_record(record: logging.LogRecord): + """ + The python logger conflates default args with extra args. We use an empty log record and set operations + to isolate fields passed to the log record via extra by the developer. + """ + default_attrs = logging.LogRecord("", 0, "", 0, None, None, None).__dict__.keys() + extra_keys = set(record.__dict__.keys()) - default_attrs + return {k: str(getattr(record, k)) for k in extra_keys if hasattr(record, k)} def log_by_prefix(msg: str, default_level: str) -> Tuple[int, str]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 7d2eaa528df06..8cc002cd26463 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -109,7 +109,6 @@ def read( f"The requested stream {configured_stream.stream.name} was not found in the source." f" Available streams: {stream_instances.keys()}" ) - try: timer.start_event(f"Syncing stream {configured_stream.stream.name}") yield from self._read_stream( @@ -142,11 +141,20 @@ def _read_stream( connector_state: MutableMapping[str, Any], internal_config: InternalConfig, ) -> Iterator[AirbyteMessage]: - + self._apply_log_level_to_stream_logger(logger, stream_instance) if internal_config.page_size and isinstance(stream_instance, HttpStream): logger.info(f"Setting page size for {stream_instance.name} to {internal_config.page_size}") stream_instance.page_size = internal_config.page_size + logger.debug( + f"Syncing stream: {configured_stream.stream.name}", + extra={ + "sync_mode": configured_stream.sync_mode, + "primary_key": configured_stream.primary_key, + "cursor_field": configured_stream.cursor_field, + }, + ) + use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental if use_incremental: record_iterator = self._read_incremental( @@ -157,7 +165,7 @@ def _read_stream( internal_config, ) else: - record_iterator = self._read_full_refresh(stream_instance, configured_stream, internal_config) + record_iterator = self._read_full_refresh(logger, stream_instance, configured_stream, internal_config) record_counter = 0 stream_name = configured_stream.stream.name @@ -210,8 +218,10 @@ def _read_incremental( sync_mode=SyncMode.incremental, stream_state=stream_state, ) + logger.debug(f"Processing stream slices for {stream_name}", extra={"stream_slices": slices}) total_records_counter = 0 for _slice in slices: + logger.debug("Processing stream slice", extra={"slice": _slice}) records = stream_instance.read_records( sync_mode=SyncMode.incremental, stream_slice=_slice, @@ -239,15 +249,18 @@ def _read_incremental( def _read_full_refresh( self, + logger: logging.Logger, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream, internal_config: InternalConfig, ) -> Iterator[AirbyteMessage]: slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field) + logger.debug(f"Processing stream slices for {configured_stream.stream.name}", extra={"stream_slices": slices}) total_records_counter = 0 - for slice in slices: + for _slice in slices: + logger.debug("Processing stream slice", extra={"slice": _slice}) records = stream_instance.read_records( - stream_slice=slice, + stream_slice=_slice, sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field, ) @@ -287,3 +300,12 @@ def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]): transformer.transform(data, schema) # type: ignore message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis) return AirbyteMessage(type=MessageType.RECORD, record=message) + + @staticmethod + def _apply_log_level_to_stream_logger(logger: logging.Logger, stream_instance: Stream): + """ + Necessary because we use different loggers at the source and stream levels. We must + apply the source's log level to each stream's logger. + """ + if hasattr(logger, "level"): + stream_instance.logger.setLevel(logger.level) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 38e6383732e1c..604dbb1c71866 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -2,6 +2,8 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import json +import logging from typing import Any, List, Mapping from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker @@ -13,6 +15,8 @@ class YamlDeclarativeSource(DeclarativeSource): def __init__(self, path_to_yaml): + self.logger = logging.getLogger(f"airbyte.{self.name}") + self.logger.setLevel(logging.DEBUG) self._factory = DeclarativeComponentFactory() self._source_config = self._read_and_parse_yaml_file(path_to_yaml) @@ -33,4 +37,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: def _read_and_parse_yaml_file(self, path_to_yaml_file): with open(path_to_yaml_file, "r") as f: config_content = f.read() - return YamlParser().parse(config_content) + parsed_config = YamlParser().parse(config_content) + self.logger.debug( + "parsed YAML into declarative source", + extra={"path_to_yaml_file": path_to_yaml_file, "source_name": self.name, "parsed_config": json.dumps(parsed_config)}, + ) + return parsed_config diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index e89d590026fed..1570c9a25a752 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -286,8 +286,11 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Unexpected transient exceptions use the default backoff parameters. Unexpected persistent exceptions are not handled and will cause the sync to fail. """ + self.logger.debug( + "Making outbound API request", extra={"headers": request.headers, "url": request.url, "request_body": request.body} + ) response: requests.Response = self._session.send(request, **request_kwargs) - + self.logger.debug("Receiving response", extra={"headers": response.headers, "status": response.status_code, "body": response.text}) if self.should_retry(response): custom_backoff_time = self.backoff_time(response) if custom_backoff_time: diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index fe87508e51a58..86df87d48707e 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -240,10 +240,11 @@ def test_raise_on_http_errors_off_5xx(mocker, status_code): @pytest.mark.parametrize("status_code", [400, 401, 402, 403, 416]) def test_raise_on_http_errors_off_non_retryable_4xx(mocker, status_code): stream = AutoFailFalseHttpStream() - req = requests.Response() - req.status_code = status_code + req = requests.PreparedRequest() + res = requests.Response() + res.status_code = status_code - mocker.patch.object(requests.Session, "send", return_value=req) + mocker.patch.object(requests.Session, "send", return_value=res) response = stream._send_request(req, {}) assert response.status_code == status_code @@ -437,9 +438,10 @@ def test_send_raise_on_http_errors_logs(mocker, status_code): mocker.patch.object(AutoFailTrueHttpStream, "logger") mocker.patch.object(AutoFailTrueHttpStream, "should_retry", mocker.Mock(return_value=False)) stream = AutoFailTrueHttpStream() - req = requests.Response() - req.status_code = status_code - mocker.patch.object(requests.Session, "send", return_value=req) + req = requests.PreparedRequest() + res = requests.Response() + res.status_code = status_code + mocker.patch.object(requests.Session, "send", return_value=res) with pytest.raises(requests.exceptions.HTTPError): response = stream._send_request(req, {}) stream.logger.error.assert_called_with(response.text) diff --git a/airbyte-cdk/python/unit_tests/sources/test_source.py b/airbyte-cdk/python/unit_tests/sources/test_source.py index 7a65ba18784ee..de2b282012eef 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_source.py @@ -135,7 +135,8 @@ def test_internal_config(abstract_source, catalog): non_http_stream.read_records.return_value = [{}] * 3 # Test with empty config - records = [r for r in abstract_source.read(logger=MagicMock(), config={}, catalog=catalog, state={})] + logger = logging.getLogger(f"airbyte.{getattr(abstract_source, 'name', '')}") + records = [r for r in abstract_source.read(logger=logger, config={}, catalog=catalog, state={})] # 3 for http stream and 3 for non http stream assert len(records) == 3 + 3 assert http_stream.read_records.called @@ -145,19 +146,19 @@ def test_internal_config(abstract_source, catalog): assert not non_http_stream.page_size # Test with records limit set to 1 internal_config = {"some_config": 100, "_limit": 1} - records = [r for r in abstract_source.read(logger=MagicMock(), config=internal_config, catalog=catalog, state={})] + records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})] # 1 from http stream + 1 from non http stream assert len(records) == 1 + 1 assert "_limit" not in abstract_source.streams_config assert "some_config" in abstract_source.streams_config # Test with records limit set to number that exceeds expceted records internal_config = {"some_config": 100, "_limit": 20} - records = [r for r in abstract_source.read(logger=MagicMock(), config=internal_config, catalog=catalog, state={})] + records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})] assert len(records) == 3 + 3 # Check if page_size paramter is set to http instance only internal_config = {"some_config": 100, "_page_size": 2} - records = [r for r in abstract_source.read(logger=MagicMock(), config=internal_config, catalog=catalog, state={})] + records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})] assert "_page_size" not in abstract_source.streams_config assert "some_config" in abstract_source.streams_config assert len(records) == 3 + 3 @@ -168,6 +169,7 @@ def test_internal_config(abstract_source, catalog): def test_internal_config_limit(abstract_source, catalog): logger_mock = MagicMock() + logger_mock.level = logging.DEBUG del catalog.streams[1] STREAM_LIMIT = 2 FULL_RECORDS_NUMBER = 3 @@ -205,6 +207,7 @@ def test_internal_config_limit(abstract_source, catalog): def test_source_config_no_transform(abstract_source, catalog): logger_mock = MagicMock() + logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) http_stream, non_http_stream = streams http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA @@ -218,6 +221,7 @@ def test_source_config_no_transform(abstract_source, catalog): def test_source_config_transform(abstract_source, catalog): logger_mock = MagicMock() + logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) http_stream, non_http_stream = streams http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) @@ -231,6 +235,7 @@ def test_source_config_transform(abstract_source, catalog): def test_source_config_transform_and_no_transform(abstract_source, catalog): logger_mock = MagicMock() + logger_mock.level = logging.DEBUG streams = abstract_source.streams(None) http_stream, non_http_stream = streams http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) diff --git a/airbyte-cdk/python/unit_tests/test_entrypoint.py b/airbyte-cdk/python/unit_tests/test_entrypoint.py index 8fbbfe18e5cda..168539b5d2c04 100644 --- a/airbyte-cdk/python/unit_tests/test_entrypoint.py +++ b/airbyte-cdk/python/unit_tests/test_entrypoint.py @@ -38,7 +38,8 @@ def _as_arglist(cmd: str, named_args: Mapping[str, Any]) -> List[str]: out = [cmd] for k, v in named_args.items(): out.append(f"--{k}") - out.append(v) + if v: + out.append(v) return out @@ -56,19 +57,27 @@ def entrypoint() -> AirbyteEntrypoint: @pytest.mark.parametrize( - ["cmd", "args"], + ["cmd", "args", "expected_args"], [ - ("spec", dict()), - ("check", {"config": "config_path"}), - ("discover", {"config": "config_path"}), - ("read", {"config": "config_path", "catalog": "catalog_path", "state": "None"}), - ("read", {"config": "config_path", "catalog": "catalog_path", "state": "state_path"}), + ("spec", {"debug": ""}, {"command": "spec", "debug": True}), + ("check", {"config": "config_path"}, {"command": "check", "config": "config_path", "debug": False}), + ("discover", {"config": "config_path", "debug": ""}, {"command": "discover", "config": "config_path", "debug": True}), + ( + "read", + {"config": "config_path", "catalog": "catalog_path", "state": "None"}, + {"command": "read", "config": "config_path", "catalog": "catalog_path", "state": "None", "debug": False}, + ), + ( + "read", + {"config": "config_path", "catalog": "catalog_path", "state": "state_path", "debug": ""}, + {"command": "read", "config": "config_path", "catalog": "catalog_path", "state": "state_path", "debug": True}, + ), ], ) -def test_parse_valid_args(cmd: str, args: Mapping[str, Any], entrypoint: AirbyteEntrypoint): +def test_parse_valid_args(cmd: str, args: Mapping[str, Any], expected_args, entrypoint: AirbyteEntrypoint): arglist = _as_arglist(cmd, args) parsed_args = entrypoint.parse_args(arglist) - assert {"command": cmd, **args} == vars(parsed_args) + assert vars(parsed_args) == expected_args @pytest.mark.parametrize( diff --git a/airbyte-cdk/python/unit_tests/test_logger.py b/airbyte-cdk/python/unit_tests/test_logger.py index 8ae59e08ffbf3..e90184a3fcb1e 100644 --- a/airbyte-cdk/python/unit_tests/test_logger.py +++ b/airbyte-cdk/python/unit_tests/test_logger.py @@ -2,7 +2,6 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # - import json import logging from typing import Dict @@ -50,20 +49,21 @@ def test_level_transform(logger, caplog): assert level_critical == "FATAL" -def test_trace(logger, caplog): - logger.log(logging.getLevelName("TRACE"), "Test trace 1") - record = caplog.records[0] - assert record.levelname == "TRACE" - assert record.message == "Test trace 1" - - def test_debug(logger, caplog): - logger.debug("Test debug 1") + # Test debug logger in isolation since the default logger is initialized to TRACE (15) instead of DEBUG (10). + debug_logger = logging.getLogger("airbyte.Debuglogger") + debug_logger.setLevel(logging.DEBUG) + debug_logger.debug("Test debug 1") record = caplog.records[0] assert record.levelname == "DEBUG" assert record.message == "Test debug 1" +def test_default_debug_is_ignored(logger, caplog): + logger.debug("Test debug that is ignored since log level is TRACE") + assert len(caplog.records) == 0 + + def test_info(logger, caplog): logger.info("Test info 1") logger.info("Test info 2") diff --git a/docs/connector-development/cdk-python/README.md b/docs/connector-development/cdk-python/README.md index 9b059fe0f04c3..14c017d687246 100644 --- a/docs/connector-development/cdk-python/README.md +++ b/docs/connector-development/cdk-python/README.md @@ -86,6 +86,18 @@ pip install -e ".[tests]" # [tests] installs test-only dependencies * Perform static type checks using `mypy airbyte_cdk`. `MyPy` configuration is in `.mypy.ini`. * The `type_check_and_test.sh` script bundles both type checking and testing in one convenient command. Feel free to use it! +#### Debugging + +While developing your connector, you can print detailed debug information during a sync by specifying the `--debug` flag. This allows you to get a better picture of what is happening during each step of your sync. +```text +python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json --debug +``` + +In addition to preset CDK debug statements, you can also add your own statements to emit debug information specific to your connector: +```python +self.logger.debug("your debug message here", extra={"debug_field": self.value}) +``` + #### Testing All tests are located in the `unit_tests` directory. Run `pytest --cov=airbyte_cdk unit_tests/` to run them. This also presents a test coverage report. diff --git a/docs/connector-development/tutorials/building-a-python-source.md b/docs/connector-development/tutorials/building-a-python-source.md index 6bfc80fb91d17..2a8284bcf9cf0 100644 --- a/docs/connector-development/tutorials/building-a-python-source.md +++ b/docs/connector-development/tutorials/building-a-python-source.md @@ -147,6 +147,24 @@ Note: Each time you make a change to your implementation you need to re-build th The nice thing about this approach is that you are running your source exactly as it will be run by Airbyte. The tradeoff is that iteration is slightly slower, because you need to re-build the connector between each change. +**Detailed Debug Messages** + +During development of your connector, you can enable the printing of detailed debug information during a sync by specifying the `--debug` flag. This will allow you to get a better picture of what is happening during each step of your sync. +```text +python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json --debug +``` + +In addition to the preset CDK debug statements, you can also emit custom debug information from your connector by introducing your own debug statements: +```python +self.logger.debug( + "your debug message here", + extra={ + "debug_field": self.value, + "custom_field": your_object.field + } +) +``` + **TDD using standard tests** Airbyte provides a standard test suite that is run against every source. The objective of these tests is to provide some "free" tests that can sanity check that the basic functionality of the source works. One approach to developing your connector is to simply run the tests between each change and use the feedback from them to guide your development.