Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alex/test iterable #38110

Draft
wants to merge 33 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3110caa
yield from generators
girarda Mar 22, 2024
10640ab
update interfaces and fix the tests
girarda Mar 22, 2024
148419e
fix some mypy issues
girarda Mar 22, 2024
3a7a0b8
fix the rest of mypy issues
girarda Mar 22, 2024
09e8696
update the schema
girarda Mar 22, 2024
30ba4d4
Add last_page_size and last_record to pagination context
girarda Mar 22, 2024
97449e6
boundaries check
girarda Mar 25, 2024
a7b57dd
add a test
girarda Mar 25, 2024
ea63f36
format
girarda Mar 26, 2024
d89dcbe
Merge branch 'master' into alex/replace_last_records_from_paginators
girarda Mar 26, 2024
2aaa8d4
missing unit test
girarda Mar 26, 2024
4e193d3
Merge branch 'alex/replace_last_records_from_paginators' of github.co…
girarda Mar 26, 2024
7b514e0
missing newline
girarda Mar 26, 2024
855e77f
merge
girarda Mar 26, 2024
834fc96
fix imports
girarda Mar 26, 2024
341d671
update unit tests
girarda Mar 26, 2024
db9721b
merge
girarda May 9, 2024
28583e5
fixes
girarda May 9, 2024
f75fb47
Revert "Airbyte CDK: use pytz.utc instead of datetime.utc (#38026)"
girarda May 9, 2024
6f89fba
bump cdk
girarda May 10, 2024
0aa79ec
dont keep records in memory
girarda May 10, 2024
f1d1ed4
Revert "Revert "Airbyte CDK: use pytz.utc instead of datetime.utc (#3…
girarda May 10, 2024
d213f20
Revert "Revert "Revert "Airbyte CDK: use pytz.utc instead of datetime…
girarda May 10, 2024
c67f331
lower to 30 days
girarda May 10, 2024
8465b76
Merge branch 'master' into alex/test_iterable
girarda May 17, 2024
36e5fa4
bump to cdk 0.90.0
girarda May 17, 2024
c235360
reset
girarda May 17, 2024
c43415d
stream response
girarda May 18, 2024
4a9470e
reset to master
girarda May 18, 2024
10caba8
Revert "reset to master"
girarda May 22, 2024
0dc7226
Merge branch 'master' into alex/test_iterable
girarda May 23, 2024
47e01bf
wip
girarda May 24, 2024
4ee47c1
update
girarda May 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ definitions:
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
"$ref": "#/definitions/JsonDecoder"
"$ref": "#/definitions/Decoder"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -1247,7 +1247,9 @@ definitions:
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
"$ref": "#/definitions/JsonDecoder"
anyOf:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
page_size_option:
"$ref": "#/definitions/RequestOption"
page_token_option:
Expand Down Expand Up @@ -1284,7 +1286,9 @@ definitions:
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
"$ref": "#/definitions/JsonDecoder"
anyOf:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -1645,6 +1649,17 @@ definitions:
type:
type: string
enum: [JsonDecoder]
JsonlDecoder:
title: Jsonl Decoder
type: object
required:
- type
properties:
type:
type: string
enum: [JsonlDecoder]
field:
type: string
ListPartitionRouter:
title: List Partition Router
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
Expand Down Expand Up @@ -2257,6 +2272,12 @@ definitions:
anyOf:
- "$ref": "#/definitions/DefaultPaginator"
- "$ref": "#/definitions/NoPagination"
decoder:
title: Decoder
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
ignore_stream_slicer_parameters_on_paginated_requests:
description: If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.
type: boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
#

from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder

__all__ = ["Decoder", "JsonDecoder"]
__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder"]
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Union

import json
import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder

Expand All @@ -22,3 +23,20 @@ def decode(self, response: requests.Response) -> Union[Mapping[str, Any], List[A
return response.json()
except requests.exceptions.JSONDecodeError:
return {}


@dataclass
class JsonlDecoder(Decoder):
    """Decoder for JSON Lines (JSONL) responses.

    Streams the response body line by line and yields one parsed JSON
    object per non-empty line, so large payloads are never fully
    buffered in memory.  Replaces the placeholder implementation that
    printed debug output and yielded a single empty mapping.
    """

    # InitVar: accepted by the generated __init__ but not stored as a field,
    # matching the JsonDecoder above.
    parameters: InitVar[Mapping[str, Any]]

    def decode(self, response: requests.Response) -> Union[Mapping[str, Any], List[Any], Any]:
        # iter_lines() lazily splits the body on newlines without loading
        # the whole response; requests may emit empty keep-alive lines,
        # which are skipped rather than passed to json.loads.
        for line in response.iter_lines():
            if line:
                yield json.loads(line)
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path[path_index] = InterpolatedString.create(self.field_path[path_index], parameters=parameters)

def extract_records(self, response: requests.Response) -> Iterable[Mapping[str, Any]]:
response_body = self.decoder.decode(response)
if len(self._field_path) == 0:
extracted = response_body
else:
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.util.values(response_body, path)
for body in self.decoder.decode(response):
if len(self._field_path) == 0:
extracted = body
else:
extracted = dpath.util.get(response_body, path, default=[])
if isinstance(extracted, list):
yield from extracted
elif extracted:
yield extracted
else:
yield from []
path = [path.eval(self.config) for path in self._field_path]
if "*" in path:
extracted = dpath.util.values(body, path)
else:
extracted = dpath.util.get(body, path, default=[])
if isinstance(extracted, list):
yield from extracted
elif extracted:
yield extracted
else:
yield from []
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ def select_records(
:param next_page_token: The paginator token
:return: List of Records selected from the response
"""
print(f"RecordSelector: select_records: {self.extractor.decoder}")
print(f"recordextractor: {self.extractor.__class__}")
all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
transformed_data = self._transform(filtered_data, stream_state, stream_slice)
normalized_data = self._normalize_by_schema(transformed_data, schema=records_schema)
print("normalized")
for data in normalized_data:
yield Record(data, stream_slice)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,11 @@ class HttpResponseFilter(BaseModel):
title='Error Message Substring',
)
http_codes: Optional[List[int]] = Field(
None, description='Match the response if its HTTP code is included in this list.', examples=[[420, 429], [500]], title='HTTP Codes'
None,
description='Match the response if its HTTP code is included in this list.',
examples=[[420, 429], [500]],
title='HTTP Codes',
unique_items=True,
)
predicate: Optional[str] = Field(
None,
Expand Down Expand Up @@ -536,6 +540,11 @@ class JsonDecoder(BaseModel):
type: Literal['JsonDecoder']


class JsonlDecoder(BaseModel):
    """Generated schema model for the JsonlDecoder declarative component.

    NOTE(review): the schema gives `field` no description — presumably it
    selects a sub-field of each decoded line; confirm against the
    component's runtime implementation before relying on it.
    """
    type: Literal['JsonlDecoder']
    field: Optional[str] = None


class MinMaxDatetime(BaseModel):
type: Literal['MinMaxDatetime']
datetime: str = Field(
Expand Down Expand Up @@ -801,6 +810,10 @@ class WaitUntilTimeFromHeader(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class Decoder(BaseModel):
    """Generated catch-all model: `__root__: Any` accepts any value for a
    `decoder` field (mirrors the schema's permissive `Decoder` definition)."""
    __root__: Any


class AddedFieldDefinition(BaseModel):
type: Literal['AddedFieldDefinition']
path: List[str] = Field(
Expand Down Expand Up @@ -874,11 +887,7 @@ class CursorPagination(BaseModel):
cursor_value: str = Field(
...,
description='Value of the cursor defining the next page to fetch.',
examples=[
'{{ headers.link.next.cursor }}',
"{{ last_record['key'] }}",
"{{ response['nextPage'] }}",
],
examples=['{{ headers.link.next.cursor }}', "{{ last_record['key'] }}", "{{ response['nextPage'] }}"],
title='Cursor Value',
)
page_size: Optional[int] = Field(None, description='The number of records to include in each pages.', examples=[100], title='Page Size')
Expand All @@ -888,9 +897,7 @@ class CursorPagination(BaseModel):
examples=['{{ response.data.has_more is false }}', "{{ 'next' not in headers['link'] }}"],
title='Stop Condition',
)
decoder: Optional[JsonDecoder] = Field(
None, description='Component decoding the response so records can be extracted.', title='Decoder'
)
decoder: Optional[Decoder] = Field(None, description='Component decoding the response so records can be extracted.', title='Decoder')
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


Expand Down Expand Up @@ -995,7 +1002,7 @@ class DefaultPaginator(BaseModel):
pagination_strategy: Union[CursorPagination, CustomPaginationStrategy, OffsetIncrement, PageIncrement] = Field(
..., description='Strategy defining how records are paginated.', title='Pagination Strategy'
)
decoder: Optional[JsonDecoder] = Field(
decoder: Optional[Union[JsonDecoder, JsonlDecoder]] = Field(
None, description='Component decoding the response so records can be extracted.', title='Decoder'
)
page_size_option: Optional[RequestOption] = None
Expand All @@ -1011,7 +1018,7 @@ class DpathExtractor(BaseModel):
examples=[['data'], ['data', 'records'], ['data', '{{ parameters.name }}'], ['data', '*', 'record']],
title='Field Path',
)
decoder: Optional[JsonDecoder] = Field(
decoder: Optional[Union[JsonDecoder, JsonlDecoder]] = Field(
None, description='Component decoding the response so records can be extracted.', title='Decoder'
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')
Expand Down Expand Up @@ -1317,6 +1324,9 @@ class SimpleRetriever(BaseModel):
paginator: Optional[Union[DefaultPaginator, NoPagination]] = Field(
None, description="Paginator component that describes how to navigate through the API's pages."
)
decoder: Optional[Union[JsonDecoder, JsonlDecoder]] = Field(
None, description='Component decoding the response so records can be extracted.', title='Decoder'
)
ignore_stream_slicer_parameters_on_paginated_requests: Optional[bool] = Field(
False,
description='If true, the partition router and incremental request options will be ignored when paginating requests. Request options set directly on the requester will not be ignored.',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder, JsonlDecoder
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector
from airbyte_cdk.sources.declarative.extractors.record_selector import SCHEMA_TRANSFORMER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.incremental import (
Expand Down Expand Up @@ -72,6 +72,7 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import HttpResponseFilter as HttpResponseFilterModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import InlineSchemaLoader as InlineSchemaLoaderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import JsonDecoder as JsonDecoderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import JsonlDecoder as JsonlDecoderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import JsonFileSchemaLoader as JsonFileSchemaLoaderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import JwtAuthenticator as JwtAuthenticatorModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import JwtHeaders as JwtHeadersModel
Expand Down Expand Up @@ -197,6 +198,7 @@ def _init_mappings(self) -> None:
HttpResponseFilterModel: self.create_http_response_filter,
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
JwtAuthenticatorModel: self.create_jwt_authenticator,
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
Expand Down Expand Up @@ -239,7 +241,6 @@ def create_component(
:return: The declarative component to be used at runtime
"""

component_type = component_definition.get("type")
if component_definition.get("type") != model_type.__name__:
raise ValueError(f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead")

Expand Down Expand Up @@ -722,6 +723,7 @@ def create_default_paginator(
url_base: str,
cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None,
) -> Union[DefaultPaginator, PaginatorTestReadDecorator]:
# Fixme need to get the parent decoder
decoder = self._create_component_from_model(model=model.decoder, config=config) if model.decoder else JsonDecoder(parameters={})
page_size_option = (
self._create_component_from_model(model=model.page_size_option, config=config) if model.page_size_option else None
Expand All @@ -748,16 +750,21 @@ def create_default_paginator(
return PaginatorTestReadDecorator(paginator, self._limit_pages_fetched_per_slice)
return paginator

def create_dpath_extractor(self, model: DpathExtractorModel, config: Config, **kwargs: Any) -> DpathExtractor:
decoder = self._create_component_from_model(model.decoder, config=config) if model.decoder else JsonDecoder(parameters={})
def create_dpath_extractor(self, model: DpathExtractorModel, decoder: Optional[Decoder], config: Config, **kwargs: Any) -> DpathExtractor:
if model.decoder:
decoder_to_use = self._create_component_from_model(model=model.decoder, config=config)
elif decoder:
decoder_to_use = decoder
else:
decoder_to_use = JsonDecoder(parameters={})
model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path]
return DpathExtractor(decoder=decoder, field_path=model_field_path, config=config, parameters=model.parameters or {})
return DpathExtractor(decoder=decoder_to_use, field_path=model_field_path, config=config, parameters=model.parameters or {})

@staticmethod
def create_exponential_backoff_strategy(model: ExponentialBackoffStrategyModel, config: Config) -> ExponentialBackoffStrategy:
    """Build an ExponentialBackoffStrategy from its schema model.

    A missing/falsy factor defaults to 5, and missing parameters default
    to an empty mapping.
    """
    backoff_factor = model.factor or 5
    return ExponentialBackoffStrategy(
        factor=backoff_factor,
        parameters=model.parameters or {},
        config=config,
    )

def create_http_requester(self, model: HttpRequesterModel, config: Config, *, name: str) -> HttpRequester:
def create_http_requester(self, model: HttpRequesterModel, decoder: Optional[Decoder], config: Config, *, name: str) -> HttpRequester:
authenticator = (
self._create_component_from_model(model=model.authenticator, config=config, url_base=model.url_base, name=name)
if model.authenticator
Expand All @@ -783,6 +790,11 @@ def create_http_requester(self, model: HttpRequesterModel, config: Config, *, na

assert model.use_cache is not None # for mypy

if isinstance(decoder, JsonlDecoder):
stream_response_line_by_line = True
else:
stream_response_line_by_line = False

return HttpRequester(
name=name,
url_base=model.url_base,
Expand All @@ -796,6 +808,7 @@ def create_http_requester(self, model: HttpRequesterModel, config: Config, *, na
parameters=model.parameters or {},
message_repository=self._message_repository,
use_cache=model.use_cache,
stream_response=stream_response_line_by_line,
)

@staticmethod
Expand Down Expand Up @@ -823,6 +836,10 @@ def create_inline_schema_loader(model: InlineSchemaLoaderModel, config: Config,
def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder:
return JsonDecoder(parameters={})

@staticmethod
def create_jsonl_decoder(model: JsonlDecoderModel, config: Config, **kwargs: Any) -> JsonlDecoder:
    # Builds a stateless JsonlDecoder; no model options are forwarded.
    # NOTE(review): the JsonlDecoder schema model declares a `field`
    # attribute that is silently dropped here — confirm whether it
    # should be passed through to the decoder.
    return JsonlDecoder(parameters={})

@staticmethod
def create_json_file_schema_loader(model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any) -> JsonFileSchemaLoader:
    """Build a JsonFileSchemaLoader from its schema model.

    A missing file_path falls back to the empty string and missing
    parameters to an empty mapping.
    """
    schema_file_path = model.file_path or ""
    return JsonFileSchemaLoader(
        file_path=schema_file_path,
        config=config,
        parameters=model.parameters or {},
    )
Expand Down Expand Up @@ -980,12 +997,13 @@ def create_record_selector(
self,
model: RecordSelectorModel,
config: Config,
decoder: Optional[Decoder],
*,
transformations: List[RecordTransformation],
**kwargs: Any,
) -> RecordSelector:
assert model.schema_normalization is not None # for mypy
extractor = self._create_component_from_model(model=model.extractor, config=config)
extractor = self._create_component_from_model(model=model.extractor, decoder=decoder, config=config)
record_filter = self._create_component_from_model(model.record_filter, config=config) if model.record_filter else None
schema_normalization = TypeTransformer(SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization])

Expand Down Expand Up @@ -1040,8 +1058,9 @@ def create_simple_retriever(
stop_condition_on_cursor: bool = False,
transformations: List[RecordTransformation],
) -> SimpleRetriever:
requester = self._create_component_from_model(model=model.requester, config=config, name=name)
record_selector = self._create_component_from_model(model=model.record_selector, config=config, transformations=transformations)
decoder = self._create_component_from_model(model=model.decoder, config=config) if model.decoder else None
requester = self._create_component_from_model(model=model.requester, decoder=decoder, config=config, name=name)
record_selector = self._create_component_from_model(model=model.record_selector, config=config, decoder=decoder, transformations=transformations)
url_base = model.requester.url_base if hasattr(model.requester, "url_base") else requester.get_url_base()
stream_slicer = stream_slicer or SinglePartitionRouter(parameters={})
cursor = stream_slicer if isinstance(stream_slicer, DeclarativeCursor) else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if isinstance(self.action, str):
self.action = ResponseAction[self.action]
self.http_codes = self.http_codes or set()
if isinstance(self.predicate, str):
if self.predicate and isinstance(self.predicate, str):
self.predicate = InterpolatedBoolean(condition=self.predicate, parameters=parameters)
else:
self.predicate = None
self.error_message = InterpolatedString.create(string_or_interpolated=self.error_message, parameters=parameters)

def matches(self, response: requests.Response, backoff_time: Optional[float] = None) -> Optional[ResponseStatus]:
Expand Down Expand Up @@ -91,6 +93,8 @@ def _create_error_message(self, response: requests.Response) -> str:
return self.error_message.eval(self.config, response=self._safe_response_json(response), headers=response.headers) # type: ignore # error_message is always cast to an interpolated string

def _response_matches_predicate(self, response: requests.Response) -> bool:
    """Return True iff a predicate is configured and evaluates truthy for this response."""
    if not self.predicate:
        # No predicate configured: this filter criterion never matches.
        return False
    # The guard above guarantees self.predicate is set, so the redundant
    # `self.predicate and ...` re-check is dropped.
    return bool(self.predicate.eval(None, response=self._safe_response_json(response), headers=response.headers))  # type: ignore # predicate is always cast to an interpolated string

def _response_contains_error_message(self, response: requests.Response) -> bool:
Expand Down