Skip to content

Commit

Permalink
Handle logging non-JSON-serializable classes in stream slices (#22118)
Browse files Browse the repository at this point in the history
* Add failing test

* handle unserializable classes in stream slices

* format
  • Loading branch information
erohmensing committed Jan 31, 2023
1 parent 7f28751 commit b5d3236
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
6 changes: 4 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ def _read_incremental(
has_slices = True
if logger.isEnabledFor(logging.DEBUG):
yield AirbyteMessage(
type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}")
type=MessageType.LOG,
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
)
records = stream_instance.read_records(
sync_mode=SyncMode.incremental,
Expand Down Expand Up @@ -295,7 +296,8 @@ def _read_full_refresh(
for _slice in slices:
if logger.isEnabledFor(logging.DEBUG):
yield AirbyteMessage(
type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice)}")
type=MessageType.LOG,
log=AirbyteLogMessage(level=Level.INFO, message=f"{self.SLICE_LOG_PREFIX}{json.dumps(_slice, default=str)}"),
)
record_data_or_messages = stream_instance.read_records(
stream_slice=_slice,
Expand Down
11 changes: 9 additions & 2 deletions airbyte-cdk/python/unit_tests/sources/test_abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

import copy
import datetime
import logging
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
Expand Down Expand Up @@ -331,11 +332,17 @@ def test_valid_full_refresh_read_with_slices(mocker):
assert expected == messages


def test_read_full_refresh_with_slices_sends_slice_messages(mocker):
@pytest.mark.parametrize(
"slices",
[
[{"1": "1"}, {"2": "2"}],
[{"date": datetime.date(year=2023, month=1, day=1)}, {"date": datetime.date(year=2023, month=1, day=1)}]
]
)
def test_read_full_refresh_with_slices_sends_slice_messages(mocker, slices):
"""Given the logger is debug and a full refresh, AirbyteMessages are sent for slices"""
debug_logger = logging.getLogger("airbyte.debug")
debug_logger.setLevel(logging.DEBUG)
slices = [{"1": "1"}, {"2": "2"}]
stream = MockStream(
[({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices],
name="s1",
Expand Down

0 comments on commit b5d3236

Please sign in to comment.