Skip to content

Commit

Permalink
low-code: Yield records from generators instead of keeping them in in…
Browse files Browse the repository at this point in the history
…-memory lists (#36406)
  • Loading branch information
girarda committed May 15, 2024
1 parent 4b84c63 commit fb11ca2
Show file tree
Hide file tree
Showing 26 changed files with 161 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Union
from typing import Any, Iterable, List, Mapping, Union

import dpath.util
import requests
Expand Down Expand Up @@ -59,23 +59,24 @@ class DpathExtractor(RecordExtractor):
decoder: Decoder = JsonDecoder(parameters={})

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [InterpolatedString.create(path, parameters=parameters) for path in self.field_path]
for path_index in range(len(self.field_path)):
if isinstance(self.field_path[path_index], str):
self.field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)
self._field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)

def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
response_body = self.decoder.decode(response)
if len(self.field_path) == 0:
if len(self._field_path) == 0:
extracted = response_body
else:
path = [path.eval(self.config) for path in self.field_path] # type: ignore # field_path elements are always cast to interpolated string
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.util.values(response_body, path)
else:
extracted = dpath.util.get(response_body, path, default=[])
if isinstance(extracted, list):
return extracted
yield from extracted
elif extracted:
return [extracted]
yield extracted
else:
return []
yield from []
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional
from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState
Expand All @@ -25,7 +25,7 @@ def select_records(
records_schema: Mapping[str, Any],
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Record]:
) -> Iterable[Record]:
"""
Selects records from the response
:param response: The response to select the records from
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping
from typing import Any, Iterable, Mapping

import requests

Expand All @@ -19,7 +18,7 @@ class RecordExtractor:
def extract_records(
self,
response: requests.Response,
) -> List[Mapping[str, Any]]:
) -> Iterable[Mapping[str, Any]]:
"""
Selects records from the response
:param response: The response to extract the records from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
Expand All @@ -27,10 +27,12 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:

def filter_records(
self,
records: List[Mapping[str, Any]],
records: Iterable[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Mapping[str, Any]]:
) -> Iterable[Mapping[str, Any]]:
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
return [record for record in records if self._filter_interpolator.eval(self.config, record=record, **kwargs)]
for record in records:
if self._filter_interpolator.eval(self.config, record=record, **kwargs):
yield record
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional
from typing import Any, Iterable, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
Expand Down Expand Up @@ -50,7 +50,7 @@ def select_records(
records_schema: Mapping[str, Any],
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Record]:
) -> Iterable[Record]:
"""
Selects records from the response
:param response: The response to select the records from
Expand All @@ -60,38 +60,47 @@ def select_records(
:param next_page_token: The paginator token
:return: List of Records selected from the response
"""
all_data = self.extractor.extract_records(response)
all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
self._transform(filtered_data, stream_state, stream_slice)
self._normalize_by_schema(filtered_data, schema=records_schema)
return [Record(data, stream_slice) for data in filtered_data]
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
for data in normalized_data:
yield Record(data, stream_slice)

def _normalize_by_schema(self, records: List[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]) -> List[Mapping[str, Any]]:
def _normalize_by_schema(
self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
) -> Iterable[Mapping[str, Any]]:
if schema:
# record has type Mapping[str, Any], but dict[str, Any] expected
return [self.schema_normalization.transform(record, schema) for record in records] # type: ignore
return records
for record in records:
normalized_record = dict(record)
self.schema_normalization.transform(normalized_record, schema)
yield normalized_record
else:
yield from records

def _filter(
self,
records: List[Mapping[str, Any]],
records: Iterable[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice],
next_page_token: Optional[Mapping[str, Any]],
) -> List[Mapping[str, Any]]:
) -> Iterable[Mapping[str, Any]]:
if self.record_filter:
return self.record_filter.filter_records(
yield from self.record_filter.filter_records(
records, stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)
return records
else:
yield from records

def _transform(
self,
records: List[Mapping[str, Any]],
records: Iterable[Mapping[str, Any]],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
) -> None:
) -> Iterable[Mapping[str, Any]]:
for record in records:
for transformation in self.transformations:
# record has type Mapping[str, Any], but Record expected
transformation.transform(record, config=self.config, stream_state=stream_state, stream_slice=stream_slice) # type: ignore
yield record
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,11 @@ class CursorPagination(BaseModel):
cursor_value: str = Field(
...,
description='Value of the cursor defining the next page to fetch.',
examples=['{{ headers.link.next.cursor }}', "{{ last_record['key'] }}", "{{ response['nextPage'] }}"],
examples=[
'{{ headers.link.next.cursor }}',
"{{ last_record['key'] }}",
"{{ response['nextPage'] }}",
],
title='Cursor Value',
)
page_size: Optional[int] = Field(None, description='The number of records to include in each pages.', examples=[100], title='Page Size')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Mapping, MutableMapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
Expand Down Expand Up @@ -101,8 +101,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.url_base = InterpolatedString(string=self.url_base, parameters=parameters)
self._token = self.pagination_strategy.initial_token

def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]:
self._token = self.pagination_strategy.next_page_token(response, last_records)
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
) -> Optional[Mapping[str, Any]]:
self._token = self.pagination_strategy.next_page_token(response, last_page_size, last_record)
if self._token:
return {"next_page_token": self._token}
else:
Expand Down Expand Up @@ -185,12 +187,14 @@ def __init__(self, decorated: Paginator, maximum_number_of_pages: int = 5) -> No
self._decorated = decorated
self._page_count = self._PAGE_COUNT_BEFORE_FIRST_NEXT_CALL

def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]:
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
) -> Optional[Mapping[str, Any]]:
if self._page_count >= self._maximum_number_of_pages:
return None

self._page_count += 1
return self._decorated.next_page_token(response, last_records)
return self._decorated.next_page_token(response, last_page_size, last_record)

def path(self) -> Optional[str]:
return self._decorated.path()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Mapping, MutableMapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
Expand Down Expand Up @@ -57,7 +57,7 @@ def get_request_body_json(
) -> Mapping[str, Any]:
return {}

def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Mapping[str, Any]:
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Mapping[str, Any]:
return {}

def reset(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional
from typing import Any, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
Expand All @@ -27,12 +27,15 @@ def reset(self) -> None:
"""

@abstractmethod
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Mapping[str, Any]]:
def next_page_token(
self, response: requests.Response, last_page_size: int, last_record: Optional[Record]
) -> Optional[Mapping[str, Any]]:
"""
Returns the next_page_token to use to fetch the next page of records.
:param response: the response to process
:param last_records: the records extracted from the response
:param last_page_size: the number of records read from the response
:param last_record: the last record extracted from the response
:return: A mapping {"next_page_token": <token>} for the next page from the input response object. Returning None means there are no more pages to read in this response.
"""
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping, Optional, Union
from typing import Any, Dict, Mapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
Expand Down Expand Up @@ -40,42 +40,37 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
else:
self._cursor_value = self.cursor_value
if isinstance(self.stop_condition, str):
self._stop_condition = InterpolatedBoolean(condition=self.stop_condition, parameters=parameters)
self._stop_condition: Optional[InterpolatedBoolean] = InterpolatedBoolean(condition=self.stop_condition, parameters=parameters)
else:
self._stop_condition = self.stop_condition # type: ignore # the type has been checked
self._stop_condition = self.stop_condition

@property
def initial_token(self) -> Optional[Any]:
return None

def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]:
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
decoded_response = self.decoder.decode(response)

# The default way that link is presented in requests.Response is a string of various links (last, next, etc). This
# is not indexable or useful for parsing the cursor, so we replace it with the link dictionary from response.links
headers: Dict[str, Any] = dict(response.headers)
headers["link"] = response.links

last_record = last_records[-1] if last_records else None

if self._stop_condition:
should_stop = self._stop_condition.eval(
self.config,
response=decoded_response,
headers=headers,
last_records=last_records,
last_record=last_record,
last_page_size=len(last_records),
last_page_size=last_page_size,
)
if should_stop:
return None
token = self._cursor_value.eval(
config=self.config,
last_records=last_records,
response=decoded_response,
headers=headers,
last_record=last_record,
last_page_size=len(last_records),
last_page_size=last_page_size,
)
return token if token else None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional, Union
from typing import Any, Mapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
Expand Down Expand Up @@ -56,14 +56,14 @@ def initial_token(self) -> Optional[Any]:
return self._offset
return None

def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]:
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
decoded_response = self.decoder.decode(response)

# Stop paginating when there are fewer records than the page size or the current page has no records
if (self._page_size and len(last_records) < self._page_size.eval(self.config, response=decoded_response)) or len(last_records) == 0:
if (self._page_size and last_page_size < self._page_size.eval(self.config, response=decoded_response)) or last_page_size == 0:
return None
else:
self._offset += len(last_records)
self._offset += last_page_size
return self._offset

def reset(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional, Union
from typing import Any, Mapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
Expand Down Expand Up @@ -43,9 +43,9 @@ def initial_token(self) -> Optional[Any]:
return self._page
return None

def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]:
def next_page_token(self, response: requests.Response, last_page_size: int, last_record: Optional[Record]) -> Optional[Any]:
# Stop paginating when there are fewer records than the page size or the current page has no records
if (self._page_size and len(last_records) < self._page_size) or len(last_records) == 0:
if (self._page_size and last_page_size < self._page_size) or last_page_size == 0:
return None
else:
self._page += 1
Expand Down

0 comments on commit fb11ca2

Please sign in to comment.