From fb11ca22fe3434947beddbc89eb733172b173195 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Tue, 14 May 2024 18:00:03 -0700 Subject: [PATCH] low-code: Yield records from generators instead of keeping them in in-memory lists (#36406) --- .../declarative/extractors/dpath_extractor.py | 17 +++---- .../declarative/extractors/http_selector.py | 4 +- .../extractors/record_extractor.py | 5 +- .../declarative/extractors/record_filter.py | 10 ++-- .../declarative/extractors/record_selector.py | 39 ++++++++++------ .../models/declarative_component_schema.py | 6 ++- .../paginators/default_paginator.py | 14 ++++-- .../requesters/paginators/no_pagination.py | 4 +- .../requesters/paginators/paginator.py | 9 ++-- .../strategies/cursor_pagination_strategy.py | 17 +++---- .../paginators/strategies/offset_increment.py | 8 ++-- .../paginators/strategies/page_increment.py | 6 +-- .../strategies/pagination_strategy.py | 7 +-- .../paginators/strategies/stop_condition.py | 8 ++-- .../retrievers/simple_retriever.py | 15 +++--- .../extractors/test_dpath_extractor.py | 2 +- .../extractors/test_record_filter.py | 4 +- .../extractors/test_record_selector.py | 8 ++-- .../test_model_to_component_factory.py | 6 +-- .../test_cursor_pagination_strategy.py | 13 +++--- .../paginators/test_default_paginator.py | 28 +++++------ .../paginators/test_no_paginator.py | 2 +- .../paginators/test_offset_increment.py | 20 ++++---- .../paginators/test_page_increment.py | 24 +++++----- .../paginators/test_stop_condition.py | 22 ++++----- .../retrievers/test_simple_retriever.py | 46 ++----------------- 26 files changed, 161 insertions(+), 183 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py index 5a984781bb4e3..cfbe271364517 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/dpath_extractor.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Union +from typing import Any, Iterable, List, Mapping, Union import dpath.util import requests @@ -59,23 +59,24 @@ class DpathExtractor(RecordExtractor): decoder: Decoder = JsonDecoder(parameters={}) def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path] for path_index in range(len(self.field_path)): if isinstance(self.field_path[path_index], str): - self.field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters) + self._field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters) - def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]: + def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]: response_body = self.decoder.decode(response) - if len(self.field_path) == 0: + if len(self._field_path) == 0: extracted = response_body else: - path = [path.eval(self.config) for path in self.field_path] # type: ignore # field_path elements are always cast to interpolated string + path = [path.eval(self.config) for path in self._field_path] if "*" in path: extracted = dpath.util.values(response_body, path) else: extracted = dpath.util.get(response_body, path, default=[]) if isinstance(extracted, list): - return extracted + yield from extracted elif extracted: - return [extracted] + yield extracted else: - return [] + yield from [] diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py index 17bef8bb6371f..e70ac150564cc 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/http_selector.py @@ -4,7 +4,7 @@ from abc import abstractmethod from dataclasses import dataclass -from typing import Any, List, Mapping, Optional +from typing import Any, Iterable, Mapping, Optional import requests from airbyte_cdk.sources.types import Record, StreamSlice, StreamState @@ -25,7 +25,7 @@ def select_records( records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, - ) -> List[Record]: + ) -> Iterable[Record]: """ Selects records from the response :param response: The response to select the records from diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_extractor.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_extractor.py index 792499c6dac61..5de6a84a7db7e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_extractor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_extractor.py @@ -1,10 +1,9 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - from abc import abstractmethod from dataclasses import dataclass -from typing import Any, List, Mapping +from typing import Any, Iterable, Mapping import requests @@ -19,7 +18,7 @@ class RecordExtractor: def extract_records( self, response: requests.Response, - ) -> List[Mapping[str, Any]]: + ) -> Iterable[Mapping[str, Any]]: """ Selects records from the response :param response: The response to extract the records from diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py index 9c1c1cb5696ce..78e55408a07a9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_filter.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Optional +from typing import Any, Iterable, Mapping, Optional from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean from airbyte_cdk.sources.types import Config, StreamSlice, StreamState @@ -27,10 +27,12 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def filter_records( self, - records: List[Mapping[str, Any]], + records: Iterable[Mapping[str, Any]], stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, - ) -> List[Mapping[str, Any]]: + ) -> Iterable[Mapping[str, Any]]: kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token} - return [record for record in records if self._filter_interpolator.eval(self.config, record=record, **kwargs)] + for record in records: + if self._filter_interpolator.eval(self.config, record=record, **kwargs): + yield record diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py index 8dca611b4823c..6f9cc40478383 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass, field -from typing import Any, List, Mapping, Optional +from typing import Any, Iterable, List, Mapping, Optional import requests from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector @@ -50,7 +50,7 @@ def select_records( records_schema: Mapping[str, Any], stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, - ) -> List[Record]: + ) -> Iterable[Record]: """ Selects records from the response :param response: The response to select the records from @@ -60,38 +60,47 @@ def select_records( :param next_page_token: The paginator token :return: List of Records selected from the response """ - all_data = self.extractor.extract_records(response) + all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response) filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token) - self._transform(filtered_data, stream_state, stream_slice) - self._normalize_by_schema(filtered_data, schema=records_schema) - return [Record(data, stream_slice) for data in filtered_data] + transformed_data = self._transform(filtered_data, stream_state, stream_slice) + normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema) + for data in normalized_data: + yield Record(data, stream_slice) - def _normalize_by_schema(self, records: List[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]) -> List[Mapping[str, Any]]: + def _normalize_by_schema( + self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]] + ) -> Iterable[Mapping[str, Any]]: if schema: # record has type Mapping[str, Any], but dict[str, Any] expected - return [self.schema_normalization.transform(record, schema) for record in records] # type: ignore - return records + for record in records: + normalized_record = dict(record) + self.schema_normalization.transform(normalized_record, schema) + yield normalized_record + else: + yield from records def _filter( self, - records: List[Mapping[str, Any]], + records: Iterable[Mapping[str, Any]], stream_state: StreamState, stream_slice: Optional[StreamSlice], next_page_token: Optional[Mapping[str, Any]], - ) -> List[Mapping[str, Any]]: + ) -> Iterable[Mapping[str, Any]]: if self.record_filter: - return self.record_filter.filter_records( + yield from self.record_filter.filter_records( records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token ) - return records + else: + yield from records def _transform( self, - records: List[Mapping[str, Any]], + records: Iterable[Mapping[str, Any]], stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, - ) -> None: + ) -> Iterable[Mapping[str, Any]]: for record in records: for transformation in self.transformations: # record has type Mapping[str, Any], but Record expected transformation.transform(record, config=self.config, stream_state=stream_state, stream_slice=stream_slice) # type: ignore + yield record diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 69184a32ef236..7302ba28e135f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -874,7 +874,11 @@ class CursorPagination(BaseModel): cursor_value: str = Field( ..., description='Value of the cursor defining the next page to fetch.', - examples=['{{ headers.link.next.cursor }}', "{{ last_record['key'] }}", "{{ response['nextPage'] }}"], + examples=[ + '{{ headers.link.next.cursor }}', + "{{ last_record['key'] }}", + "{{ response['nextPage'] }}", + ], title='Cursor Value', ) page_size: Optional[int] = Field(None, description='The number of records to include in each pages.', examples=[100], title='Page Size') diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py index 0919d680c0476..ccfd91a1910fd 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/default_paginator.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, MutableMapping, Optional, Union +from typing import Any, Mapping, MutableMapping, Optional, Union import requests from airbyte_cdk.sources.declarative.decoders.decoder import Decoder @@ -101,8 +101,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.url_base = InterpolatedString(string=self.url_base, parameters=parameters) self._token = self.pagination_strategy.initial_token - def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]: - self._token = self.pagination_strategy.next_page_token(response, last_records) + def next_page_token( + self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + ) -> Optional[Mapping[str, Any]]: + self._token = self.pagination_strategy.next_page_token(response, last_page_size, last_record) if self._token: return {"next_page_token": self._token} else: @@ -185,12 +187,14 @@ def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> No self._decorated = decorated self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL - def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]: + def next_page_token( + self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + ) -> Optional[Mapping[str, Any]]: if self._page_count >= self._maximum_number_of_pages: return None self._page_count += 1 - return self._decorated.next_page_token(response, last_records) + return self._decorated.next_page_token(response, last_page_size, last_record) def path(self) -> Optional[str]: return self._decorated.path() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py index 184a42bdfa2d1..59fbcf6fcccf7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, MutableMapping, Optional, Union +from typing import Any, Mapping, MutableMapping, Optional, Union import requests from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator @@ -57,7 +57,7 @@ def get_request_body_json( ) -> Mapping[str, Any]: return {} - def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Mapping[str, Any]: + def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Mapping[str, Any]: return {} def reset(self) -> None: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py index 18c1f367201fd..32752806f1dd3 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py @@ -4,7 +4,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, List, Mapping, Optional +from typing import Any, Mapping, Optional import requests from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider @@ -27,12 +27,15 @@ def reset(self) -> None: """ @abstractmethod - def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]: + def next_page_token( + self, response: requests.Response, last_page_size: int, last_record: Optional[Record] + ) -> Optional[Mapping[str, Any]]: """ Returns the next_page_token to use to fetch the next page of records. :param response: the response to process - :param last_records: the records extracted from the response + :param last_page_size: the number of records read from the response + :param last_record: the last record extracted from the response :return: A mapping {"next_page_token": } for the next page from the input response object. Returning None means there are no more pages to read in this response. """ pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py index 584e39b700255..ac9143322cfbe 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, Dict, List, Mapping, Optional, Union +from typing import Any, Dict, Mapping, Optional, Union import requests from airbyte_cdk.sources.declarative.decoders.decoder import Decoder @@ -40,42 +40,37 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: else: self._cursor_value = self.cursor_value if isinstance(self.stop_condition, str): - self._stop_condition = InterpolatedBoolean(condition=self.stop_condition, parameters=parameters) + self._stop_condition: Optional[InterpolatedBoolean] = InterpolatedBoolean(condition=self.stop_condition, parameters=parameters) else: - self._stop_condition = self.stop_condition # type: ignore # the type has been checked + self._stop_condition = self.stop_condition @property def initial_token(self) -> Optional[Any]: return None - def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]: + def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]: decoded_response = self.decoder.decode(response) # The default way that link is presented in requests.Response is a string of various links (last, next, etc). This # is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links headers: Dict[str, Any] = dict(response.headers) headers["link"] = response.links - - last_record = last_records[-1] if last_records else None - if self._stop_condition: should_stop = self._stop_condition.eval( self.config, response=decoded_response, headers=headers, - last_records=last_records, last_record=last_record, - last_page_size=len(last_records), + last_page_size=last_page_size, ) if should_stop: return None token = self._cursor_value.eval( config=self.config, - last_records=last_records, response=decoded_response, headers=headers, last_record=last_record, - last_page_size=len(last_records), + last_page_size=last_page_size, ) return token if token else None diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py index e358c40a93eb1..806df63e8f245 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/offset_increment.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Optional, Union +from typing import Any, Mapping, Optional, Union import requests from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder @@ -56,14 +56,14 @@ def initial_token(self) -> Optional[Any]: return self._offset return None - def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]: + def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]: decoded_response = self.decoder.decode(response) # Stop paginating when there are fewer records than the page size or the current page has no records - if (self._page_size and len(last_records) < self._page_size.eval(self.config, response=decoded_response)) or len(last_records) == 0: + if (self._page_size and last_page_size < self._page_size.eval(self.config, response=decoded_response)) or last_page_size == 0: return None else: - self._offset += len(last_records) + self._offset += last_page_size return self._offset def reset(self) -> None: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py index d6505ca2850da..e56f86f4df4cf 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/page_increment.py @@ -3,7 +3,7 @@ # from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Optional, Union +from typing import Any, Mapping, Optional, Union import requests from airbyte_cdk.sources.declarative.interpolation import InterpolatedString @@ -43,9 +43,9 @@ def initial_token(self) -> Optional[Any]: return self._page return None - def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]: + def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]: # Stop paginating when there are fewer records than the page size or the current page has no records - if (self._page_size and len(last_records) < self._page_size) or len(last_records) == 0: + if (self._page_size and last_page_size < self._page_size) or last_page_size == 0: return None else: self._page += 1 diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py index 1ec5c4ebd1560..78232adfb4fba 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/pagination_strategy.py @@ -4,7 +4,7 @@ from abc import abstractmethod from dataclasses import dataclass -from typing import Any, List, Optional +from typing import Any, Optional import requests from airbyte_cdk.sources.types import Record @@ -24,10 +24,11 @@ def initial_token(self) -> Optional[Any]: """ @abstractmethod - def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]: + def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]: """ :param response: response to process - :param last_records: records extracted from the response + :param last_page_size: the number of records read from the response + :param last_record: the last record extracted from the response :return: next page token. Returns None if there are no more pages to fetch """ pass diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py index bcce2d0c0a1c0..ca79bfd39ac7b 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/strategies/stop_condition.py @@ -3,7 +3,7 @@ # from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import Any, Optional import requests from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor @@ -35,12 +35,12 @@ def __init__(self, _delegate: PaginationStrategy, stop_condition: PaginationStop self._delegate = _delegate self._stop_condition = stop_condition - def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]: + def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]: # We evaluate in reverse order because the assumption is that most of the APIs using data feed structure will return records in # descending order. In terms of performance/memory, we return the records lazily - if last_records and any(self._stop_condition.is_met(record) for record in reversed(last_records)): + if last_record and self._stop_condition.is_met(last_record): return None - return self._delegate.next_page_token(response, last_records) + return self._delegate.next_page_token(response, last_page_size, last_record) def reset(self) -> None: self._delegate.reset() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 97cd07f9ff30c..ef1eb349b0760 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -64,7 +64,8 @@ class SimpleRetriever(Retriever): def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._paginator = self.paginator or NoPagination(parameters=parameters) self._last_response: Optional[requests.Response] = None - self._records_from_last_response: List[Record] = [] + self._last_page_size: int = 0 + self._last_record: Optional[Record] = None self._parameters = parameters self._name = InterpolatedString(self._name, parameters=parameters) if isinstance(self._name, str) else self._name @@ -223,19 +224,21 @@ def _parse_response( ) -> Iterable[Record]: if not response: self._last_response = None - self._records_from_last_response = [] return [] self._last_response = response - records = self.record_selector.select_records( + record_generator = self.record_selector.select_records( response=response, stream_state=stream_state, records_schema=records_schema, stream_slice=stream_slice, next_page_token=next_page_token, ) - self._records_from_last_response = records - return records + self._last_page_size = 0 + for record in record_generator: + self._last_page_size += 1 + self._last_record = record + yield record @property # type: ignore def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: @@ -255,7 +258,7 @@ def _next_page_token(self, response: requests.Response) -> Optional[Mapping[str, :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response. """ - return self._paginator.next_page_token(response, self._records_from_last_response) + return self._paginator.next_page_token(response, self._last_page_size, self._last_record) def _fetch_next_page( self, stream_state: Mapping[str, Any], stream_slice: StreamSlice, next_page_token: Optional[Mapping[str, Any]] = None diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 76fad421ea766..948fb19c921ae 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -44,7 +44,7 @@ def test_dpath_extractor(test_name, field_path, body, expected_records): extractor = DpathExtractor(field_path=field_path, config=config, decoder=decoder, parameters=parameters) response = create_response(body) - actual_records = extractor.extract_records(response) + actual_records = list(extractor.extract_records(response)) assert actual_records == expected_records diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py index 89d003b776527..2104e4243120a 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_filter.py @@ -49,7 +49,7 @@ def test_record_filter(test_name, filter_template, records, expected_records): next_page_token = {"last_seen_id": 14} record_filter = RecordFilter(config=config, condition=filter_template, parameters=parameters) - actual_records = record_filter.filter_records( + actual_records = list(record_filter.filter_records( records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token - ) + )) assert actual_records == expected_records diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_selector.py b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_selector.py index e85f80fc8b79b..9620d9692277b 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_selector.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/extractors/test_record_selector.py @@ -90,9 +90,9 @@ def test_record_filter(test_name, field_path, filter_template, body, expected_da schema_normalization=TypeTransformer(TransformConfig.NoTransform), ) - actual_records = record_selector.select_records( + actual_records = list(record_selector.select_records( response=response, records_schema=schema, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token - ) + )) assert actual_records == [Record(data, stream_slice) for data in expected_data] calls = [] @@ -149,13 +149,13 @@ def test_schema_normalization(test_name, schema, schema_transformation, body, ex schema_normalization=TypeTransformer(schema_transformation), ) - actual_records = record_selector.select_records( + actual_records = list(record_selector.select_records( response=response, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token, records_schema=schema, - ) + )) assert actual_records == [Record(data, stream_slice) for data in expected_data] diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 1d154631a21f7..6306a854ec6e5 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -229,7 +229,7 @@ def test_full_config_stream(): assert isinstance(stream.retriever.record_selector.extractor, DpathExtractor) assert isinstance(stream.retriever.record_selector.extractor.decoder, JsonDecoder) - assert [fp.eval(input_config) for fp in stream.retriever.record_selector.extractor.field_path] == ["lists"] + assert [fp.eval(input_config) for fp in stream.retriever.record_selector.extractor._field_path] == ["lists"] assert isinstance(stream.retriever.record_selector.record_filter, RecordFilter) assert stream.retriever.record_selector.record_filter._filter_interpolator.condition == "{{ record['id'] > stream_state['id'] }}" @@ -760,7 +760,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel assert isinstance(selector, RecordSelector) assert isinstance(selector.extractor, DpathExtractor) - assert [fp.eval(input_config) for fp in selector.extractor.field_path] == [expected_runtime_selector] + assert [fp.eval(input_config) for fp in selector.extractor._field_path] == [expected_runtime_selector] assert isinstance(selector.record_filter, RecordFilter) assert selector.record_filter.condition == "{{ record['id'] > stream_state['id'] }}" @@ -1094,7 +1094,7 @@ def test_config_with_defaults(): assert isinstance(stream.retriever.record_selector, RecordSelector) assert isinstance(stream.retriever.record_selector.extractor, DpathExtractor) - assert [fp.eval(input_config) for fp in stream.retriever.record_selector.extractor.field_path] == ["result"] + assert [fp.eval(input_config) for fp in stream.retriever.record_selector.extractor._field_path] == ["result"] assert isinstance(stream.retriever.paginator, DefaultPaginator) assert stream.retriever.paginator.url_base.string == "https://api.sendgrid.com" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py index ea379822b004d..dd7a9d2572ab8 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_cursor_pagination_strategy.py @@ -17,7 +17,7 @@ ("test_static_token", "token", None, "token", None), ("test_static_token_with_page_size", "token", None, "token", 5), ("test_token_from_config", "{{ config.config_key }}", None, "config_value", None), - ("test_token_from_last_record", "{{ last_records[-1].id }}", None, 1, None), + ("test_token_from_last_record", "{{ last_record.id }}", None, 1, None), ("test_token_from_response", "{{ response._metadata.content }}", None, "content_value", None), ("test_token_from_parameters", "{{ parameters.key }}", None, "value", None), ("test_token_not_found", "{{ response.invalid_key }}", None, None, None), @@ -58,10 +58,10 @@ def test_cursor_pagination_strategy(test_name, template_string, stop_condition, response.headers = {"has_more": True, "next": "ready_to_go", "link": link_str} response_body = {"_metadata": {"content": "content_value"}, "accounts": [], "end": 99, "total": 200, "characters": {}} response._content = json.dumps(response_body).encode("utf-8") - last_records = [{"id": 0, "more_records": True}, {"id": 1, "more_records": True}] + last_record = {"id": 1, "more_records": True} - token = strategy.next_page_token(response, last_records) - assert token == expected_token + token = strategy.next_page_token(response, 1, last_record) + assert expected_token == token assert page_size == strategy.get_page_size() @@ -75,12 +75,11 @@ def test_last_record_points_to_the_last_item_in_last_records_array(): ) response = requests.Response() - next_page_token = strategy.next_page_token(response, last_records) + next_page_token = strategy.next_page_token(response, 2, last_records[-1]) assert next_page_token == 1 def test_last_record_is_node_if_no_records(): - last_records = [] strategy = CursorPaginationStrategy( page_size=1, cursor_value="{{ last_record.id }}", @@ -89,5 +88,5 @@ def test_last_record_is_node_if_no_records(): ) response = requests.Response() - next_page_token = strategy.next_page_token(response, last_records) + next_page_token = strategy.next_page_token(response, 0, None) assert next_page_token is None diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py index d8326d5227ecb..923083342808a 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_default_paginator.py @@ -21,7 +21,7 @@ @pytest.mark.parametrize( - "page_token_request_option, stop_condition, expected_updated_path, expected_request_params, expected_headers, expected_body_data, expected_body_json, last_records, expected_next_page_token, limit", + "page_token_request_option, stop_condition, expected_updated_path, expected_request_params, expected_headers, expected_body_data, expected_body_json, last_record, expected_next_page_token, limit", [ ( RequestPath(parameters={}), @@ -31,7 +31,7 @@ {}, {}, {}, - [{"id": 0}, {"id": 1}], + {"id": 1}, {"next_page_token": "https://airbyte.io/next_url"}, 2, ), @@ -43,7 +43,7 @@ {}, {}, {}, - [{"id": 0}, {"id": 1}], + {"id": 1}, {"next_page_token": "https://airbyte.io/next_url"}, 2, ), @@ -55,7 +55,7 @@ {}, {}, {}, - [{"id": 0}, {"id": 1}], + {"id": 1}, None, 2, ), @@ -67,7 +67,7 @@ {"from": "https://airbyte.io/next_url"}, {}, {}, - [{"id": 0}, {"id": 1}], + {"id": 1}, {"next_page_token": "https://airbyte.io/next_url"}, 2, ), @@ -79,7 +79,7 @@ {}, {"from": "https://airbyte.io/next_url"}, {}, - [{"id": 0}, {"id": 1}], + {"id": 1}, {"next_page_token": "https://airbyte.io/next_url"}, 2, ), @@ -91,7 +91,7 @@ {}, {}, {"from": "https://airbyte.io/next_url"}, - [{"id": 0}, {"id": 1}], + {"id": 1}, {"next_page_token": "https://airbyte.io/next_url"}, 2, ), @@ -113,7 +113,7 @@ def test_default_paginator_with_cursor( expected_headers, expected_body_data, expected_body_json, - last_records, + last_record, expected_next_page_token, limit, ): @@ -146,7 +146,7 @@ def test_default_paginator_with_cursor( response_body = {"next": "https://airbyte.io/next_url"} response._content = json.dumps(response_body).encode("utf-8") - actual_next_page_token = paginator.next_page_token(response, last_records) + actual_next_page_token = paginator.next_page_token(response, 2, last_record) actual_next_path = paginator.path() actual_request_params = paginator.get_request_params() actual_headers = paginator.get_request_headers() @@ -210,8 +210,8 @@ def test_paginator_request_param_interpolation( response.headers = {"A_HEADER": "HEADER_VALUE"} response_body = {"next": "https://airbyte.io/next_url"} response._content = json.dumps(response_body).encode("utf-8") - last_records = [{"id": 0}, {"id": 1}] - paginator.next_page_token(response, last_records) + last_record = {"id": 1} + paginator.next_page_token(response, 2, last_record) actual_request_params = paginator.get_request_params() assert actual_request_params == expected_request_params @@ -255,7 +255,7 @@ def test_reset(test_name, inject_on_first_request): strategy, config, url_base, parameters={}, page_size_option=page_size_request_option, page_token_option=page_token_request_option ) initial_request_parameters = paginator.get_request_params() - paginator.next_page_token(MagicMock(), [{"first key": "first value"}, {"second key": "second value"}]) + paginator.next_page_token(MagicMock(), 2, {"a key": "a value"}) request_parameters_for_second_request = paginator.get_request_params() paginator.reset() request_parameters_after_reset = paginator.get_request_params() @@ -293,10 +293,10 @@ def test_limit_page_fetched(): ) for _ in range(number_of_next_performed): - last_token = paginator.next_page_token(MagicMock(), MagicMock()) + last_token = paginator.next_page_token(MagicMock(), 1, MagicMock()) assert last_token - assert not paginator.next_page_token(MagicMock(), MagicMock()) + assert not paginator.next_page_token(MagicMock(), 1, MagicMock()) def test_paginator_with_page_option_no_page_size(): diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py index 33910f54499c3..12b81010f43b1 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_no_paginator.py @@ -8,5 +8,5 @@ def test(): paginator = NoPagination(parameters={}) - next_page_token = paginator.next_page_token(requests.Response(), []) + next_page_token = paginator.next_page_token(requests.Response(), 0, []) assert next_page_token == {} diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py index 88e6cda7f15bd..326de7b7d197f 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_offset_increment.py @@ -11,16 +11,16 @@ @pytest.mark.parametrize( - "page_size, parameters, last_records, expected_next_page_token, expected_offset", + "page_size, parameters, last_page_size, last_record, expected_next_page_token, expected_offset", [ - pytest.param("2", {}, [{"id": 0}, {"id": 1}], 2, 2, id="test_same_page_size"), - pytest.param(2, {}, [{"id": 0}, {"id": 1}], 2, 2, id="test_same_page_size"), - pytest.param("{{ parameters['page_size'] }}", {"page_size": 3}, [{"id": 0}, {"id": 1}], None, 0, id="test_larger_page_size"), - pytest.param(None, {}, [], None, 0, id="test_stop_if_no_records"), - pytest.param("{{ response['page_metadata']['limit'] }}", {}, [{"id": 0}, {"id": 1}], None, 0, id="test_page_size_from_response"), + pytest.param("2", {}, 2, {"id": 1}, 2, 2, id="test_same_page_size"), + pytest.param(2, {}, 2, {"id": 1}, 2, 2, id="test_same_page_size"), + pytest.param("{{ parameters['page_size'] }}", {"page_size": 3}, 2, {"id": 1}, None, 0, id="test_larger_page_size"), + pytest.param(None, {}, 0, [], None, 0, id="test_stop_if_no_records"), + pytest.param("{{ response['page_metadata']['limit'] }}", {}, 2, {"id": 1}, None, 0, id="test_page_size_from_response"), ], ) -def test_offset_increment_paginator_strategy(page_size, parameters, last_records, expected_next_page_token, expected_offset): +def test_offset_increment_paginator_strategy(page_size, parameters, last_page_size, last_record, expected_next_page_token, expected_offset): paginator_strategy = OffsetIncrement(page_size=page_size, parameters=parameters, config={}) assert paginator_strategy._offset == 0 @@ -30,9 +30,9 @@ def test_offset_increment_paginator_strategy(page_size, parameters, last_records response_body = {"next": "https://airbyte.io/next_url", "page_metadata": {"limit": 5}} response._content = json.dumps(response_body).encode("utf-8") - next_page_token = paginator_strategy.next_page_token(response, last_records) - assert next_page_token == expected_next_page_token - assert paginator_strategy._offset == expected_offset + next_page_token = paginator_strategy.next_page_token(response, last_page_size, last_record) + assert expected_next_page_token == next_page_token + assert expected_offset == paginator_strategy._offset paginator_strategy.reset() assert 0 == paginator_strategy._offset diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py index f108dcfe8301f..eaa2b41ec07fe 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_page_increment.py @@ -11,18 +11,18 @@ @pytest.mark.parametrize( - "page_size, start_from, last_records, expected_next_page_token, expected_offset", + "page_size, start_from, last_page_size, last_record, expected_next_page_token, expected_offset", [ - pytest.param(2, 1, [{"id": 0}, {"id": 1}], 2, 2, id="test_same_page_size_start_from_0"), - pytest.param(3, 1, [{"id": 0}, {"id": 1}], None, 1, id="test_larger_page_size_start_from_0"), - pytest.param(2, 0, [{"id": 0}, {"id": 1}], 1, 1, id="test_same_page_size_start_from_1"), - pytest.param(3, 0, [{"id": 0}, {"id": 1}], None, 0, id="test_larger_page_size_start_from_0"), - pytest.param(None, 0, [], None, 0, id="test_no_page_size"), - pytest.param("2", 0, [{"id": 0}, {"id": 1}], 1, 1, id="test_page_size_from_string"), - pytest.param("{{ config['value'] }}", 0, [{"id": 0}, {"id": 1}], 1, 1, id="test_page_size_from_config"), + pytest.param(2, 1, 2, {"id": 1}, 2, 2, id="test_same_page_size_start_from_0"), + pytest.param(3, 1, 2, {"id": 1}, None, 1, id="test_larger_page_size_start_from_0"), + pytest.param(2, 0, 2, {"id": 1}, 1, 1, id="test_same_page_size_start_from_1"), + pytest.param(3, 0, 2, {"id": 1}, None, 0, id="test_larger_page_size_start_from_0"), + pytest.param(None, 0, 0, None, None, 0, id="test_no_page_size"), + pytest.param("2", 0, 2, {"id": 1}, 1, 1, id="test_page_size_from_string"), + pytest.param("{{ config['value'] }}", 0, 2, {"id": 1}, 1, 1, id="test_page_size_from_config"), ], ) -def test_page_increment_paginator_strategy(page_size, start_from, last_records, expected_next_page_token, expected_offset): +def test_page_increment_paginator_strategy(page_size, start_from, last_page_size, last_record, expected_next_page_token, expected_offset): paginator_strategy = PageIncrement(page_size=page_size, parameters={}, start_from_page=start_from, config={"value": 2}) assert paginator_strategy._page == start_from @@ -32,9 +32,9 @@ def test_page_increment_paginator_strategy(page_size, start_from, last_records, response_body = {"next": "https://airbyte.io/next_url"} response._content = json.dumps(response_body).encode("utf-8") - next_page_token = paginator_strategy.next_page_token(response, last_records) - assert next_page_token == expected_next_page_token - assert paginator_strategy._page == expected_offset + next_page_token = paginator_strategy.next_page_token(response, last_page_size, last_record) + assert expected_next_page_token == next_page_token + assert expected_offset == paginator_strategy._page paginator_strategy.reset() assert start_from == paginator_strategy._page diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py index a16e61a8f0c0b..86c5e65fcda9e 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py @@ -15,7 +15,7 @@ from pytest import fixture ANY_RECORD = Mock() -NO_RECORDS = [] +NO_RECORD = None ANY_RESPONSE = Mock() @@ -45,14 +45,13 @@ def test_given_record_should_not_be_synced_when_is_met_return_true(mocked_cursor def test_given_stop_condition_is_met_when_next_page_token_then_return_none(mocked_pagination_strategy, mocked_stop_condition): - mocked_stop_condition.is_met.side_effect = [False, True] - first_record = Mock(spec=Record) + mocked_stop_condition.is_met.return_value = True last_record = Mock(spec=Record) decorator = StopConditionPaginationStrategyDecorator(mocked_pagination_strategy, mocked_stop_condition) - assert not decorator.next_page_token(ANY_RESPONSE, [first_record, last_record]) - mocked_stop_condition.is_met.assert_has_calls([call(last_record), call(first_record)]) + assert not decorator.next_page_token(ANY_RESPONSE, 2, last_record) + mocked_stop_condition.is_met.assert_has_calls([call(last_record)]) def test_given_last_record_meets_condition_when_next_page_token_then_do_not_check_for_other_records( @@ -62,7 +61,7 @@ def test_given_last_record_meets_condition_when_next_page_token_then_do_not_chec last_record = Mock(spec=Record) StopConditionPaginationStrategyDecorator(mocked_pagination_strategy, mocked_stop_condition).next_page_token( - ANY_RESPONSE, [Mock(spec=Record), last_record] + ANY_RESPONSE, 2, last_record ) mocked_stop_condition.is_met.assert_called_once_with(last_record) @@ -70,24 +69,23 @@ def test_given_last_record_meets_condition_when_next_page_token_then_do_not_chec def test_given_stop_condition_is_not_met_when_next_page_token_then_delegate(mocked_pagination_strategy, mocked_stop_condition): mocked_stop_condition.is_met.return_value = False - first_record = Mock(spec=Record) last_record = Mock(spec=Record) decorator = StopConditionPaginationStrategyDecorator(mocked_pagination_strategy, mocked_stop_condition) - next_page_token = decorator.next_page_token(ANY_RESPONSE, [first_record, last_record]) + next_page_token = decorator.next_page_token(ANY_RESPONSE, 2, last_record) assert next_page_token == mocked_pagination_strategy.next_page_token.return_value - mocked_pagination_strategy.next_page_token.assert_called_once_with(ANY_RESPONSE, [first_record, last_record]) - mocked_stop_condition.is_met.assert_has_calls([call(last_record), call(first_record)]) + mocked_pagination_strategy.next_page_token.assert_called_once_with(ANY_RESPONSE, 2, last_record) + mocked_stop_condition.is_met.assert_has_calls([call(last_record)]) def test_given_no_records_when_next_page_token_then_delegate(mocked_pagination_strategy, mocked_stop_condition): decorator = StopConditionPaginationStrategyDecorator(mocked_pagination_strategy, mocked_stop_condition) - next_page_token = decorator.next_page_token(ANY_RESPONSE, NO_RECORDS) + next_page_token = decorator.next_page_token(ANY_RESPONSE, 0, NO_RECORD) assert next_page_token == mocked_pagination_strategy.next_page_token.return_value - mocked_pagination_strategy.next_page_token.assert_called_once_with(ANY_RESPONSE, NO_RECORDS) + mocked_pagination_strategy.next_page_token.assert_called_once_with(ANY_RESPONSE, 0, NO_RECORD) def test_when_reset_then_delegate(mocked_pagination_strategy, mocked_stop_condition): diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 9b1d8074e28b7..0fed6be46cc8b 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -92,10 +92,10 @@ def test_simple_retriever_full(mock_http_stream): assert retriever.stream_slices() == stream_slices assert retriever._last_response is None - assert retriever._records_from_last_response == [] - assert retriever._parse_response(response, stream_state={}, records_schema={}) == records + assert retriever._last_record is None + assert list(retriever._parse_response(response, stream_state={}, records_schema={})) == records assert retriever._last_response == response - assert retriever._records_from_last_response == records + assert retriever._last_page_size == 2 [r for r in retriever.read_records(SyncMode.full_refresh)] paginator.reset.assert_called() @@ -137,46 +137,6 @@ def test_simple_retriever_with_request_response_logs(mock_http_stream): assert actual_messages[3] == records[1] -@patch.object(SimpleRetriever, "_read_pages", return_value=iter([])) -def test_simple_retriever_with_request_response_log_last_records(mock_http_stream): - requester = MagicMock() - paginator = MagicMock() - record_selector = MagicMock() - record_selector.select_records.return_value = request_response_logs - response = requests.Response() - response.status_code = 200 - stream_slicer = DatetimeBasedCursor( - start_datetime="", - end_datetime="", - step="P1D", - cursor_field="id", - datetime_format="", - cursor_granularity="P1D", - config={}, - parameters={}, - ) - - retriever = SimpleRetriever( - name="stream_name", - primary_key=primary_key, - requester=requester, - paginator=paginator, - record_selector=record_selector, - stream_slicer=stream_slicer, - parameters={}, - config={}, - ) - - assert retriever._last_response is None - assert retriever._records_from_last_response == [] - assert retriever._parse_response(response, stream_state={}, records_schema={}) == request_response_logs - assert retriever._last_response == response - assert retriever._records_from_last_response == request_response_logs - - [r for r in retriever.read_records(SyncMode.full_refresh)] - paginator.reset.assert_called() - - @pytest.mark.parametrize( "test_name, paginator_mapping, stream_slicer_mapping, expected_mapping", [