Skip to content

Commit

Permalink
File-based CDK: enqueue AirbyteMessage of type record instead of send…
Browse files Browse the repository at this point in the history
…ing to the message repository (#35318)
  • Loading branch information
clnoll authored and xiaohansong committed Feb 27, 2024
1 parent ec70172 commit 20eedff
Showing 1 changed file with 3 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ def read(self) -> Iterable[Record]:
data_to_return = dict(record_data)
self._stream.transformer.transform(data_to_return, self._stream.get_json_schema())
yield Record(data_to_return, self.stream_name())
elif isinstance(record_data, AirbyteMessage) and record_data.type == Type.RECORD:
# `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
yield Record(record_data.record.data, self.stream_name())
else:
self._message_repository.emit_message(record_data)
except Exception as e:
Expand Down

0 comments on commit 20eedff

Please sign in to comment.