[#3078] [CDK] Add support for enabling debug from command line and some basic general debug logs (#14521)

* allow for command line debug option and basic debug statements + declarative

* feedback from pr comments

* fix some tests w/ req/res mixed up and fixing logging tests

* formatting

* pr feedback: cleaning up traces in logger.py and update docs with debug configuration

* remove unneeded trace logger test

* remove extra print statement
brianjlai committed Jul 13, 2022
1 parent 1671def commit 7bff12a
Showing 11 changed files with 143 additions and 44 deletions.
7 changes: 7 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
@@ -32,6 +32,7 @@ def __init__(self, source: Source):
def parse_args(args: List[str]) -> argparse.Namespace:
# set up parent parsers
parent_parser = argparse.ArgumentParser(add_help=False)
parent_parser.add_argument("--debug", action="store_true", help="enables detailed debug logs related to the sync")
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(title="commands", dest="command")

@@ -67,6 +68,12 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
if not cmd:
raise Exception("No command passed")

if hasattr(parsed_args, "debug") and parsed_args.debug:
self.logger.setLevel(logging.DEBUG)
self.logger.debug("Debug logs enabled")
else:
self.logger.setLevel(logging.INFO)

# todo: add try catch for exceptions with different exit codes
source_spec: ConnectorSpecification = self.source.spec(self.logger)
with tempfile.TemporaryDirectory() as temp_dir:
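For reference, a minimal sketch (file paths hypothetical) of how the new flag surfaces through parse_args: --debug is declared on the shared parent parser, so every subcommand accepts it, and it defaults to False when omitted.

from airbyte_cdk.entrypoint import AirbyteEntrypoint

# The flag lives on the parent parser, so any subcommand can take it.
parsed = AirbyteEntrypoint.parse_args(["read", "--config", "config.json", "--catalog", "catalog.json", "--debug"])
assert parsed.debug is True
# Omitting the flag leaves debug False, so run() keeps the logger at INFO.
assert AirbyteEntrypoint.parse_args(["spec"]).debug is False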
30 changes: 21 additions & 9 deletions airbyte-cdk/python/airbyte_cdk/logger.py
@@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import logging
import logging.config
import traceback
@@ -11,8 +12,6 @@
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from deprecated import deprecated

TRACE_LEVEL_NUM = 5

LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
@@ -34,9 +33,8 @@

def init_logger(name: str = None):
"""Initial set up of logger"""
logging.addLevelName(TRACE_LEVEL_NUM, "TRACE")
logger = logging.getLogger(name)
logger.setLevel(TRACE_LEVEL_NUM)
logger.setLevel(logging.INFO)
logging.config.dictConfig(LOGGING_CONFIG)
return logger

@@ -51,16 +49,30 @@ class AirbyteLogFormatter(logging.Formatter):
logging.WARNING: "WARN",
logging.INFO: "INFO",
logging.DEBUG: "DEBUG",
TRACE_LEVEL_NUM: "TRACE",
}

def format(self, record: logging.LogRecord) -> str:
"""Return a JSON representation of the log message"""
message = super().format(record)
airbyte_level = self.level_mapping.get(record.levelno, "INFO")
message = filter_secrets(message)
log_message = AirbyteMessage(type="LOG", log=AirbyteLogMessage(level=airbyte_level, message=message))
return log_message.json(exclude_unset=True)
if airbyte_level == "DEBUG":
extras = self.extract_extra_args_from_record(record)
debug_dict = {"type": "DEBUG", "message": record.getMessage(), "data": extras}
return filter_secrets(json.dumps(debug_dict))
else:
message = super().format(record)
message = filter_secrets(message)
log_message = AirbyteMessage(type="LOG", log=AirbyteLogMessage(level=airbyte_level, message=message))
return log_message.json(exclude_unset=True)

@staticmethod
def extract_extra_args_from_record(record: logging.LogRecord):
"""
The python logger conflates default args with extra args. We use an empty log record and set operations
to isolate fields passed to the log record via extra by the developer.
"""
default_attrs = logging.LogRecord("", 0, "", 0, None, None, None).__dict__.keys()
extra_keys = set(record.__dict__.keys()) - default_attrs
return {k: str(getattr(record, k)) for k in extra_keys if hasattr(record, k)}
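As an illustration (logger name and fields hypothetical), a DEBUG record formatted by AirbyteLogFormatter carries its extra fields in a "data" object instead of the usual AirbyteMessage envelope; note that extra values are stringified and key order is not guaranteed:

import logging
from airbyte_cdk.logger import AirbyteLogFormatter

handler = logging.StreamHandler()
handler.setFormatter(AirbyteLogFormatter())
logger = logging.getLogger("airbyte.formatter_demo")  # hypothetical logger name
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

logger.debug("Fetching next page", extra={"page": 2, "url": "https://api.example.com/items"})
# prints roughly: {"type": "DEBUG", "message": "Fetching next page", "data": {"page": "2", "url": "https://api.example.com/items"}}
logger.info("Sync started")
# prints the AirbyteMessage envelope: {"type": "LOG", "log": {"level": "INFO", "message": "Sync started"}}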


def log_by_prefix(msg: str, default_level: str) -> Tuple[int, str]:
32 changes: 27 additions & 5 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
@@ -109,7 +109,6 @@ def read(
f"The requested stream {configured_stream.stream.name} was not found in the source."
f" Available streams: {stream_instances.keys()}"
)

try:
timer.start_event(f"Syncing stream {configured_stream.stream.name}")
yield from self._read_stream(
@@ -142,11 +141,20 @@ def _read_stream(
connector_state: MutableMapping[str, Any],
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:

self._apply_log_level_to_stream_logger(logger, stream_instance)
if internal_config.page_size and isinstance(stream_instance, HttpStream):
logger.info(f"Setting page size for {stream_instance.name} to {internal_config.page_size}")
stream_instance.page_size = internal_config.page_size

logger.debug(
f"Syncing stream: {configured_stream.stream.name}",
extra={
"sync_mode": configured_stream.sync_mode,
"primary_key": configured_stream.primary_key,
"cursor_field": configured_stream.cursor_field,
},
)

use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
if use_incremental:
record_iterator = self._read_incremental(
@@ -157,7 +165,7 @@
internal_config,
)
else:
record_iterator = self._read_full_refresh(stream_instance, configured_stream, internal_config)
record_iterator = self._read_full_refresh(logger, stream_instance, configured_stream, internal_config)

record_counter = 0
stream_name = configured_stream.stream.name
@@ -210,8 +218,10 @@ def _read_incremental(
sync_mode=SyncMode.incremental,
stream_state=stream_state,
)
logger.debug(f"Processing stream slices for {stream_name}", extra={"stream_slices": slices})
total_records_counter = 0
for _slice in slices:
logger.debug("Processing stream slice", extra={"slice": _slice})
records = stream_instance.read_records(
sync_mode=SyncMode.incremental,
stream_slice=_slice,
@@ -239,15 +249,18 @@

def _read_full_refresh(
self,
logger: logging.Logger,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)
logger.debug(f"Processing stream slices for {configured_stream.stream.name}", extra={"stream_slices": slices})
total_records_counter = 0
for slice in slices:
for _slice in slices:
logger.debug("Processing stream slice", extra={"slice": _slice})
records = stream_instance.read_records(
stream_slice=slice,
stream_slice=_slice,
sync_mode=SyncMode.full_refresh,
cursor_field=configured_stream.cursor_field,
)
@@ -287,3 +300,12 @@ def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
transformer.transform(data, schema) # type: ignore
message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
return AirbyteMessage(type=MessageType.RECORD, record=message)

@staticmethod
def _apply_log_level_to_stream_logger(logger: logging.Logger, stream_instance: Stream):
"""
Necessary because we use different loggers at the source and stream levels. We must
apply the source's log level to each stream's logger.
"""
if hasattr(logger, "level"):
stream_instance.logger.setLevel(logger.level)
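A minimal sketch (logger names hypothetical) of why this is needed: source and stream loggers sit on separate branches of the logging hierarchy, so lowering the source logger to DEBUG does not reach a stream's logger until the level is copied across.

import logging

source_logger = logging.getLogger("airbyte.my_source")          # hypothetical names
stream_logger = logging.getLogger("airbyte.streams.my_stream")

source_logger.setLevel(logging.DEBUG)  # e.g. set by the new --debug flag
assert stream_logger.getEffectiveLevel() != logging.DEBUG  # sibling logger is unaffected

stream_logger.setLevel(source_logger.level)  # what _apply_log_level_to_stream_logger does
assert stream_logger.getEffectiveLevel() == logging.DEBUG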
11 changes: 10 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py
Expand Up @@ -2,6 +2,8 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json
import logging
from typing import Any, List, Mapping

from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
@@ -13,6 +15,8 @@

class YamlDeclarativeSource(DeclarativeSource):
def __init__(self, path_to_yaml):
self.logger = logging.getLogger(f"airbyte.{self.name}")
self.logger.setLevel(logging.DEBUG)
self._factory = DeclarativeComponentFactory()
self._source_config = self._read_and_parse_yaml_file(path_to_yaml)

@@ -33,4 +37,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
def _read_and_parse_yaml_file(self, path_to_yaml_file):
with open(path_to_yaml_file, "r") as f:
config_content = f.read()
return YamlParser().parse(config_content)
parsed_config = YamlParser().parse(config_content)
self.logger.debug(
"parsed YAML into declarative source",
extra={"path_to_yaml_file": path_to_yaml_file, "source_name": self.name, "parsed_config": json.dumps(parsed_config)},
)
return parsed_config
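For context, a sketch of the construction path (YAML path hypothetical): because the constructor pins its logger to DEBUG, the parsed configuration is logged whenever a declarative source is built, independent of the --debug flag.

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

# Parsing happens in the constructor, so the "parsed YAML into declarative source"
# debug record is emitted as soon as the source is created.
source = YamlDeclarativeSource(path_to_yaml="./sample_connector.yaml")  # hypothetical path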
5 changes: 4 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
@@ -286,8 +286,11 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
Unexpected transient exceptions use the default backoff parameters.
Unexpected persistent exceptions are not handled and will cause the sync to fail.
"""
self.logger.debug(
"Making outbound API request", extra={"headers": request.headers, "url": request.url, "request_body": request.body}
)
response: requests.Response = self._session.send(request, **request_kwargs)

self.logger.debug("Receiving response", extra={"headers": response.headers, "status": response.status_code, "body": response.text})
if self.should_retry(response):
custom_backoff_time = self.backoff_time(response)
if custom_backoff_time:
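A rough sketch (URL and logger name hypothetical) of the request/response pair these two debug statements produce around every outbound call — the same extra-field pattern the formatter above renders as structured JSON:

import logging
import requests

logger = logging.getLogger("airbyte.http_demo")  # hypothetical name
logger.setLevel(logging.DEBUG)

request = requests.Request("GET", "https://api.example.com/items").prepare()  # hypothetical URL
logger.debug("Making outbound API request", extra={"headers": request.headers, "url": request.url, "request_body": request.body})
response = requests.Session().send(request)
logger.debug("Receiving response", extra={"headers": response.headers, "status": response.status_code, "body": response.text})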
14 changes: 8 additions & 6 deletions airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py
@@ -240,10 +240,11 @@ def test_raise_on_http_errors_off_5xx(mocker, status_code):
@pytest.mark.parametrize("status_code", [400, 401, 402, 403, 416])
def test_raise_on_http_errors_off_non_retryable_4xx(mocker, status_code):
stream = AutoFailFalseHttpStream()
req = requests.Response()
req.status_code = status_code
req = requests.PreparedRequest()
res = requests.Response()
res.status_code = status_code

mocker.patch.object(requests.Session, "send", return_value=req)
mocker.patch.object(requests.Session, "send", return_value=res)
response = stream._send_request(req, {})
assert response.status_code == status_code

@@ -437,9 +438,10 @@ def test_send_raise_on_http_errors_logs(mocker, status_code):
mocker.patch.object(AutoFailTrueHttpStream, "logger")
mocker.patch.object(AutoFailTrueHttpStream, "should_retry", mocker.Mock(return_value=False))
stream = AutoFailTrueHttpStream()
req = requests.Response()
req.status_code = status_code
mocker.patch.object(requests.Session, "send", return_value=req)
req = requests.PreparedRequest()
res = requests.Response()
res.status_code = status_code
mocker.patch.object(requests.Session, "send", return_value=res)
with pytest.raises(requests.exceptions.HTTPError):
response = stream._send_request(req, {})
stream.logger.error.assert_called_with(response.text)
13 changes: 9 additions & 4 deletions airbyte-cdk/python/unit_tests/sources/test_source.py
@@ -135,7 +135,8 @@ def test_internal_config(abstract_source, catalog):
non_http_stream.read_records.return_value = [{}] * 3

# Test with empty config
records = [r for r in abstract_source.read(logger=MagicMock(), config={}, catalog=catalog, state={})]
logger = logging.getLogger(f"airbyte.{getattr(abstract_source, 'name', '')}")
records = [r for r in abstract_source.read(logger=logger, config={}, catalog=catalog, state={})]
# 3 for http stream and 3 for non http stream
assert len(records) == 3 + 3
assert http_stream.read_records.called
@@ -145,19 +146,19 @@
assert not non_http_stream.page_size
# Test with records limit set to 1
internal_config = {"some_config": 100, "_limit": 1}
records = [r for r in abstract_source.read(logger=MagicMock(), config=internal_config, catalog=catalog, state={})]
records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})]
# 1 from http stream + 1 from non http stream
assert len(records) == 1 + 1
assert "_limit" not in abstract_source.streams_config
assert "some_config" in abstract_source.streams_config
# Test with records limit set to number that exceeds expected records
internal_config = {"some_config": 100, "_limit": 20}
records = [r for r in abstract_source.read(logger=MagicMock(), config=internal_config, catalog=catalog, state={})]
records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})]
assert len(records) == 3 + 3

# Check if page_size parameter is set to http instance only
internal_config = {"some_config": 100, "_page_size": 2}
records = [r for r in abstract_source.read(logger=MagicMock(), config=internal_config, catalog=catalog, state={})]
records = [r for r in abstract_source.read(logger=logger, config=internal_config, catalog=catalog, state={})]
assert "_page_size" not in abstract_source.streams_config
assert "some_config" in abstract_source.streams_config
assert len(records) == 3 + 3
@@ -168,6 +169,7 @@

def test_internal_config_limit(abstract_source, catalog):
logger_mock = MagicMock()
logger_mock.level = logging.DEBUG
del catalog.streams[1]
STREAM_LIMIT = 2
FULL_RECORDS_NUMBER = 3
@@ -205,6 +207,7 @@

def test_source_config_no_transform(abstract_source, catalog):
logger_mock = MagicMock()
logger_mock.level = logging.DEBUG
streams = abstract_source.streams(None)
http_stream, non_http_stream = streams
http_stream.get_json_schema.return_value = non_http_stream.get_json_schema.return_value = SCHEMA
@@ -218,6 +221,7 @@

def test_source_config_transform(abstract_source, catalog):
logger_mock = MagicMock()
logger_mock.level = logging.DEBUG
streams = abstract_source.streams(None)
http_stream, non_http_stream = streams
http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
@@ -231,6 +235,7 @@

def test_source_config_transform_and_no_transform(abstract_source, catalog):
logger_mock = MagicMock()
logger_mock.level = logging.DEBUG
streams = abstract_source.streams(None)
http_stream, non_http_stream = streams
http_stream.transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
27 changes: 18 additions & 9 deletions airbyte-cdk/python/unit_tests/test_entrypoint.py
@@ -38,7 +38,8 @@ def _as_arglist(cmd: str, named_args: Mapping[str, Any]) -> List[str]:
out = [cmd]
for k, v in named_args.items():
out.append(f"--{k}")
out.append(v)
if v:
out.append(v)
return out
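The new guard on falsy values matters for the debug flag: a sketch of what the helper now emits for a value-less --debug versus a normal option (the values are the same placeholder strings the tests below use).

_as_arglist("spec", {"debug": ""})               # -> ["spec", "--debug"]  (flag only, no value appended)
_as_arglist("check", {"config": "config_path"})  # -> ["check", "--config", "config_path"]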


@@ -56,19 +57,27 @@


@pytest.mark.parametrize(
["cmd", "args"],
["cmd", "args", "expected_args"],
[
("spec", dict()),
("check", {"config": "config_path"}),
("discover", {"config": "config_path"}),
("read", {"config": "config_path", "catalog": "catalog_path", "state": "None"}),
("read", {"config": "config_path", "catalog": "catalog_path", "state": "state_path"}),
("spec", {"debug": ""}, {"command": "spec", "debug": True}),
("check", {"config": "config_path"}, {"command": "check", "config": "config_path", "debug": False}),
("discover", {"config": "config_path", "debug": ""}, {"command": "discover", "config": "config_path", "debug": True}),
(
"read",
{"config": "config_path", "catalog": "catalog_path", "state": "None"},
{"command": "read", "config": "config_path", "catalog": "catalog_path", "state": "None", "debug": False},
),
(
"read",
{"config": "config_path", "catalog": "catalog_path", "state": "state_path", "debug": ""},
{"command": "read", "config": "config_path", "catalog": "catalog_path", "state": "state_path", "debug": True},
),
],
)
def test_parse_valid_args(cmd: str, args: Mapping[str, Any], entrypoint: AirbyteEntrypoint):
def test_parse_valid_args(cmd: str, args: Mapping[str, Any], expected_args, entrypoint: AirbyteEntrypoint):
arglist = _as_arglist(cmd, args)
parsed_args = entrypoint.parse_args(arglist)
assert {"command": cmd, **args} == vars(parsed_args)
assert vars(parsed_args) == expected_args


@pytest.mark.parametrize(
18 changes: 9 additions & 9 deletions airbyte-cdk/python/unit_tests/test_logger.py
@@ -2,7 +2,6 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import json
import logging
from typing import Dict
@@ -50,20 +49,21 @@ def test_level_transform(logger, caplog):
assert level_critical == "FATAL"


def test_trace(logger, caplog):
logger.log(logging.getLevelName("TRACE"), "Test trace 1")
record = caplog.records[0]
assert record.levelname == "TRACE"
assert record.message == "Test trace 1"


def test_debug(logger, caplog):
logger.debug("Test debug 1")
# Test the debug logger in isolation since the default logger is initialized to INFO (20), which filters out DEBUG (10) records.
debug_logger = logging.getLogger("airbyte.Debuglogger")
debug_logger.setLevel(logging.DEBUG)
debug_logger.debug("Test debug 1")
record = caplog.records[0]
assert record.levelname == "DEBUG"
assert record.message == "Test debug 1"


def test_default_debug_is_ignored(logger, caplog):
logger.debug("Test debug that is ignored since log level is TRACE")
assert len(caplog.records) == 0


def test_info(logger, caplog):
logger.info("Test info 1")
logger.info("Test info 2")