From 49bbf7fac3ab692720ca994df0691d10a68ca75f Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 6 Mar 2025 17:22:07 -0800 Subject: [PATCH 1/8] handle airbyte records --- airbyte_cdk/sources/streams/concurrent/adapters.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/airbyte_cdk/sources/streams/concurrent/adapters.py b/airbyte_cdk/sources/streams/concurrent/adapters.py index f304bfb21..73ad900db 100644 --- a/airbyte_cdk/sources/streams/concurrent/adapters.py +++ b/airbyte_cdk/sources/streams/concurrent/adapters.py @@ -302,6 +302,15 @@ def read(self) -> Iterable[Record]: stream_name=self.stream_name(), associated_slice=self._slice, # type: ignore [arg-type] ) + elif ( + isinstance(record_data, AirbyteMessage) + and record_data.record is not None + ): + yield Record( + data=record_data.record.data, + stream_name=self.stream_name(), + associated_slice=self._slice, # type: ignore [arg-type] + ) else: self._message_repository.emit_message(record_data) except Exception as e: From 448074d82d244941c0b624f6b418a0996e4f6301 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 6 Mar 2025 17:25:39 -0800 Subject: [PATCH 2/8] type check --- airbyte_cdk/sources/streams/concurrent/adapters.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/concurrent/adapters.py b/airbyte_cdk/sources/streams/concurrent/adapters.py index 73ad900db..81b1dc240 100644 --- a/airbyte_cdk/sources/streams/concurrent/adapters.py +++ b/airbyte_cdk/sources/streams/concurrent/adapters.py @@ -307,7 +307,7 @@ def read(self) -> Iterable[Record]: and record_data.record is not None ): yield Record( - data=record_data.record.data, + data=record_data.record.data or {}, stream_name=self.stream_name(), associated_slice=self._slice, # type: ignore [arg-type] ) From 01f1eed21c3df39f1d4b1189b3c31c5a1c55c6cf Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 6 Mar 2025 17:27:04 -0800 Subject: [PATCH 3/8] format --- airbyte_cdk/sources/streams/concurrent/adapters.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/streams/concurrent/adapters.py b/airbyte_cdk/sources/streams/concurrent/adapters.py index 81b1dc240..fb01174c7 100644 --- a/airbyte_cdk/sources/streams/concurrent/adapters.py +++ b/airbyte_cdk/sources/streams/concurrent/adapters.py @@ -302,10 +302,7 @@ def read(self) -> Iterable[Record]: stream_name=self.stream_name(), associated_slice=self._slice, # type: ignore [arg-type] ) - elif ( - isinstance(record_data, AirbyteMessage) - and record_data.record is not None - ): + elif isinstance(record_data, AirbyteMessage) and record_data.record is not None: yield Record( data=record_data.record.data or {}, stream_name=self.stream_name(), From 9abc5fd71fddd8784871038d1790448a7ea609d1 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 6 Mar 2025 17:47:38 -0800 Subject: [PATCH 4/8] Add a test --- .../streams/concurrent/test_adapters.py | 64 ++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/unit_tests/sources/streams/concurrent/test_adapters.py b/unit_tests/sources/streams/concurrent/test_adapters.py index f809fa38d..d4da664ef 100644 --- a/unit_tests/sources/streams/concurrent/test_adapters.py +++ b/unit_tests/sources/streams/concurrent/test_adapters.py @@ -7,7 +7,14 @@ import pytest -from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, Level, SyncMode +from airbyte_cdk.models import ( + AirbyteLogMessage, + AirbyteMessage, + AirbyteStream, + Level, + SyncMode, + AirbyteRecordMessage, +) from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.message import InMemoryMessageRepository from airbyte_cdk.sources.streams.concurrent.adapters import ( @@ -132,6 +139,61 @@ def test_stream_partition(transformer, expected_records): assert messages == [a_log_message] +@pytest.mark.parametrize( + "transformer, expected_records", + [ + pytest.param( + TypeTransformer(TransformConfig.NoTransform), + [Record({"data": "1"}, None), Record({"data": "2"}, None)], + id="test_no_transform", + ), + ], +) +def test_stream_partition_read_airbyte_message(transformer, expected_records): + stream = Mock() + stream.name = _STREAM_NAME + stream.get_json_schema.return_value = { + "type": "object", + "properties": {"data": {"type": ["integer"]}}, + } + stream.transformer = transformer + message_repository = InMemoryMessageRepository() + _slice = None + sync_mode = SyncMode.full_refresh + cursor_field = None + state = None + partition = StreamPartition(stream, _slice, message_repository, sync_mode, cursor_field, state) + + a_log_message = AirbyteMessage( + type=MessageType.LOG, + log=AirbyteLogMessage( + level=Level.INFO, + message='slice:{"partition": 1}', + ), + ) + for record in expected_records: + record.partition = partition + + stream_data = [ + a_log_message, + AirbyteMessage( + type=MessageType.RECORD, + record=AirbyteRecordMessage(stream=stream.name, data={"data": "1"}), + ), + AirbyteMessage( + type=MessageType.RECORD, + record=AirbyteRecordMessage(stream=stream.name, data={"data": "2"}), + ), + ] + stream.read_records.return_value = stream_data + + records = list(partition.read()) + messages = list(message_repository.consume_queue()) + + assert records == expected_records + assert messages == [a_log_message] + + @pytest.mark.parametrize( "exception_type, expected_display_message", [ From 5115d872227960a916e69536d35d39fcb76fabda Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 6 Mar 2025 17:49:49 -0800 Subject: [PATCH 5/8] ruff --- unit_tests/sources/streams/concurrent/test_adapters.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/sources/streams/concurrent/test_adapters.py b/unit_tests/sources/streams/concurrent/test_adapters.py index d4da664ef..d6b74ce43 100644 --- a/unit_tests/sources/streams/concurrent/test_adapters.py +++ b/unit_tests/sources/streams/concurrent/test_adapters.py @@ -10,10 +10,10 @@ from airbyte_cdk.models import ( AirbyteLogMessage, AirbyteMessage, + AirbyteRecordMessage, AirbyteStream, Level, SyncMode, - AirbyteRecordMessage, ) from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.message import InMemoryMessageRepository From f168624233b0b50343ad73eeda71a427244b67d3 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Thu, 6 Mar 2025 17:56:00 -0800 Subject: [PATCH 6/8] emitted at --- unit_tests/sources/streams/concurrent/test_adapters.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/unit_tests/sources/streams/concurrent/test_adapters.py b/unit_tests/sources/streams/concurrent/test_adapters.py index d6b74ce43..66f48a9e0 100644 --- a/unit_tests/sources/streams/concurrent/test_adapters.py +++ b/unit_tests/sources/streams/concurrent/test_adapters.py @@ -178,11 +178,11 @@ def test_stream_partition_read_airbyte_message(transformer, expected_records): a_log_message, AirbyteMessage( type=MessageType.RECORD, - record=AirbyteRecordMessage(stream=stream.name, data={"data": "1"}), + record=AirbyteRecordMessage(stream=stream.name, data={"data": "1"}, emitted_at=1), ), AirbyteMessage( type=MessageType.RECORD, - record=AirbyteRecordMessage(stream=stream.name, data={"data": "2"}), + record=AirbyteRecordMessage(stream=stream.name, data={"data": "2"}, emitted_at=2), ), ] stream.read_records.return_value = stream_data From 12f5cc7baf39317c8936dcf2dff5ec0606f553fe Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Fri, 7 Mar 2025 09:48:53 -0800 Subject: [PATCH 7/8] update docstring --- airbyte_cdk/sources/streams/concurrent/adapters.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/streams/concurrent/adapters.py b/airbyte_cdk/sources/streams/concurrent/adapters.py index fb01174c7..a3209d4ec 100644 --- a/airbyte_cdk/sources/streams/concurrent/adapters.py +++ b/airbyte_cdk/sources/streams/concurrent/adapters.py @@ -276,7 +276,7 @@ def __init__( def read(self) -> Iterable[Record]: """ Read messages from the stream. - If the StreamData is a Mapping, it will be converted to a Record. + If the StreamData is a Mapping or an AirbyteMessage of type RECORD, it will be converted to a Record. Otherwise, the message will be emitted on the message repository. """ try: From 65d3c4f43f817103c3941020037af2f0858366f7 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Fri, 7 Mar 2025 09:59:54 -0800 Subject: [PATCH 8/8] Add a comment --- airbyte_cdk/sources/streams/concurrent/adapters.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airbyte_cdk/sources/streams/concurrent/adapters.py b/airbyte_cdk/sources/streams/concurrent/adapters.py index a3209d4ec..7da594155 100644 --- a/airbyte_cdk/sources/streams/concurrent/adapters.py +++ b/airbyte_cdk/sources/streams/concurrent/adapters.py @@ -292,6 +292,8 @@ def read(self) -> Iterable[Record]: stream_slice=copy.deepcopy(self._slice), stream_state=self._state, ): + # Noting we'll also need to support FileTransferRecordMessage if we want to support file-based connectors in this facade + # For now, file-based connectors have their own stream facade if isinstance(record_data, Mapping): data_to_return = dict(record_data) self._stream.transformer.transform(