From 44ec661b5adae1dac1aa1f86e7a730b3a52b5727 Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Wed, 27 Jul 2022 15:30:49 -0700 Subject: [PATCH] [low-code connectors] Add request options and state to stream slicers (#14552) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * comment * comment * comments * fix * test for instantiating chain retrier * fix parsing * cleanup * fix * reset * never raise on http error * remove print * comment * comment * comment * comment * remove prints * add declarative stream to registry * start working on limit paginator * support for offset pagination * tests * move limit value * extract request option * boilerplate * page increment * delete offset paginator * update conditional paginator * refactor and fix test * fix test * small fix * Delete dead code * Add docstrings * quick fix * exponential backoff * fix test * fix * delete unused properties * fix * missing unit tests * uppercase * docstrings * rename to success * compare full request instead of just url * renmae module * rename test file * rename interface * rename default retrier * rename to compositeerrorhandler * fix missing renames * move action to filter * str -> minmaxdatetime * small fixes * plural * add example * handle header variations * also fix wait time from * allow using a regex to extract the value * group() * docstring * add docs * update comment * docstrings * fix tests * rename param * cleanup stop_condition * cleanup * Add examples * interpolated pagination strategy * dont need duplicate class * docstrings * more docstrings * docstrings * fix tests * first pass at substream * seems to work for a single stream * can also be defined in requester with stream_state * tmp update * update comment * Update airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py Co-authored-by: Sherif A. Nada * version: Update Parquet library to latest release (#14502) The upstream Parquet library that is currently pinned for use in the S3 destination plugin is over a year old. The current version is generating invalid schemas for date-time with time-zone fields which appears to be addressed in the `1.12.3` release of the library in commit https://github.com/apache/parquet-mr/commit/c72862b61399ff516e968fbd02885e573d4be81c * merge * 🎉 Source Github: improve schema for stream `pull_request_commits` added "null" (#14613) Signed-off-by: Sergey Chvalyuk * Docs: Fixed broken links (#14622) * fixing broken links * more broken links * source-hubspot: change mentioning of Mailchimp into HubSpot doc (#14620) * Helm Chart: Add external temporal option (#14597) * conflict env configmap and chart lock * reverting lock * add eof lines and documentation on values yaml * conflict json file * rollback json * solve conflict * correct minio with new version Co-authored-by: Guy Feldman * 🎉 Add YAML format to source-file reader (#14588) * Add yaml reader * Update docs * Bumpversion of connector * bump docs * Update pyarrow dependency * Upgrade pandas dependency * auto-bump connector version Co-authored-by: Octavia Squidington III * :tada: Source Okta: add GroupMembers stream (#14380) * add Group_Members stream to okta source - Group_Members return a list of users, the same schema of Users stream. - Create a shared schema users, and both group_members and users sechema use it as a reference. 
- Add Group_Members stream to source connector * add tests and fix logs schema - fix the test error: None is not one of enums though the enum type includes both string and null, it comes from json schema validator https://github.com/python-jsonschema/jsonschema/blob/ddb87afad8f5d5c40600b5ede0ab96e4d4bdf7d3/jsonschema/_validators.py#L279-L285 - change grouop_members to use id as the cursor field since `filter` is not supported in the query string - fix the abnormal state test on logs stream, when since is abnormally large, until has to defined, an equal or a larger value - remove logs stream from full sync test, because 2 full sync always has a gap -- at least a new log about users or groups api. * last polish before submit the PR - bump docker version - update changelog - add the right abnormal value for logs stream - correct the sample catalog * address comments:: - improve comments for until parameter under the logs stream - add use_cache on groupMembers * add use_cache to Group_Members * change configured_catalog to test * auto-bump connector version Co-authored-by: marcosmarxm Co-authored-by: Octavia Squidington III * split test files * renames * missing unit test * add missing unit tests * rename * assert isinstance * start extracting to their own files * use final instead of classmethod * assert we retry 429 errors * Add log * replace asserts with valueexceptions * delete superfluous print statement * only accept minmaxdatetime * fix factory so we don't need to union everything with strings * get class_name from type * remove from class types registry * process error handlers one at a time * sort * delete print statement * comment * comment * format * delete unused file * comment * interpolatedboolean * comment * not optional * not optional * unit tests * fix request body data * add test * move file to right module * update * reset to master * format * rename to pass_by * rename to page size * fix * fix some tests * reset * fix * fix some of the tests * fix test * fix more tests * all tests pass * path is not optional * reset * reset * reset * delete print * remove prints * delete duplicate method * add test * fix body data * delete extra newlines * move to subpackage * fix imports * handle str body data * simplify * Update tests * filter dates before stream state * Revert "Update tests" This reverts commit c0808c8009c850848e18c4f0190ae5f26b9c086a. * update * fix test * state management * add test * delete dead code * update cursor * update cursor cartesian * delete unused state class * fix * missing test * update cursor substreams * missing test * fix typing * fix typing * delete unused field * delete unused method * update datetime stream slice * cleanup * assert * request options * request option cartesian * assert when passing by path * request options for substreams * always return a map * pass stream_state * refactor and almost done fixing tests * fix tests * rename to inject_into * only accept enum * delete conditional paginator * only return body data * missing test * update docstrings * update docstrings * update comment * rename * tests * class_name -> type * improve interface * fix some of the tests * fix more of the tests * fix tests * reset * reset * Revert "reset" This reverts commit eb9a918a095a22c6849d50f8881589a1b58a9309. 
* remove extra argument * docstring * update * delete unused file * reset * reset * rename * fix timewindow * create InterpolatedString * helper method * assert on request option * better asserts * format * docstrings * docstrings * remove optional from type hint * Update airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py Co-authored-by: Sherif A. Nada * inherit from request options provider * inherit from request options provider * remove optional from type hint * remove extra parameter * none check Co-authored-by: Sherif A. Nada Co-authored-by: Tobias Macey Co-authored-by: Serhii Chvaliuk Co-authored-by: Amruta Ranade <11484018+Amruta-Ranade@users.noreply.github.com> Co-authored-by: Bas Beelen Co-authored-by: Marcos Marx Co-authored-by: Guy Feldman Co-authored-by: Christophe Duong Co-authored-by: Octavia Squidington III Co-authored-by: Yiyang Li Co-authored-by: marcosmarxm --- .../parsers/class_types_registry.py | 2 + .../default_implementation_registry.py | 5 +- .../requesters/paginators/limit_paginator.py | 4 + .../requesters/paginators/no_pagination.py | 4 + .../requesters/paginators/paginator.py | 5 +- .../request_options_provider.py | 5 +- .../retrievers/simple_retriever.py | 20 +- .../sources/declarative/states/__init__.py | 3 - .../sources/declarative/states/dict_state.py | 67 --- .../sources/declarative/states/state.py | 19 - .../cartesian_product_stream_slicer.py | 28 +- .../stream_slicers/datetime_stream_slicer.py | 161 +++++-- .../stream_slicers/list_stream_slicer.py | 63 ++- .../stream_slicers/single_slice.py | 28 +- .../stream_slicers/stream_slicer.py | 40 +- .../stream_slicers/substream_slicer.py | 108 +++-- .../declarative/requesters/test_dict_state.py | 85 ---- .../retrievers/test_simple_retriever.py | 10 +- .../test_cartesian_product_stream_slicer.py | 103 ++++- .../test_datetime_stream_slicer.py | 399 +++++++++++++----- .../stream_slicers/test_list_slicer.py | 88 +++- .../stream_slicers/test_substream_slicer.py | 155 ++++++- .../sources/declarative/test_factory.py | 112 ++++- 23 files changed, 1110 insertions(+), 404 deletions(-) delete mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/states/__init__.py delete mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/states/dict_state.py delete mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/states/state.py delete mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_dict_state.py diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py index 4f58d4e64487b2..ad0c268e1ac15a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py @@ -27,6 +27,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import SubstreamSlicer from airbyte_cdk.sources.declarative.transformations import RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields @@ -59,4 +60,5 @@ "RecordSelector": RecordSelector, "RemoveFields": 
RemoveFields, "SimpleRetriever": SimpleRetriever, + "SubstreamSlicer": SubstreamSlicer, } diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py index b50b7ecd16b75c..f09c00d954e855 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/default_implementation_registry.py @@ -30,10 +30,9 @@ from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader -from airbyte_cdk.sources.declarative.states.dict_state import DictState -from airbyte_cdk.sources.declarative.states.state import State from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import ParentStreamConfig from airbyte_cdk.sources.streams.core import Stream """ @@ -55,8 +54,8 @@ RequestOptionsProvider: InterpolatedRequestOptionsProvider, Requester: HttpRequester, Retriever: SimpleRetriever, + ParentStreamConfig: ParentStreamConfig, SchemaLoader: JsonSchema, - State: DictState, Stream: DeclarativeStream, StreamSlicer: SingleSlice, } diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py index 8764ca85d8f1ee..6a41b4cf1b8645 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/limit_paginator.py @@ -127,6 +127,10 @@ def request_body_data(self) -> Mapping[str, Any]: def request_body_json(self) -> Mapping[str, Any]: return self._get_request_options(RequestOptionType.body_json) + def request_kwargs(self) -> Mapping[str, Any]: + # Never update kwargs + return {} + def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]: options = {} if self._page_token_option.inject_into == option_type: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py index d5c18d4dc8b609..e1592e14ef317c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/no_pagination.py @@ -28,5 +28,9 @@ def request_body_data(self) -> Union[Mapping[str, Any], str]: def request_body_json(self) -> Mapping[str, Any]: return {} + def request_kwargs(self) -> Mapping[str, Any]: + # Never update kwargs + return {} + def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]: return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py index 8660da3be09bcb..d26581ff985a0d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py +++ 
b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/paginators/paginator.py @@ -2,13 +2,14 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod +from abc import abstractmethod from typing import Any, List, Mapping, Optional import requests +from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider -class Paginator(ABC): +class Paginator(RequestOptionsProvider): """ Defines the token to use to fetch the next page of records from the API. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py index d40040deb4b6ce..b9936c1045b816 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/request_options/request_options_provider.py @@ -20,14 +20,13 @@ class RequestOptionsProvider(ABC): """ @abstractmethod - def request_params( - self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None - ) -> MutableMapping[str, Any]: + def request_params(self, **kwargs) -> MutableMapping[str, Any]: """ Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. E.g: you might want to define query parameters for paging if next_page_token is not None. """ + pass @abstractmethod def request_headers( diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 367f12c1e05689..16b742a96e98fa 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -13,8 +13,6 @@ from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator from airbyte_cdk.sources.declarative.requesters.requester import Requester from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever -from airbyte_cdk.sources.declarative.states.dict_state import DictState -from airbyte_cdk.sources.declarative.states.state import State from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState @@ -42,7 +40,6 @@ def __init__( record_selector: HttpSelector, paginator: Optional[Paginator] = None, stream_slicer: Optional[StreamSlicer] = SingleSlice(), - state: Optional[State] = None, ): """ :param name: The stream's name @@ -59,8 +56,7 @@ def __init__( self._requester = requester self._record_selector = record_selector super().__init__(self._requester.get_authenticator()) - self._iterator = stream_slicer - self._state: State = (state or DictState()).deep_copy() + self._stream_slicer = stream_slicer self._last_response = None self._last_records = None @@ -300,12 +296,14 @@ def read_records( stream_state: Optional[StreamState] = None, ) -> Iterable[Mapping[str, Any]]: # Warning: use self.state instead of the stream_state passed as argument! 
+ stream_slice = stream_slice or {} # None-check records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state) for r in records_generator: - self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_response=self._last_response, last_record=r) + self._stream_slicer.update_cursor(stream_slice, last_record=r) yield r else: - self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_reponse=self._last_response) + last_record = self._last_records[-1] if self._last_records else None + self._stream_slicer.update_cursor(stream_slice, last_record=last_record) yield from [] def stream_slices( @@ -320,13 +318,13 @@ def stream_slices( :return: """ # Warning: use self.state instead of the stream_state passed as argument! - return self._iterator.stream_slices(sync_mode, self.state) + return self._stream_slicer.stream_slices(sync_mode, self.state) @property - def state(self) -> StreamState: - return self._state.get_stream_state() + def state(self) -> MutableMapping[str, Any]: + return self._stream_slicer.get_stream_state() @state.setter def state(self, value: StreamState): """State setter, accept state serialized by state getter.""" - self._state.set_state(value) + self._stream_slicer.update_cursor(value) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/__init__.py deleted file mode 100644 index 46b7376756ec6e..00000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/dict_state.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/dict_state.py deleted file mode 100644 index fc7ae0cfce922e..00000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/dict_state.py +++ /dev/null @@ -1,67 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
-# - -from enum import Enum -from typing import Mapping - -from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation -from airbyte_cdk.sources.declarative.states.state import State - - -def _get_max(*, name, val, other_state): - other_val = other_state.get(name) - if other_val: - return max(val, other_val) - else: - return val - - -class StateType(Enum): - STR = str - INT = int - - -class DictState(State): - stream_state_field = "stream_state" - - def __init__(self, initial_mapping: Mapping[str, str] = None, config=None): - if initial_mapping is None: - initial_mapping = dict() - if config is None: - config = dict() - self._templates_to_evaluate = initial_mapping - self._interpolator = JinjaInterpolation() - self._context = dict() - self._config = config - - def set_state(self, state): - self._context[self.stream_state_field] = state - - def update_state(self, **kwargs): - stream_state = kwargs.get(self.stream_state_field) - prev_stream_state = self.get_stream_state() or stream_state - self._context.update(**kwargs) - - self._context[self.stream_state_field] = self._compute_state(prev_stream_state) - - def get_state(self, state_field): - return self._context.get(state_field, {}) - - def get_stream_state(self): - return self.get_state(self.stream_state_field) - - def _compute_state(self, prev_state): - updated_state = { - self._interpolator.eval(name, self._config): self._interpolator.eval(value, self._config, **self._context) - for name, value in self._templates_to_evaluate.items() - } - updated_state = {name: value for name, value in updated_state.items() if value} - - if prev_state: - next_state = {name: _get_max(name=name, val=value, other_state=prev_state) for name, value in updated_state.items()} - else: - next_state = updated_state - - self._context[self.stream_state_field] = next_state - return next_state diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/state.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/state.py deleted file mode 100644 index 8a7528c88b346a..00000000000000 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/states/state.py +++ /dev/null @@ -1,19 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import copy -from abc import ABC, abstractmethod - - -class State(ABC): - @abstractmethod - def update_state(self, **kwargs): - pass - - @abstractmethod - def get_stream_state(self): - pass - - def deep_copy(self): - return copy.deepcopy(self) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py index a627dd353df55a..dc0bc1b6b5111e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py @@ -4,7 +4,7 @@ import itertools from collections import ChainMap -from typing import Any, Iterable, List, Mapping +from typing import Any, Iterable, List, Mapping, Optional from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer @@ -28,8 +28,34 @@ class CartesianProductStreamSlicer(StreamSlicer): """ def __init__(self, stream_slicers: List[StreamSlicer]): + """ + :param stream_slicers: Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) 
returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts, e.g. two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer. + """ self._stream_slicers = stream_slicers + def update_cursor(self, stream_slice: Mapping[str, Any], last_record: Optional[Mapping[str, Any]] = None): + for slicer in self._stream_slicers: + slicer.update_cursor(stream_slice, last_record) + + def request_params(self) -> Mapping[str, Any]: + return dict(ChainMap(*[s.request_params() for s in self._stream_slicers])) + + def request_headers(self) -> Mapping[str, Any]: + return dict(ChainMap(*[s.request_headers() for s in self._stream_slicers])) + + def request_body_data(self) -> Mapping[str, Any]: + return dict(ChainMap(*[s.request_body_data() for s in self._stream_slicers])) + + def request_body_json(self) -> Optional[Mapping]: + return dict(ChainMap(*[s.request_body_json() for s in self._stream_slicers])) + + def request_kwargs(self) -> Mapping[str, Any]: + # Never update kwargs + return {} + + def get_stream_state(self) -> Mapping[str, Any]: + return dict(ChainMap(*[slicer.get_stream_state() for slicer in self._stream_slicers])) + def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: sub_slices = (s.stream_slices(sync_mode, stream_state) for s in self._stream_slicers) return (ChainMap(*a) for a in itertools.product(*sub_slices)) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index 876b9293f5c183..c7b2273017880c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -6,45 +6,77 @@ import re from typing import Any, Iterable, Mapping, Optional +import dateutil from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer -from airbyte_cdk.sources.declarative.types import Config +from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState class DatetimeStreamSlicer(StreamSlicer): - timedelta_regex = re.compile( - r"((?P<weeks>[\.\d]+?)w)?" - r"((?P<days>[\.\d]+?)d)?" - r"((?P<hours>[\.\d]+?)h)?" - r"((?P<minutes>[\.\d]+?)m)?" - r"((?P<seconds>[\.\d]+?)s)?" - r"((?P<milliseconds>[\.\d]+?)ms)?" - r"((?P<microseconds>[\.\d]+?)us)?$" - ) - - # FIXME: start_time, end_time, and step should be datetime and timedelta? - # FIXME: timezone should be declarative? + """ + Slices the stream over a datetime range. + + Given a start time, end time, a step function, and an optional lookback window, + the stream slicer will partition the date range from start time - lookback window to end time. 
+ + The step function is defined as a string of the form: + `"<number><unit>"` + + where unit can be one of + - weeks, w + - days, d + + For example, "1d" will produce windows of 1 day, and "2w" will produce windows of 2 weeks. + """ + + timedelta_regex = re.compile(r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$") + def __init__( self, start_datetime: MinMaxDatetime, end_datetime: MinMaxDatetime, step: str, - cursor_value: InterpolatedString, + cursor_field: InterpolatedString, datetime_format: str, config: Config, + start_time_option: Optional[RequestOption] = None, + end_time_option: Optional[RequestOption] = None, + stream_state_field_start: Optional[str] = None, + stream_state_field_end: Optional[str] = None, lookback_window: Optional[InterpolatedString] = None, ): + """ + :param start_datetime: the datetime to start slicing from + :param end_datetime: the datetime to stop slicing at + :param step: size of the time window + :param cursor_field: record's cursor field + :param datetime_format: format of the datetime + :param config: connection config + :param start_time_option: request option for start time + :param end_time_option: request option for end time + :param stream_state_field_start: stream slice start time field + :param stream_state_field_end: stream slice end time field + :param lookback_window: how many days before start_datetime to read data for + """ self._timezone = datetime.timezone.utc self._interpolation = JinjaInterpolation() + self._datetime_format = datetime_format self._start_datetime = start_datetime self._end_datetime = end_datetime self._step = self._parse_timedelta(step) self._config = config - self._cursor_value = cursor_value + self._cursor_field = InterpolatedString.create(cursor_field) + self._start_time_option = start_time_option + self._end_time_option = end_time_option + self._stream_slice_field_start = InterpolatedString.create(stream_state_field_start or "start_date") + self._stream_slice_field_end = InterpolatedString.create(stream_state_field_end or "end_date") + self._cursor = None # tracks current datetime + self._cursor_end = None # tracks end of current stream slice self._lookback_window = lookback_window # If datetime format is not specified then start/end datetime should inherit it from the stream slicer @@ -53,29 +85,85 @@ def __init__( if not self._end_datetime.datetime_format: self._end_datetime.datetime_format = self._datetime_format + if self._start_time_option and self._start_time_option.inject_into == RequestOptionType.path: + raise ValueError("Start time cannot be passed by path") + if self._end_time_option and self._end_time_option.inject_into == RequestOptionType.path: + raise ValueError("End time cannot be passed by path") + + def get_stream_state(self) -> StreamState: + return {self._cursor_field.eval(self._config): self._cursor} if self._cursor else {} + + def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None): + """ + Update the cursor value to the max datetime between the last record, the start of the stream_slice, and the current cursor value. + Update the cursor_end value with the stream_slice's end time. 
+ + :param stream_slice: current stream slice + :param last_record: last record read + :return: None + """ + stream_slice_value = stream_slice.get(self._cursor_field.eval(self._config)) + stream_slice_value_end = stream_slice.get(self._stream_slice_field_end.eval(self._config)) + last_record_value = last_record.get(self._cursor_field.eval(self._config)) if last_record else None + cursor = None + if stream_slice_value and last_record_value: + cursor = max(stream_slice_value, last_record_value) + elif stream_slice_value: + cursor = stream_slice_value + else: + cursor = last_record_value + if self._cursor and cursor: + self._cursor = max(cursor, self._cursor) + elif cursor: + self._cursor = cursor + if self._stream_slice_field_end: + self._cursor_end = stream_slice_value_end + def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: - # Evaluate and compare start_date, end_date, and cursor_value based on configs and runtime state + """ + Partition the date range into slices of size = step. + + The start of the window is the later of (start_datetime - lookback_window) and the day following the stream_state's datetime. + The end of the window is the earlier of end_datetime and the current time. + + :param sync_mode: The sync mode used to read data + :param stream_state: current stream state. If set, the start_date will be the day following the stream_state. + :return: list of stream slices + """ stream_state = stream_state or {} kwargs = {"stream_state": stream_state} end_datetime = min(self._end_datetime.get_datetime(self._config, **kwargs), datetime.datetime.now(tz=datetime.timezone.utc)) lookback_delta = self._parse_timedelta(self._lookback_window.eval(self._config, **kwargs) if self._lookback_window else "0d") start_datetime = self._start_datetime.get_datetime(self._config, **kwargs) - lookback_delta start_datetime = min(start_datetime, end_datetime) - if self._cursor_value and self._cursor_value.eval(self._config, **kwargs): - cursor_datetime = self.parse_date(self._cursor_value.eval(self._config, **kwargs)) + if self._cursor_field.eval(self._config, stream_state=stream_state) in stream_state: + cursor_datetime = self.parse_date(stream_state[self._cursor_field.eval(self._config)]) else: cursor_datetime = start_datetime + start_datetime = max(cursor_datetime, start_datetime) - if not self._is_start_date_valid(start_datetime, end_datetime): - end_datetime = start_datetime - return self._partition_daterange(start_datetime, end_datetime, self._step) + state_date = self.parse_date(stream_state.get(self._cursor_field.eval(self._config, stream_state=stream_state))) + if state_date: + # If the input_state's date is greater than start_datetime, the start of the time window is the state's next day + next_date = state_date + datetime.timedelta(days=1) + start_datetime = max(start_datetime, next_date) + dates = self._partition_daterange(start_datetime, end_datetime, self._step) + return dates + + def _format_datetime(self, dt: datetime.datetime): + if self._datetime_format == "timestamp": + return dt.timestamp() + else: + return dt.strftime(self._datetime_format) def _partition_daterange(self, start, end, step: datetime.timedelta): + start_field = self._stream_slice_field_start.eval(self._config) + end_field = self._stream_slice_field_end.eval(self._config) dates = [] while start <= end: end_date = self._get_date(start + step - datetime.timedelta(days=1), end, min) - dates.append({"start_date": start.strftime(self._datetime_format), "end_date": end_date.strftime(self._datetime_format)}) + 
dates.append({start_field: self._format_datetime(start), end_field: self._format_datetime(end_date)}) start += step return dates @@ -88,7 +176,7 @@ def parse_date(self, date: Any) -> datetime: if self.is_int(date): return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone) else: - return datetime.datetime.strptime(date, self._datetime_format).replace(tzinfo=self._timezone) + return dateutil.parser.parse(date).replace(tzinfo=self._timezone) elif isinstance(date, int): return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone) return date @@ -115,6 +203,27 @@ def _parse_timedelta(cls, time_str): time_params = {name: float(param) for name, param in parts.groupdict().items() if param} return datetime.timedelta(**time_params) - @staticmethod - def _is_start_date_valid(start_date: datetime, end_date: datetime) -> bool: - return start_date <= end_date + def request_params(self) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.request_parameter) + + def request_headers(self) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.header) + + def request_body_data(self) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.body_data) + + def request_body_json(self) -> Mapping[str, Any]: + return self._get_request_options(RequestOptionType.body_json) + + def request_kwargs(self) -> Mapping[str, Any]: + # Never update kwargs + return {} + + def _get_request_options(self, option_type): + options = {} + if self._start_time_option and self._start_time_option.inject_into == option_type: + if self._cursor: + options[self._start_time_option.field_name] = self._cursor + if self._end_time_option and self._end_time_option.inject_into == option_type: + options[self._end_time_option.field_name] = self._cursor_end + return options diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py index 58f744638abbad..1466ba1650e152 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py @@ -3,12 +3,13 @@ # import ast -from typing import Any, Iterable, List, Mapping, Union +from typing import Any, Iterable, List, Mapping, Optional, Union from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer -from airbyte_cdk.sources.declarative.types import Config +from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState class ListStreamSlicer(StreamSlicer): @@ -17,13 +18,61 @@ class ListStreamSlicer(StreamSlicer): If slice_values is a string, then evaluate it as literal and assert the resulting literal is a list """ - def __init__(self, slice_values: Union[str, List[str]], slice_definition: Mapping[str, Any], config: Config): + def __init__( + self, + slice_values: Union[str, List[str]], + cursor_field: Union[InterpolatedString, str], + config: Config, + request_option: Optional[RequestOption] = None, + ): + """ + :param slice_values: The values to iterate over 
+ :param cursor_field: The name of the cursor field + :param config: The user-provided configuration as specified by the source's spec + :param request_option: The request option to configure the HTTP request + """ if isinstance(slice_values, str): slice_values = ast.literal_eval(slice_values) - assert isinstance(slice_values, list) - self._interpolation = InterpolatedMapping(slice_definition) + if isinstance(cursor_field, str): + cursor_field = InterpolatedString(cursor_field) + self._cursor_field = cursor_field self._slice_values = slice_values self._config = config + self._cursor = None + self._request_option = request_option + + if request_option and request_option.inject_into == RequestOptionType.path: + raise ValueError("Slice value cannot be injected in the path") + + def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None): + slice_value = stream_slice.get(self._cursor_field.eval(self._config)) + if slice_value and slice_value in self._slice_values: + self._cursor = slice_value + + def get_stream_state(self) -> StreamState: + return {self._cursor_field.eval(self._config): self._cursor} if self._cursor else {} + + def request_params(self) -> Mapping[str, Any]: + return self._get_request_option(RequestOptionType.request_parameter) + + def request_headers(self) -> Mapping[str, Any]: + return self._get_request_option(RequestOptionType.header) + + def request_body_data(self) -> Mapping[str, Any]: + return self._get_request_option(RequestOptionType.body_data) + + def request_body_json(self) -> Mapping[str, Any]: + return self._get_request_option(RequestOptionType.body_json) + + def request_kwargs(self) -> Mapping[str, Any]: + # Never update kwargs + return {} def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: - return [self._interpolation.eval(self._config, slice_value=slice_value) for slice_value in self._slice_values] + return [{self._cursor_field.eval(self._config): slice_value} for slice_value in self._slice_values] + + def _get_request_option(self, request_option_type: RequestOptionType): + if self._request_option and self._request_option.inject_into == request_option_type: + return {self._request_option.field_name: self._cursor} + else: + return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py index 28b1ccee19a439..a7571a6afb518f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/single_slice.py @@ -2,15 +2,37 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# -from typing import Any, Iterable, Mapping +from typing import Any, Iterable, Mapping, Optional from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState class SingleSlice(StreamSlicer): - def __init__(self, **kwargs): + """Stream slicer returning only a single stream slice""" + + def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None): pass - def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: + def get_stream_state(self) -> StreamState: + return {} + + def request_params(self) -> Mapping[str, Any]: + return {} + + def request_headers(self) -> Mapping[str, Any]: + return {} + + def request_body_data(self) -> Mapping[str, Any]: + return {} + + def request_body_json(self) -> Mapping[str, Any]: + return {} + + def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[StreamSlice]: return [dict()] + + def request_kwargs(self) -> Mapping[str, Any]: + # Never update kwargs + return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py index 4ad645d85a4eb3..025aa2bf1556e9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py @@ -2,13 +2,43 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from abc import ABC, abstractmethod -from typing import Any, Iterable, Mapping, Optional +from abc import abstractmethod +from typing import Iterable, Optional from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider +from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState -class StreamSlicer(ABC): +class StreamSlicer(RequestOptionsProvider): + """ + Slices the stream into a subset of records. + Slices enable state checkpointing and data retrieval parallelization. + + The stream slicer keeps track of the cursor state as a dict of cursor_field -> cursor_value + + See the stream slicing section of the docs for more information. + """ + + @abstractmethod + def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Iterable[StreamSlice]: + """ + Defines stream slices + + :param sync_mode: The sync mode used to read data + :param stream_state: The current stream state + :return: List of stream slices + """ + + @abstractmethod + def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None): + """ + State setter, accept state serialized by state getter. 
+ + :param stream_slice: Current stream_slice + :param last_record: Last record read from the source + """ + @abstractmethod - def stream_slices(self, sync_mode: SyncMode, stream_state: Optional[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]: - pass + def get_stream_state(self) -> StreamState: + """Returns the current stream state""" diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py index 7ef01197c523a5..e8eac15970ecba 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py @@ -2,31 +2,94 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from typing import Any, Iterable, List, Mapping +from dataclasses import dataclass +from typing import Any, Iterable, List, Mapping, Optional from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping -from airbyte_cdk.sources.declarative.states.dict_state import DictState +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer +from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState from airbyte_cdk.sources.streams.core import Stream +@dataclass +class ParentStreamConfig: + """ + Describes how to create a stream slice from a parent stream + + stream: The stream to read records from + parent_key: The key of the parent stream's records that will be the stream slice key + stream_slice_field: The stream slice key + request_option: How to inject the slice value on an outgoing HTTP request + """ + + stream: Stream + parent_key: str + stream_slice_field: str + request_option: Optional[RequestOption] = None + + class SubstreamSlicer(StreamSlicer): """ Stream slicer that iterates over the parent's stream slices and records and emits slices by interpolating the slice_definition mapping Will populate the state with `parent_stream_slice` and `parent_record` so they can be accessed by other components """ - def __init__(self, parent_streams: List[Stream], state: DictState, slice_definition: Mapping[str, Any]): - self._parent_streams = parent_streams - self._state = state - self._interpolation = InterpolatedMapping(slice_definition) + def __init__( + self, + parent_streams_configs: List[ParentStreamConfig], + ): + """ + :param parent_streams_configs: parent streams to iterate over and their config + """ + if not parent_streams_configs: + raise ValueError("SubstreamSlicer needs at least 1 parent stream") + self._parent_stream_configs = parent_streams_configs + self._cursor = None + + def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None): + cursor = {} + for parent_stream_config in self._parent_stream_configs: + slice_value = stream_slice.get(parent_stream_config.stream_slice_field) + if slice_value: + cursor.update({parent_stream_config.stream_slice_field: slice_value}) + self._cursor = cursor - def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: + def request_params(self) -> Mapping[str, Any]: + return self._get_request_option(RequestOptionType.request_parameter) + + def request_headers(self) -> Mapping[str, Any]: + return 
self._get_request_option(RequestOptionType.header) + + def request_body_data(self) -> Mapping[str, Any]: + return self._get_request_option(RequestOptionType.body_data) + + def request_body_json(self) -> Optional[Mapping]: + return self._get_request_option(RequestOptionType.body_json) + + def request_kwargs(self) -> Mapping[str, Any]: + # Never update kwargs + return {} + + def _get_request_option(self, option_type: RequestOptionType): + params = {} + for parent_config in self._parent_stream_configs: + if parent_config.request_option and parent_config.request_option.inject_into == option_type: + key = parent_config.stream_slice_field + value = self._cursor.get(key) + if value: + params.update({key: value}) + return params + + def get_stream_state(self) -> StreamState: + return self._cursor if self._cursor else {} + + def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Iterable[StreamSlice]: """ - Iterate over each parent stream. + Iterate over each parent stream's record and create a StreamSlice for each record. + For each stream, iterate over its stream_slices. - For each stream slice, iterate over each records. + For each stream slice, iterate over each record. yield a stream slice for each such records. If a parent slice contains no record, emit a slice with parent_record=None. @@ -36,29 +99,24 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> - parent_record: mapping representing the parent record - parent_stream_name: string representing the parent stream name """ - if not self._parent_streams: + if not self._parent_stream_configs: yield from [] else: - for parent_stream in self._parent_streams: + for parent_stream_config in self._parent_stream_configs: + parent_stream = parent_stream_config.stream + parent_field = parent_stream_config.parent_key + stream_state_field = parent_stream_config.stream_slice_field for parent_stream_slice in parent_stream.stream_slices(sync_mode=sync_mode, cursor_field=None, stream_state=stream_state): - self._state.update_state(parent_stream_slice=parent_stream_slice) - self._state.update_state(parent_record=None) empty_parent_slice = True + parent_slice = parent_stream_slice.get("slice") for parent_record in parent_stream.read_records( sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None ): empty_parent_slice = False - slice_definition = self._get_slice_definition(parent_stream_slice, parent_record, parent_stream.name) - self._state.update_state(parent_record=parent_record) - yield slice_definition + stream_state_value = parent_record.get(parent_field) + yield {stream_state_field: stream_state_value, "parent_slice": parent_slice} # If the parent slice contains no records, - # yield a slice definition with parent_record==None if empty_parent_slice: - slice_definition = self._get_slice_definition(parent_stream_slice, None, parent_stream.name) - yield slice_definition - - def _get_slice_definition(self, parent_stream_slice, parent_record, parent_stream_name): - return self._interpolation.eval( - None, parent_stream_slice=parent_stream_slice, parent_record=parent_record, parent_stream_name=parent_stream_name - ) + stream_state_value = parent_stream_slice.get(parent_field) + yield {stream_state_field: stream_state_value, "parent_slice": parent_slice} diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_dict_state.py b/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_dict_state.py deleted file mode 100644 index 
38eb43ce9f3594..00000000000000 --- a/airbyte-cdk/python/unit_tests/sources/declarative/requesters/test_dict_state.py +++ /dev/null @@ -1,85 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from airbyte_cdk.sources.declarative.states.dict_state import DictState - -config = {"name": "date"} -name = "{{ config['name'] }}" -value = "{{ last_record['updated_at'] }}" -dict_mapping = { - name: value, -} - - -def test_empty_state_is_none(): - state = DictState(dict_mapping, config) - initial_state = state.get_stream_state() - expected_state = {} - assert expected_state == initial_state - - -def test_update_initial_state(): - state = DictState(dict_mapping, config) - stream_slice = None - stream_state = None - last_response = {"data": {"id": "1234", "updated_at": "2021-01-01"}, "last_refresh": "2020-01-01"} - last_record = {"id": "1234", "updated_at": "2021-01-01"} - state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record) - actual_state = state.get_stream_state() - expected_state = {"date": "2021-01-01"} - assert expected_state == actual_state - - -def test_update_state_with_recent_cursor(): - state = DictState(dict_mapping, config) - stream_slice = None - stream_state = {"date": "2020-12-31"} - last_response = {"data": {"id": "1234", "updated_at": "2021-01-01"}, "last_refresh": "2020-01-01"} - last_record = {"id": "1234", "updated_at": "2021-01-01"} - state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record) - actual_state = state.get_stream_state() - expected_state = {"date": "2021-01-01"} - assert expected_state == actual_state - - -def test_update_state_with_old_cursor(): - state = DictState(dict_mapping, config) - stream_slice = None - stream_state = {"date": "2021-01-02"} - last_response = {"data": {"id": "1234", "updated_at": "2021-01-01"}, "last_refresh": "2020-01-01"} - last_record = {"id": "1234", "updated_at": "2021-01-01"} - state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record) - actual_state = state.get_stream_state() - expected_state = {"date": "2021-01-02"} - assert expected_state == actual_state - - -def test_update_state_with_older_state(): - state = DictState(dict_mapping, config) - stream_slice = None - stream_state = {"date": "2021-01-02"} - last_response = {"data": {"id": "1234", "updated_at": "2021-01-02"}, "last_refresh": "2020-01-01"} - last_record = {"id": "1234", "updated_at": "2021-01-02"} - state.update_state(stream_slice=stream_slice, stream_state=stream_state, last_response=last_response, last_record=last_record) - actual_state = state.get_stream_state() - expected_state = {"date": "2021-01-02"} - - out_of_order_response = {"data": {"id": "1234", "updated_at": "2021-01-02"}, "last_refresh": "2020-01-01"} - out_of_order_record = {"id": "1234", "updated_at": "2021-01-01"} - state.update_state( - stream_slice=stream_slice, stream_state=stream_state, last_response=out_of_order_response, last_record=out_of_order_record - ) - assert expected_state == actual_state - - -def test_state_is_a_timestamp(): - state = DictState(dict_mapping, config) - stream_slice = None - stream_state = {"date": 12345} - last_response = {"data": {"id": "1234", "updated_at": 123456}, "last_refresh": "2020-01-01"} - last_record = {"id": "1234", "updated_at": 123456} - state.update_state(stream_slice=stream_slice, stream_state=stream_state, 
last_response=last_response, last_record=last_record) - actual_state = state.get_stream_state() - expected_state = {"date": 123456} - assert expected_state == actual_state diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 5d8baa18044836..ca994f8a3d6a25 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -17,9 +17,10 @@ primary_key = "pk" records = [{"id": 1}, {"id": 2}] +config = {} -def test_simple_retriever(): +def test_simple_retriever_full(): requester = MagicMock() request_params = {"param": "value"} requester.request_params.return_value = request_params @@ -38,9 +39,8 @@ def test_simple_retriever(): response = requests.Response() - state = MagicMock() underlying_state = {"date": "2021-01-01"} - state.get_stream_state.return_value = underlying_state + iterator.get_stream_state.return_value = underlying_state url_base = "https://airbyte.io" requester.get_url_base.return_value = url_base @@ -69,12 +69,8 @@ def test_simple_retriever(): paginator=paginator, record_selector=record_selector, stream_slicer=iterator, - state=state, ) - # hack because we clone the state... - retriever._state = state - assert retriever.primary_key == primary_key assert retriever.url_base == url_base assert retriever.path() == path diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py index f0cf093f6abf7f..00babbb382c832 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py @@ -6,6 +6,7 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer @@ -16,14 +17,14 @@ [ ( "test_single_stream_slicer", - [ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None)], + [ListStreamSlicer(["customer", "store", "subscription"], "owner_resource", None)], [{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}], ), ( "test_two_stream_slicers", [ - ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, None), - ListStreamSlicer(["A", "B"], {"letter": "{{ slice_value }}"}, None), + ListStreamSlicer(["customer", "store", "subscription"], "owner_resource", None), + ListStreamSlicer(["A", "B"], "letter", None), ], [ {"owner_resource": "customer", "letter": "A"}, @@ -37,7 +38,7 @@ ( "test_list_and_datetime", [ - ListStreamSlicer(["customer", "store", "subscription"], {"owner_resource": "{{ slice_value }}"}, 
None), + ListStreamSlicer(["customer", "store", "subscription"], "owner_resource", None), DatetimeStreamSlicer( MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d"), MinMaxDatetime(datetime="2021-01-03", datetime_format="%Y-%m-%d"), @@ -65,3 +66,97 @@ def test_substream_slicer(test_name, stream_slicers, expected_slices): slicer = CartesianProductStreamSlicer(stream_slicers) slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)] assert slices == expected_slices + + +@pytest.mark.parametrize( + "test_name, stream_slice, expected_state", + [ + ("test_update_cursor_no_state_no_record", {}, {}), + ("test_update_cursor_partial_state", {"owner_resource": "customer"}, {"owner_resource": "customer"}), + ( + "test_update_cursor_full_state", + {"owner_resource": "customer", "date": "2021-01-01"}, + {"owner_resource": "customer", "date": "2021-01-01"}, + ), + ], +) +def test_update_cursor(test_name, stream_slice, expected_state): + stream_slicers = [ + ListStreamSlicer(["customer", "store", "subscription"], "owner_resource", None), + DatetimeStreamSlicer( + MinMaxDatetime(datetime="2021-01-01", datetime_format="%Y-%m-%d"), + MinMaxDatetime(datetime="2021-01-03", datetime_format="%Y-%m-%d"), + "1d", + InterpolatedString("date"), + "%Y-%m-%d", + None, + ), + ] + slicer = CartesianProductStreamSlicer(stream_slicers) + slicer.update_cursor(stream_slice, None) + updated_state = slicer.get_stream_state() + assert expected_state == updated_state + + +@pytest.mark.parametrize( + "test_name, stream_1_request_option, stream_2_request_option, expected_req_params, expected_headers,expected_body_json, expected_body_data", + [ + ( + "test_param_header", + RequestOption(RequestOptionType.request_parameter, "owner"), + RequestOption(RequestOptionType.header, "repo"), + {"owner": "customer"}, + {"repo": "airbyte"}, + {}, + {}, + ), + ( + "test_header_header", + RequestOption(RequestOptionType.header, "owner"), + RequestOption(RequestOptionType.header, "repo"), + {}, + {"owner": "customer", "repo": "airbyte"}, + {}, + {}, + ), + ( + "test_body_data", + RequestOption(RequestOptionType.body_data, "owner"), + RequestOption(RequestOptionType.body_data, "repo"), + {}, + {}, + {}, + {"owner": "customer", "repo": "airbyte"}, + ), + ( + "test_body_json", + RequestOption(RequestOptionType.body_json, "owner"), + RequestOption(RequestOptionType.body_json, "repo"), + {}, + {}, + {"owner": "customer", "repo": "airbyte"}, + {}, + ), + ], +) +def test_request_option( + test_name, + stream_1_request_option, + stream_2_request_option, + expected_req_params, + expected_headers, + expected_body_json, + expected_body_data, +): + slicer = CartesianProductStreamSlicer( + [ + ListStreamSlicer(["customer", "store", "subscription"], "owner_resource", None, stream_1_request_option), + ListStreamSlicer(["airbyte", "airbyte-cloud"], "repository", None, stream_2_request_option), + ] + ) + slicer.update_cursor({"owner_resource": "customer", "repository": "airbyte"}, None) + + assert expected_req_params == slicer.request_params() + assert expected_headers == slicer.request_headers() + assert expected_body_json == slicer.request_body_json() + assert expected_body_data == slicer.request_body_data() diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py index 0f34bc88cbf016..c34f4ada975d1f 100644 --- 
a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_datetime_stream_slicer.py @@ -9,16 +9,17 @@ from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer -datetime_format = "%Y-%m-%d" +datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z" FAKE_NOW = datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc) -config = {"start_date": "2021-01-01"} +config = {"start_date": "2021-01-01T00:00:00.000000+0000", "start_date_ymd": "2021-01-01"} end_date_now = InterpolatedString( "{{ today_utc() }}", ) -cursor_value = InterpolatedString("{{ stream_state['date'] }}") +cursor_field = "created" timezone = datetime.timezone.utc @@ -30,161 +31,185 @@ def mock_datetime_now(monkeypatch): @pytest.mark.parametrize( - "test_name, stream_state, start, end, step, cursor, lookback_window, expected_slices", + "test_name, stream_state, start, end, step, cursor_field, lookback_window, datetime_format, expected_slices", [ ( "test_1_day", None, - MinMaxDatetime("{{ config['start_date'] }}", datetime_format=datetime_format), - MinMaxDatetime("2021-01-10", datetime_format=datetime_format), + MinMaxDatetime("{{ config['start_date'] }}"), + MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), "1d", - cursor_value, + cursor_field, None, + datetime_format, [ - {"start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"start_date": "2021-01-03", "end_date": "2021-01-03"}, - {"start_date": "2021-01-04", "end_date": "2021-01-04"}, - {"start_date": "2021-01-05", "end_date": "2021-01-05"}, - {"start_date": "2021-01-06", "end_date": "2021-01-06"}, - {"start_date": "2021-01-07", "end_date": "2021-01-07"}, - {"start_date": "2021-01-08", "end_date": "2021-01-08"}, - {"start_date": "2021-01-09", "end_date": "2021-01-09"}, - {"start_date": "2021-01-10", "end_date": "2021-01-10"}, + {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, + {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, + {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, + {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, + {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, + {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, + {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, + {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, + {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, ], ), ( "test_2_day", None, - MinMaxDatetime("{{ config['start_date'] }}", datetime_format=datetime_format), - MinMaxDatetime("2021-01-10", datetime_format=datetime_format), + MinMaxDatetime("{{ config['start_date'] }}"), + 
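# no explicit datetime_format here: MinMaxDatetime inherits the format passed to the slicer
+            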
MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), "2d", - cursor_value, + cursor_field, None, + datetime_format, [ - {"start_date": "2021-01-01", "end_date": "2021-01-02"}, - {"start_date": "2021-01-03", "end_date": "2021-01-04"}, - {"start_date": "2021-01-05", "end_date": "2021-01-06"}, - {"start_date": "2021-01-07", "end_date": "2021-01-08"}, - {"start_date": "2021-01-09", "end_date": "2021-01-10"}, + {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, + {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, + {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, + {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, + {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, ], ), ( "test_from_stream_state", - {"date": "2021-01-05"}, - MinMaxDatetime("{{ stream_state['date'] }}", datetime_format=datetime_format), - MinMaxDatetime("2021-01-10", datetime_format=datetime_format), + {"date": "2021-01-05T00:00:00.000000+0000"}, + MinMaxDatetime("{{ stream_state['date'] }}"), + MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), "1d", - cursor_value, + cursor_field, None, + datetime_format, [ - # FIXME: should this include 2021-01-05? - {"start_date": "2021-01-05", "end_date": "2021-01-05"}, - {"start_date": "2021-01-06", "end_date": "2021-01-06"}, - {"start_date": "2021-01-07", "end_date": "2021-01-07"}, - {"start_date": "2021-01-08", "end_date": "2021-01-08"}, - {"start_date": "2021-01-09", "end_date": "2021-01-09"}, - {"start_date": "2021-01-10", "end_date": "2021-01-10"}, + {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, + {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, + {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, + {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, + {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, ], ), ( "test_12_day", None, - MinMaxDatetime("{{ config['start_date'] }}", datetime_format=datetime_format), - MinMaxDatetime("2021-01-10", datetime_format=datetime_format), + MinMaxDatetime("{{ config['start_date'] }}"), + MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), "12d", - cursor_value, + cursor_field, None, + datetime_format, [ - {"start_date": "2021-01-01", "end_date": "2021-01-10"}, + {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, ], ), ( "test_end_date_greater_than_now", None, - MinMaxDatetime("2021-12-28", datetime_format=datetime_format), - MinMaxDatetime(f"{(FAKE_NOW + datetime.timedelta(days=1)).strftime(datetime_format)}", datetime_format=datetime_format), + MinMaxDatetime("2021-12-28T00:00:00.000000+0000"), + MinMaxDatetime(f"{(FAKE_NOW + datetime.timedelta(days=1)).strftime(datetime_format)}"), "1d", - cursor_value, + cursor_field, None, + datetime_format, [ - {"start_date": "2021-12-28", "end_date": "2021-12-28"}, - {"start_date": "2021-12-29", "end_date": "2021-12-29"}, - {"start_date": "2021-12-30", "end_date": "2021-12-30"}, - {"start_date": "2021-12-31", "end_date": "2021-12-31"}, - {"start_date": "2022-01-01", "end_date": "2022-01-01"}, + 
{"start_date": "2021-12-28T00:00:00.000000+0000", "end_date": "2021-12-28T00:00:00.000000+0000"}, + {"start_date": "2021-12-29T00:00:00.000000+0000", "end_date": "2021-12-29T00:00:00.000000+0000"}, + {"start_date": "2021-12-30T00:00:00.000000+0000", "end_date": "2021-12-30T00:00:00.000000+0000"}, + {"start_date": "2021-12-31T00:00:00.000000+0000", "end_date": "2021-12-31T00:00:00.000000+0000"}, + {"start_date": "2022-01-01T00:00:00.000000+0000", "end_date": "2022-01-01T00:00:00.000000+0000"}, ], ), ( "test_start_date_greater_than_end_date", - {"date": "2021-01-05"}, - MinMaxDatetime("2021-01-10", datetime_format=datetime_format), - MinMaxDatetime("{{ stream_state['date'] }}", datetime_format=datetime_format), + None, + MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), + MinMaxDatetime("2021-01-05T00:00:00.000000+0000"), "1d", - cursor_value, + cursor_field, None, + datetime_format, [ - {"start_date": "2021-01-05", "end_date": "2021-01-05"}, + {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, ], ), ( "test_cursor_date_greater_than_start_date", - {"date": "2021-01-05"}, - MinMaxDatetime("{{ stream_state['date'] }}", datetime_format=datetime_format), - MinMaxDatetime("2021-01-10", datetime_format=datetime_format), + {"date": "2021-01-05T00:00:00.000000+0000"}, + MinMaxDatetime("{{ stream_state['date'] }}"), + MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), "1d", InterpolatedString("{{ stream_state['date'] }}"), None, + datetime_format, [ - {"start_date": "2021-01-05", "end_date": "2021-01-05"}, - {"start_date": "2021-01-06", "end_date": "2021-01-06"}, - {"start_date": "2021-01-07", "end_date": "2021-01-07"}, - {"start_date": "2021-01-08", "end_date": "2021-01-08"}, - {"start_date": "2021-01-09", "end_date": "2021-01-09"}, - {"start_date": "2021-01-10", "end_date": "2021-01-10"}, + {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, + {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, + {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, + {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, + {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, + ], + ), + ( + "test_cursor_date_greater_than_start_date_multiday_step", + {cursor_field: "2021-01-05T00:00:00.000000+0000"}, + MinMaxDatetime("2021-01-03T00:00:00.000000+0000"), + MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), + "2d", + cursor_field, + None, + datetime_format, + [ + {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, + {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, + {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, ], ), ( "test_start_date_less_than_min_date", - {"date": "2021-01-05"}, - MinMaxDatetime("{{ config['start_date'] }}", min_datetime="{{ stream_state['date'] }}", datetime_format=datetime_format), - MinMaxDatetime("2021-01-10", datetime_format=datetime_format), + {"date": "2021-01-05T00:00:00.000000+0000"}, + MinMaxDatetime("{{ config['start_date'] }}", min_datetime="{{ stream_state['date'] }}"), + MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), "1d", InterpolatedString("{{ stream_state['date'] }}"), 
None, + datetime_format, [ - {"start_date": "2021-01-05", "end_date": "2021-01-05"}, - {"start_date": "2021-01-06", "end_date": "2021-01-06"}, - {"start_date": "2021-01-07", "end_date": "2021-01-07"}, - {"start_date": "2021-01-08", "end_date": "2021-01-08"}, - {"start_date": "2021-01-09", "end_date": "2021-01-09"}, - {"start_date": "2021-01-10", "end_date": "2021-01-10"}, + {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, + {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, + {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, + {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, + {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, ], ), ( "test_end_date_greater_than_max_date", - {"date": "2021-01-05"}, - MinMaxDatetime("{{ config['start_date'] }}", datetime_format=datetime_format), - MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}", datetime_format=datetime_format), + {"date": "2021-01-05T00:00:00.000000+0000"}, + MinMaxDatetime("{{ config['start_date'] }}"), + MinMaxDatetime("2021-01-10T00:00:00.000000+0000", max_datetime="{{ stream_state['date'] }}"), "1d", + cursor_field, None, - None, + datetime_format, [ - {"start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"start_date": "2021-01-03", "end_date": "2021-01-03"}, - {"start_date": "2021-01-04", "end_date": "2021-01-04"}, - {"start_date": "2021-01-05", "end_date": "2021-01-05"}, + {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, + {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, + {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, + {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, + {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, ], ), ( "test_start_end_min_max_inherits_datetime_format_from_stream_slicer", {"date": "2021-01-05"}, - MinMaxDatetime("{{ config['start_date'] }}"), + MinMaxDatetime("{{ config['start_date_ymd'] }}"), MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}"), "1d", + cursor_field, None, - None, + "%Y-%m-%d", [ {"start_date": "2021-01-01", "end_date": "2021-01-01"}, {"start_date": "2021-01-02", "end_date": "2021-01-02"}, @@ -197,46 +222,67 @@ def mock_datetime_now(monkeypatch): "test_with_lookback_window_from_start_date", {"date": "2021-01-05"}, MinMaxDatetime("{{ config['start_date'] }}"), - MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}"), + MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}", datetime_format="%Y-%m-%d"), "1d", - None, + cursor_field, "3d", + datetime_format, [ - {"start_date": "2020-12-29", "end_date": "2020-12-29"}, - {"start_date": "2020-12-30", "end_date": "2020-12-30"}, - {"start_date": "2020-12-31", "end_date": "2020-12-31"}, - {"start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"start_date": "2021-01-03", "end_date": "2021-01-03"}, - {"start_date": "2021-01-04", "end_date": "2021-01-04"}, - {"start_date": 
"2021-01-05", "end_date": "2021-01-05"}, + {"start_date": "2020-12-29T00:00:00.000000+0000", "end_date": "2020-12-29T00:00:00.000000+0000"}, + {"start_date": "2020-12-30T00:00:00.000000+0000", "end_date": "2020-12-30T00:00:00.000000+0000"}, + {"start_date": "2020-12-31T00:00:00.000000+0000", "end_date": "2020-12-31T00:00:00.000000+0000"}, + {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, + {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, + {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, + {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, + {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, ], ), ( "test_with_lookback_window_defaults_to_0d", {"date": "2021-01-05"}, MinMaxDatetime("{{ config['start_date'] }}"), - MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}"), + MinMaxDatetime("2021-01-10", max_datetime="{{ stream_state['date'] }}", datetime_format="%Y-%m-%d"), "1d", - None, + cursor_field, "{{ config['does_not_exist'] }}", + datetime_format, [ - {"start_date": "2021-01-01", "end_date": "2021-01-01"}, - {"start_date": "2021-01-02", "end_date": "2021-01-02"}, - {"start_date": "2021-01-03", "end_date": "2021-01-03"}, - {"start_date": "2021-01-04", "end_date": "2021-01-04"}, - {"start_date": "2021-01-05", "end_date": "2021-01-05"}, + {"start_date": "2021-01-01T00:00:00.000000+0000", "end_date": "2021-01-01T00:00:00.000000+0000"}, + {"start_date": "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-02T00:00:00.000000+0000"}, + {"start_date": "2021-01-03T00:00:00.000000+0000", "end_date": "2021-01-03T00:00:00.000000+0000"}, + {"start_date": "2021-01-04T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"}, + {"start_date": "2021-01-05T00:00:00.000000+0000", "end_date": "2021-01-05T00:00:00.000000+0000"}, + ], + ), + ( + "test_start_is_after_stream_state", + {cursor_field: "2021-01-05T00:00:00.000000+0000"}, + MinMaxDatetime("2021-01-01T00:00:00.000000+0000"), + MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), + "1d", + cursor_field, + None, + datetime_format, + [ + {"start_date": "2021-01-06T00:00:00.000000+0000", "end_date": "2021-01-06T00:00:00.000000+0000"}, + {"start_date": "2021-01-07T00:00:00.000000+0000", "end_date": "2021-01-07T00:00:00.000000+0000"}, + {"start_date": "2021-01-08T00:00:00.000000+0000", "end_date": "2021-01-08T00:00:00.000000+0000"}, + {"start_date": "2021-01-09T00:00:00.000000+0000", "end_date": "2021-01-09T00:00:00.000000+0000"}, + {"start_date": "2021-01-10T00:00:00.000000+0000", "end_date": "2021-01-10T00:00:00.000000+0000"}, ], ), ], ) -def test_stream_slices(mock_datetime_now, test_name, stream_state, start, end, cursor, step, lookback_window, expected_slices): +def test_stream_slices( + mock_datetime_now, test_name, stream_state, start, end, step, cursor_field, lookback_window, datetime_format, expected_slices +): lookback_window = InterpolatedString(lookback_window) if lookback_window else None slicer = DatetimeStreamSlicer( start_datetime=start, end_datetime=end, step=step, - cursor_value=cursor, + cursor_field=cursor_field, datetime_format=datetime_format, lookback_window=lookback_window, config=config, @@ -246,5 +292,156 @@ def test_stream_slices(mock_datetime_now, test_name, stream_state, start, end, c assert expected_slices == stream_slices +@pytest.mark.parametrize( + 
"test_name, previous_cursor, stream_slice, last_record, expected_state", + [ + ("test_update_cursor_no_state_no_record", None, {}, None, {}), + ( + "test_update_cursor_with_state_no_record", + None, + {cursor_field: "2021-01-02T00:00:00.000000+0000"}, + None, + {cursor_field: "2021-01-02T00:00:00.000000+0000"}, + ), + ( + "test_update_cursor_with_state_equals_record", + None, + {cursor_field: "2021-01-02T00:00:00.000000+0000"}, + {cursor_field: "2021-01-02T00:00:00.000000+0000"}, + {cursor_field: "2021-01-02T00:00:00.000000+0000"}, + ), + ( + "test_update_cursor_with_state_greater_than_record", + None, + {cursor_field: "2021-01-03T00:00:00.000000+0000"}, + {cursor_field: "2021-01-02T00:00:00.000000+0000"}, + {cursor_field: "2021-01-03T00:00:00.000000+0000"}, + ), + ( + "test_update_cursor_with_state_less_than_record", + None, + {cursor_field: "2021-01-02T00:00:00.000000+0000"}, + {cursor_field: "2021-01-03T00:00:00.000000+0000"}, + {cursor_field: "2021-01-03T00:00:00.000000+0000"}, + ), + ( + "test_update_cursor_with_state_less_than_previous_cursor", + "2021-01-03T00:00:00.000000+0000", + {cursor_field: "2021-01-02T00:00:00.000000+0000"}, + {}, + {cursor_field: "2021-01-03T00:00:00.000000+0000"}, + ), + ], +) +def test_update_cursor(test_name, previous_cursor, stream_slice, last_record, expected_state): + slicer = DatetimeStreamSlicer( + start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000"), + end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), + step="1d", + cursor_field=InterpolatedString(cursor_field), + datetime_format=datetime_format, + lookback_window=InterpolatedString("0d"), + config=config, + ) + slicer._cursor = previous_cursor + slicer.update_cursor(stream_slice, last_record) + updated_state = slicer.get_stream_state() + assert expected_state == updated_state + + +@pytest.mark.parametrize( + "test_name, inject_into, field_name, expected_req_params, expected_headers, expected_body_json, expected_body_data", + [ + ("test_start_time_inject_into_none", None, None, {}, {}, {}, {}), + ( + "test_start_time_passed_by_req_param", + RequestOptionType.request_parameter, + "start_time", + {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {}, + {}, + {}, + ), + ( + "test_start_time_inject_into_header", + RequestOptionType.header, + "start_time", + {}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {}, + {}, + ), + ( + "test_start_time_inject_intoy_body_json", + RequestOptionType.body_json, + "start_time", + {}, + {}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + {}, + ), + ( + "test_start_time_inject_into_body_data", + RequestOptionType.body_data, + "start_time", + {}, + {}, + {}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + ), + ( + "test_start_time_inject_into_path", + RequestOptionType.path, + "start_time", + {}, + {}, + {}, + {"start_time": "2021-01-02T00:00:00.000000+0000", "endtime": "2021-01-04T00:00:00.000000+0000"}, + ), + ], +) +def test_request_option(test_name, inject_into, field_name, expected_req_params, expected_headers, expected_body_json, expected_body_data): + if inject_into == RequestOptionType.path: + start_request_option = RequestOption(inject_into) + end_request_option = RequestOption(inject_into) + try: + DatetimeStreamSlicer( + start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000"), + 
end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), + step="1d", + cursor_field=InterpolatedString(cursor_field), + datetime_format=datetime_format, + lookback_window=InterpolatedString("0d"), + start_time_option=start_request_option, + end_time_option=end_request_option, + config=config, + ) + assert False + except ValueError: + return + else: + start_request_option = RequestOption(inject_into, field_name) if inject_into else None + end_request_option = RequestOption(inject_into, "endtime") if inject_into else None + slicer = DatetimeStreamSlicer( + start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000"), + end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000"), + step="1d", + cursor_field=InterpolatedString(cursor_field), + datetime_format=datetime_format, + lookback_window=InterpolatedString("0d"), + start_time_option=start_request_option, + end_time_option=end_request_option, + config=config, + ) + stream_slice = {cursor_field: "2021-01-02T00:00:00.000000+0000", "end_date": "2021-01-04T00:00:00.000000+0000"} + + slicer.update_cursor(stream_slice) + + assert expected_req_params == slicer.request_params() + assert expected_headers == slicer.request_headers() + assert expected_body_json == slicer.request_body_json() + assert expected_body_data == slicer.request_body_data() + + if __name__ == "__main__": unittest.main() diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py index 8f706687e95827..ba94a1d6a94563 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_slicer.py @@ -4,21 +4,101 @@ import pytest as pytest from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer +slice_values = ["customer", "store", "subscription"] +cursor_field = "owner_resource" + @pytest.mark.parametrize( - "test_name, slice_values, slice_definition, expected_slices", + "test_name, slice_values, cursor_field, expected_slices", [ ( "test_single_element", ["customer", "store", "subscription"], - {"owner_resource": "{{ slice_value }}"}, + "owner_resource", + [{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}], + ), + ( + "test_input_list_is_string", + '["customer", "store", "subscription"]', + "owner_resource", [{"owner_resource": "customer"}, {"owner_resource": "store"}, {"owner_resource": "subscription"}], ), ], ) -def test_list_slicer(test_name, slice_values, slice_definition, expected_slices): - slicer = ListStreamSlicer(slice_values, slice_definition, config={}) +def test_list_slicer(test_name, slice_values, cursor_field, expected_slices): + slicer = ListStreamSlicer(slice_values, cursor_field, config={}) slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)] assert slices == expected_slices + + +@pytest.mark.parametrize( + "test_name, stream_slice, last_record, expected_state", + [ + ("test_update_cursor_no_state_no_record", {}, None, {}), + ("test_update_cursor_with_state_no_record", {"owner_resource": "customer"}, None, {"owner_resource": "customer"}), + ("test_update_cursor_value_not_in_list", {"owner_resource": "invalid"}, None, {}), + ], +) +def 
test_update_cursor(test_name, stream_slice, last_record, expected_state): + slicer = ListStreamSlicer(slice_values, cursor_field, config={}) + slicer.update_cursor(stream_slice, last_record) + updated_state = slicer.get_stream_state() + assert expected_state == updated_state + + +@pytest.mark.parametrize( + "test_name, request_option, expected_req_params, expected_headers, expected_body_json, expected_body_data", + [ + ( + "test_inject_into_req_param", + RequestOption(RequestOptionType.request_parameter, "owner_resource"), + {"owner_resource": "customer"}, + {}, + {}, + {}, + ), + ("test_pass_by_header", RequestOption(RequestOptionType.header, "owner_resource"), {}, {"owner_resource": "customer"}, {}, {}), + ( + "test_inject_into_body_json", + RequestOption(RequestOptionType.body_json, "owner_resource"), + {}, + {}, + {"owner_resource": "customer"}, + {}, + ), + ( + "test_inject_into_body_data", + RequestOption(RequestOptionType.body_data, "owner_resource"), + {}, + {}, + {}, + {"owner_resource": "customer"}, + ), + ( + "test_inject_into_path", + RequestOption(RequestOptionType.path), + {}, + {}, + {}, + {"owner_resource": "customer"}, + ), + ], +) +def test_request_option(test_name, request_option, expected_req_params, expected_headers, expected_body_json, expected_body_data): + if request_option.inject_into == RequestOptionType.path: + try: + ListStreamSlicer(slice_values, cursor_field, {}, request_option) + assert False + except ValueError: + return + slicer = ListStreamSlicer(slice_values, cursor_field, {}, request_option) + stream_slice = {cursor_field: "customer"} + + slicer.update_cursor(stream_slice) + assert expected_req_params == slicer.request_params() + assert expected_headers == slicer.request_headers() + assert expected_body_json == slicer.request_body_json() + assert expected_body_data == slicer.request_body_data() diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_substream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_substream_slicer.py index a03998eebaa54e..d3e15a5c9fba96 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_substream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_substream_slicer.py @@ -6,8 +6,8 @@ import pytest as pytest from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.declarative.states.dict_state import DictState -from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import SubstreamSlicer +from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType +from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import ParentStreamConfig, SubstreamSlicer from airbyte_cdk.sources.streams.core import Stream parent_records = [{"id": 1, "data": "data1"}, {"id": 2, "data": "data2"}] @@ -20,8 +20,6 @@ parent_slices = [{"slice": "first"}, {"slice": "second"}, {"slice": "third"}] second_parent_stream_slice = [{"slice": "second_parent"}] -slice_definition = {"{{ parent_stream_name }}_id": "{{ parent_record['id'] }}", "parent_slice": "{{ parent_stream_slice['slice'] }}"} - class MockStream(Stream): def __init__(self, slices, records, name): @@ -56,24 +54,22 @@ def read_records( @pytest.mark.parametrize( - "test_name, parent_streams, slice_definition, expected_slices", + "test_name, parent_stream_configs, expected_slices", [ + ("test_no_parents", [], None), ( "test_single_parent_slices_no_records", - [MockStream([{}], [], "first_stream")], - 
slice_definition, + [ParentStreamConfig(MockStream([{}], [], "first_stream"), "id", "first_stream_id")], [{"first_stream_id": None, "parent_slice": None}], ), ( "test_single_parent_slices_with_records", - [MockStream([{}], parent_records, "first_stream")], - slice_definition, + [ParentStreamConfig(MockStream([{}], parent_records, "first_stream"), "id", "first_stream_id")], [{"first_stream_id": 1, "parent_slice": None}, {"first_stream_id": 2, "parent_slice": None}], ), ( "test_with_parent_slices_and_records", - [MockStream(parent_slices, all_parent_data, "first_stream")], - slice_definition, + [ParentStreamConfig(MockStream(parent_slices, all_parent_data, "first_stream"), "id", "first_stream_id")], [ {"parent_slice": "first", "first_stream_id": 0}, {"parent_slice": "first", "first_stream_id": 1}, @@ -84,10 +80,11 @@ def read_records( ( "test_multiple_parent_streams", [ - MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"), - MockStream(second_parent_stream_slice, more_records, "second_stream"), + ParentStreamConfig( + MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"), "id", "first_stream_id" + ), + ParentStreamConfig(MockStream(second_parent_stream_slice, more_records, "second_stream"), "id", "second_stream_id"), ], - slice_definition, [ {"parent_slice": "first", "first_stream_id": 0}, {"parent_slice": "first", "first_stream_id": 1}, @@ -99,8 +96,132 @@ def read_records( ), ], ) -def test_substream_slicer(test_name, parent_streams, slice_definition, expected_slices): - state = DictState() - slicer = SubstreamSlicer(parent_streams, state, slice_definition) +def test_substream_slicer(test_name, parent_stream_configs, expected_slices): + if expected_slices is None: + try: + SubstreamSlicer(parent_stream_configs) + assert False + except ValueError: + return + slicer = SubstreamSlicer(parent_stream_configs) slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)] assert slices == expected_slices + + +@pytest.mark.parametrize( + "test_name, stream_slice, expected_state", + [ + ("test_update_cursor_no_state_no_record", {}, {}), + ("test_update_cursor_with_state_single_parent", {"first_stream_id": "1234"}, {"first_stream_id": "1234"}), + ("test_update_cursor_with_unknown_state_field", {"unknown_stream_id": "1234"}, {}), + ( + "test_update_cursor_with_state_from_both_parents", + {"first_stream_id": "1234", "second_stream_id": "4567"}, + {"first_stream_id": "1234", "second_stream_id": "4567"}, + ), + ], +) +def test_update_cursor(test_name, stream_slice, expected_state): + parent_stream_name_to_config = [ + ParentStreamConfig( + MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"), "id", "first_stream_id" + ), + ParentStreamConfig(MockStream(second_parent_stream_slice, more_records, "second_stream"), "id", "second_stream_id"), + ] + + slicer = SubstreamSlicer(parent_stream_name_to_config) + slicer.update_cursor(stream_slice, None) + updated_state = slicer.get_stream_state() + assert expected_state == updated_state + + +@pytest.mark.parametrize( + "test_name, parent_stream_request_options, expected_req_params, expected_headers, expected_body_json, expected_body_data", + [ + ( + "test_request_option_in_request_param", + [ + RequestOption(RequestOptionType.request_parameter, "first_stream"), + RequestOption(RequestOptionType.request_parameter, "second_stream"), + ], + {"first_stream_id": "1234", "second_stream_id": "4567"}, + {}, + {}, + {}, + ), + ( + 
"test_request_option_in_header", + [ + RequestOption(RequestOptionType.header, "first_stream"), + RequestOption(RequestOptionType.header, "second_stream"), + ], + {}, + {"first_stream_id": "1234", "second_stream_id": "4567"}, + {}, + {}, + ), + ( + "test_request_option_in_param_and_header", + [ + RequestOption(RequestOptionType.request_parameter, "first_stream"), + RequestOption(RequestOptionType.header, "second_stream"), + ], + {"first_stream_id": "1234"}, + {"second_stream_id": "4567"}, + {}, + {}, + ), + ( + "test_request_option_in_body_json", + [ + RequestOption(RequestOptionType.body_json, "first_stream"), + RequestOption(RequestOptionType.body_json, "second_stream"), + ], + {}, + {}, + {"first_stream_id": "1234", "second_stream_id": "4567"}, + {}, + ), + ( + "test_request_option_in_body_data", + [ + RequestOption(RequestOptionType.body_data, "first_stream"), + RequestOption(RequestOptionType.body_data, "second_stream"), + ], + {}, + {}, + {}, + {"first_stream_id": "1234", "second_stream_id": "4567"}, + ), + ], +) +def test_request_option( + test_name, + parent_stream_request_options, + expected_req_params, + expected_headers, + expected_body_json, + expected_body_data, +): + slicer = SubstreamSlicer( + [ + ParentStreamConfig( + MockStream(parent_slices, data_first_parent_slice + data_second_parent_slice, "first_stream"), + "id", + "first_stream_id", + parent_stream_request_options[0], + ), + ParentStreamConfig( + MockStream(second_parent_stream_slice, more_records, "second_stream"), + "id", + "second_stream_id", + parent_stream_request_options[1], + ), + ], + ) + slicer.update_cursor({"first_stream_id": "1234", "second_stream_id": "4567"}, None) + + assert expected_req_params == slicer.request_params() + assert expected_headers == slicer.request_headers() + assert expected_body_json == slicer.request_body_json() + assert expected_body_data == slicer.request_body_data() diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index 26152d3de5c62c..a16eeab45d5eee 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -26,6 +26,7 @@ from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer +from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition @@ -85,8 +86,7 @@ def test_list_based_stream_slicer_with_values_refd(): stream_slicer: class_name: airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer.ListStreamSlicer slice_values: "*ref(repositories)" - slice_definition: - repository: "{{ slice_value }}" + cursor_field: repository """ config = parser.parse(content) stream_slicer = factory.create_component(config["stream_slicer"], input_config)() @@ -96,14 +96,101 @@ def test_list_based_stream_slicer_with_values_refd(): def test_list_based_stream_slicer_with_values_defined_in_config(): content = """ stream_slicer: - class_name: airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer.ListStreamSlicer + type: ListStreamSlicer slice_values: "{{config['repos']}}" - 
slice_definition: - repository: "{{ slice_value }}" + cursor_field: repository + request_option: + inject_into: header + field_name: repository """ config = parser.parse(content) stream_slicer = factory.create_component(config["stream_slicer"], input_config)() assert ["airbyte", "airbyte-cloud"] == stream_slicer._slice_values + assert stream_slicer._request_option._option_type == RequestOptionType.header + assert stream_slicer._request_option._field_name == "repository" + + +def test_create_substream_slicer(): + content = """ + schema_loader: + file_path: "./source_sendgrid/schemas/{{name}}.yaml" + retriever: + requester: + path: "/v3" + record_selector: + extractor: + transform: "_" + stream_A: + type: DeclarativeStream + options: + name: "A" + primary_key: "id" + retriever: "*ref(retriever)" + url_base: "https://airbyte.io" + schema_loader: "*ref(schema_loader)" + stream_B: + type: DeclarativeStream + options: + name: "B" + primary_key: "id" + retriever: "*ref(retriever)" + url_base: "https://airbyte.io" + schema_loader: "*ref(schema_loader)" + stream_slicer: + type: SubstreamSlicer + parent_streams_configs: + - stream: "*ref(stream_A)" + parent_key: id + stream_slice_field: repository_id + request_option: + inject_into: request_parameter + field_name: repository_id + - stream: "*ref(stream_B)" + parent_key: someid + stream_slice_field: word_id + """ + config = parser.parse(content) + stream_slicer = factory.create_component(config["stream_slicer"], input_config)() + parent_stream_configs = stream_slicer._parent_stream_configs + assert len(parent_stream_configs) == 2 + assert isinstance(parent_stream_configs[0].stream, DeclarativeStream) + assert isinstance(parent_stream_configs[1].stream, DeclarativeStream) + assert stream_slicer._parent_stream_configs[0].parent_key == "id" + assert stream_slicer._parent_stream_configs[0].stream_slice_field == "repository_id" + assert stream_slicer._parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter + assert stream_slicer._parent_stream_configs[0].request_option._field_name == "repository_id" + + assert stream_slicer._parent_stream_configs[1].parent_key == "someid" + assert stream_slicer._parent_stream_configs[1].stream_slice_field == "word_id" + assert stream_slicer._parent_stream_configs[1].request_option is None + + +def test_create_cartesian_stream_slicer(): + content = """ + stream_slicer_A: + type: ListStreamSlicer + slice_values: "{{config['repos']}}" + cursor_field: repository + stream_slicer_B: + type: ListStreamSlicer + slice_values: + - hello + - world + cursor_field: words + stream_slicer: + type: CartesianProductStreamSlicer + stream_slicers: + - "*ref(stream_slicer_A)" + - "*ref(stream_slicer_B)" + """ + config = parser.parse(content) + stream_slicer = factory.create_component(config["stream_slicer"], input_config)() + underlying_slicers = stream_slicer._stream_slicers + assert len(underlying_slicers) == 2 + assert isinstance(underlying_slicers[0], ListStreamSlicer) + assert isinstance(underlying_slicers[1], ListStreamSlicer) + assert ["airbyte", "airbyte-cloud"] == underlying_slicers[0]._slice_values + assert ["hello", "world"] == underlying_slicers[1]._slice_values def test_datetime_stream_slicer(): @@ -111,15 +198,18 @@ def test_datetime_stream_slicer(): stream_slicer: type: DatetimeStreamSlicer options: - datetime_format: "%Y-%m-%d" + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" start_datetime: type: MinMaxDatetime datetime: "{{ config['start_time'] }}" min_datetime: "{{ config['start_time'] + 
day_delta(2) }}" end_datetime: "{{ config['end_time'] }}" step: "10d" - cursor_value: "created" + cursor_field: "created" lookback_window: "5d" + start_time_option: + inject_into: request_parameter + field_name: created[gte] """ config = parser.parse(content) @@ -128,14 +218,16 @@ def test_datetime_stream_slicer(): assert stream_slicer._timezone == datetime.timezone.utc assert type(stream_slicer._start_datetime) == MinMaxDatetime assert type(stream_slicer._end_datetime) == MinMaxDatetime - assert stream_slicer._start_datetime._datetime_format == "%Y-%m-%d" + assert stream_slicer._start_datetime._datetime_format == "%Y-%m-%dT%H:%M:%S.%f%z" assert stream_slicer._start_datetime._timezone == datetime.timezone.utc assert stream_slicer._start_datetime._datetime_interpolator._string == "{{ config['start_time'] }}" assert stream_slicer._start_datetime._min_datetime_interpolator._string == "{{ config['start_time'] + day_delta(2) }}" assert stream_slicer._end_datetime._datetime_interpolator._string == "{{ config['end_time'] }}" assert stream_slicer._step == datetime.timedelta(days=10) - assert stream_slicer._cursor_value._string == "created" + assert stream_slicer._cursor_field._string == "created" assert stream_slicer._lookback_window._string == "5d" + assert stream_slicer._start_time_option.inject_into == RequestOptionType.request_parameter + assert stream_slicer._start_time_option._field_name == "created[gte]" def test_full_config(): @@ -181,8 +273,6 @@ def test_full_config(): retriever: class_name: "airbyte_cdk.sources.declarative.retrievers.simple_retriever.SimpleRetriever" name: "{{ options['name'] }}" - state: - class_name: airbyte_cdk.sources.declarative.states.dict_state.DictState stream_slicer: class_name: airbyte_cdk.sources.declarative.stream_slicers.single_slice.SingleSlice paginator: