Skip to content

Commit

Permalink
fix: support incremental extraction with airbyte source defined curso…
Browse files Browse the repository at this point in the history
…r fields (#13)
  • Loading branch information
JichaoS authored Mar 7, 2024
1 parent fc42fbd commit dfff678
Showing 1 changed file with 6 additions and 0 deletions.
6 changes: 6 additions & 0 deletions tap_airbyte/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,12 @@ def discover_streams(self) -> List[Stream]:
# this is [str, ...?] in the Airbyte catalog
if "cursor_field" in stream and isinstance(stream["cursor_field"][0], str):
airbyte_stream.replication_key = stream["cursor_field"][0]
elif "source_defined_cursor" in stream and isinstance(stream["source_defined_cursor"], bool) and stream["source_defined_cursor"]:
# The stream has a source defined cursor. Try using that
if "default_cursor_field" in stream and isinstance(stream["default_cursor_field"][0], str):
airbyte_stream.replication_key = stream["default_cursor_field"][0]
else:
self.logger.warning(f"Stream {stream['name']} has a source defined cursor but no default_cursor_field.")
except IndexError:
pass
try:
Expand Down

0 comments on commit dfff678

Please sign in to comment.