[low-code connectors] Add request options and state to stream slicers (#14552)

* comment

* comment

* comments

* fix

* test for instantiating chain retrier

* fix parsing

* cleanup

* fix

* reset

* never raise on http error

* remove print

* comment

* comment

* comment

* comment

* remove prints

* add declarative stream to registry

* start working on limit paginator

* support for offset pagination

* tests

* move limit value

* extract request option

* boilerplate

* page increment

* delete offset paginator

* update conditional paginator

* refactor and fix test

* fix test

* small fix

* Delete dead code

* Add docstrings

* quick fix

* exponential backoff

* fix test

* fix

* delete unused properties

* fix

* missing unit tests

* uppercase

* docstrings

* rename to success

* compare full request instead of just url

* rename module

* rename test file

* rename interface

* rename default retrier

* rename to compositeerrorhandler

* fix missing renames

* move action to filter

* str -> minmaxdatetime

* small fixes

* plural

* add example

* handle header variations

* also fix wait time from

* allow using a regex to extract the value

* group()

* docstring

* add docs

* update comment

* docstrings

* fix tests

* rename param

* cleanup stop_condition

* cleanup

* Add examples

* interpolated pagination strategy

* don't need a duplicate class

* docstrings

* more docstrings

* docstrings

* fix tests

* first pass at substream

* seems to work for a single stream

* can also be defined in requester with stream_state

* tmp update

* update comment

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* version: Update Parquet library to latest release (#14502)

The upstream Parquet library that is currently pinned for use in the S3 destination plugin is over a year old. The current version is generating invalid schemas for date-time with time-zone fields, which appears to be addressed in the `1.12.3` release of the library in commit apache/parquet-java@c72862b

* merge

* 🎉 Source Github: improve schema for stream `pull_request_commits` added "null" (#14613)

Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>

* Docs: Fixed broken links (#14622)

* fixing broken links

* more broken links

* source-hubspot: change mention of Mailchimp to HubSpot in doc (#14620)

* Helm Chart: Add external temporal option (#14597)

* conflict env configmap and chart lock

* reverting lock

* add eof lines and documentation on values yaml

* conflict json file

* rollback json

* solve conflict

* correct minio with new version

Co-authored-by: Guy Feldman <gfeldman@86labs.com>

* 🎉 Add YAML format to source-file reader (#14588)

* Add yaml reader

* Update docs

* Bumpversion of connector

* bump docs

* Update pyarrow dependency

* Upgrade pandas dependency

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* 🎉 Source Okta: add GroupMembers stream (#14380)

* add Group_Members stream to okta source

- Group_Members returns a list of users with the same schema as the Users stream.
- Create a shared users schema; both the group_members and users schemas reference it.
- Add Group_Members stream to source connector

* add tests and fix logs schema

- fix the test error "None is not one of enums": although the enum type includes both string and null, the error comes from the json schema validator
https://github.com/python-jsonschema/jsonschema/blob/ddb87afad8f5d5c40600b5ede0ab96e4d4bdf7d3/jsonschema/_validators.py#L279-L285
- change group_members to use id as the cursor field since `filter` is not supported in the query string
- fix the abnormal state test on the logs stream: when since is abnormally large, until has to be defined as an equal or larger value
- remove the logs stream from the full sync test, because 2 full syncs always have a gap -- at least a new log about the users or groups api.

* last polish before submitting the PR

- bump docker version
- update changelog
- add the right abnormal value for logs stream
- correct the sample catalog

* address comments:

- improve comments for until parameter under the logs stream
- add use_cache on groupMembers

* add use_cache to Group_Members

* change configured_catalog to test

* auto-bump connector version

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>

* split test files

* renames

* missing unit test

* add missing unit tests

* rename

* assert isinstance

* start extracting to their own files

* use final instead of classmethod

* assert we retry 429 errors

* Add log

* replace asserts with valueexceptions

* delete superfluous print statement

* only accept minmaxdatetime

* fix factory so we don't need to union everything with strings

* get class_name from type

* remove from class types registry

* process error handlers one at a time

* sort

* delete print statement

* comment

* comment

* format

* delete unused file

* comment

* interpolatedboolean

* comment

* not optional

* not optional

* unit tests

* fix request body data

* add test

* move file to right module

* update

* reset to master

* format

* rename to pass_by

* rename to page size

* fix

* fix some tests

* reset

* fix

* fix some of the tests

* fix test

* fix more tests

* all tests pass

* path is not optional

* reset

* reset

* reset

* delete print

* remove prints

* delete duplicate method

* add test

* fix body data

* delete extra newlines

* move to subpackage

* fix imports

* handle str body data

* simplify

* Update tests

* filter dates before stream state

* Revert "Update tests"

This reverts commit c0808c8.

* update

* fix test

* state management

* add test

* delete dead code

* update cursor

* update cursor cartesian

* delete unused state class

* fix

* missing test

* update cursor substreams

* missing test

* fix typing

* fix typing

* delete unused field

* delete unused method

* update datetime stream slice

* cleanup

* assert

* request options

* request option cartesian

* assert when passing by path

* request options for substreams

* always return a map

* pass stream_state

* refactor and almost done fixing tests

* fix tests

* rename to inject_into

* only accept enum

* delete conditional paginator

* only return body data

* missing test

* update docstrings

* update docstrings

* update comment

* rename

* tests

* class_name -> type

* improve interface

* fix some of the tests

* fix more of the tests

* fix tests

* reset

* reset

* Revert "reset"

This reverts commit eb9a918.

* remove extra argument

* docstring

* update

* delete unused file

* reset

* reset

* rename

* fix timewindow

* create InterpolatedString

* helper method

* assert on request option

* better asserts

* format

* docstrings

* docstrings

* remove optional from type hint

* Update airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/cartesian_product_stream_slicer.py

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* inherit from request options provider

* inherit from request options provider

* remove optional from type hint

* remove extra parameter

* none check

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: Tobias Macey <tmacey@boundlessnotions.com>
Co-authored-by: Serhii Chvaliuk <grubberr@gmail.com>
Co-authored-by: Amruta Ranade <11484018+Amruta-Ranade@users.noreply.github.com>
Co-authored-by: Bas Beelen <bjgbeelen@gmail.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Guy Feldman <gfeldman@86labs.com>
Co-authored-by: Christophe Duong <christophe.duong@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Yiyang Li <yiyangli2010@gmail.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
12 people committed Jul 27, 2022
1 parent fd09c32 commit 44ec661
Showing 23 changed files with 1,110 additions and 404 deletions.
@@ -27,6 +27,7 @@
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import SubstreamSlicer
from airbyte_cdk.sources.declarative.transformations import RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddFields

@@ -59,4 +60,5 @@
"RecordSelector": RecordSelector,
"RemoveFields": RemoveFields,
"SimpleRetriever": SimpleRetriever,
"SubstreamSlicer": SubstreamSlicer,
}
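The registry hunk above wires SubstreamSlicer into the declarative class lookup. A minimal sketch of how such a name-to-class registry might resolve a declarative component — the `build` helper and the simplified class here are illustrative assumptions, not the CDK's actual factory:

```python
from typing import Any, Mapping


class SubstreamSlicer:
    """Stand-in for the CDK class registered above (illustrative only)."""

    def __init__(self, **options: Any):
        self.options = options


# Hypothetical, simplified version of the class-types registry
CLASS_TYPES_REGISTRY: Mapping[str, type] = {"SubstreamSlicer": SubstreamSlicer}


def build(component: Mapping[str, Any]) -> Any:
    # Resolve the declarative "type" field to a concrete class and
    # instantiate it with the remaining fields as constructor options
    cls = CLASS_TYPES_REGISTRY[component["type"]]
    return cls(**{k: v for k, v in component.items() if k != "type"})


slicer = build({"type": "SubstreamSlicer", "parent_key": "id"})
print(type(slicer).__name__)  # SubstreamSlicer
```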
@@ -30,10 +30,9 @@
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.states.dict_state import DictState
from airbyte_cdk.sources.declarative.states.state import State
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.substream_slicer import ParentStreamConfig
from airbyte_cdk.sources.streams.core import Stream

"""
@@ -55,8 +54,8 @@
RequestOptionsProvider: InterpolatedRequestOptionsProvider,
Requester: HttpRequester,
Retriever: SimpleRetriever,
ParentStreamConfig: ParentStreamConfig,
SchemaLoader: JsonSchema,
State: DictState,
Stream: DeclarativeStream,
StreamSlicer: SingleSlice,
}
@@ -127,6 +127,10 @@ def request_body_data(self) -> Mapping[str, Any]:
def request_body_json(self) -> Mapping[str, Any]:
return self._get_request_options(RequestOptionType.body_json)

def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}

def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
options = {}
if self._page_token_option.inject_into == option_type:
@@ -28,5 +28,9 @@ def request_body_data(self) -> Union[Mapping[str, Any], str]:
def request_body_json(self) -> Mapping[str, Any]:
return {}

def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Mapping[str, Any]:
return {}
@@ -2,13 +2,14 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from abc import abstractmethod
from typing import Any, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider


class Paginator(ABC):
class Paginator(RequestOptionsProvider):
"""
Defines the token to use to fetch the next page of records from the API.
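With Paginator now inheriting from RequestOptionsProvider, a paginator can inject its own page token into outgoing requests. A hedged sketch of that pattern — this PageIncrementPaginator and RequestOptionType enum are simplified stand-ins, not the CDK's actual classes:

```python
from enum import Enum
from typing import Any, List, Mapping, Optional


class RequestOptionType(Enum):
    request_parameter = "request_parameter"
    header = "header"
    body_json = "body_json"


class PageIncrementPaginator:
    """Illustrative paginator that doubles as a request-options provider."""

    def __init__(self, page_size: int, inject_into: RequestOptionType, field_name: str = "page"):
        self._page_size = page_size
        self._inject_into = inject_into
        self._field_name = field_name
        self._page = 0

    def next_page_token(self, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
        if len(last_records) < self._page_size:
            return None  # short page -> no more records
        self._page += 1
        return {"page": self._page}

    def _get_request_options(self, option_type: RequestOptionType) -> Mapping[str, Any]:
        # Only inject the token into the request part this paginator is configured for
        return {self._field_name: self._page} if self._inject_into == option_type else {}

    def request_params(self) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.request_parameter)

    def request_headers(self) -> Mapping[str, Any]:
        return self._get_request_options(RequestOptionType.header)

    def request_kwargs(self) -> Mapping[str, Any]:
        # Never update kwargs, mirroring the diff above
        return {}


paginator = PageIncrementPaginator(2, RequestOptionType.request_parameter)
paginator.next_page_token([{"id": 1}, {"id": 2}])
print(paginator.request_params())  # {'page': 1}
```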
@@ -20,14 +20,13 @@ class RequestOptionsProvider(ABC):
"""

@abstractmethod
def request_params(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None
) -> MutableMapping[str, Any]:
def request_params(self, **kwargs) -> MutableMapping[str, Any]:
"""
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
"""
pass

@abstractmethod
def request_headers(
@@ -13,8 +13,6 @@
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.requesters.requester import Requester
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.states.dict_state import DictState
from airbyte_cdk.sources.declarative.states.state import State
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
@@ -42,7 +40,6 @@ def __init__(
record_selector: HttpSelector,
paginator: Optional[Paginator] = None,
stream_slicer: Optional[StreamSlicer] = SingleSlice(),
state: Optional[State] = None,
):
"""
:param name: The stream's name
@@ -59,8 +56,7 @@
self._requester = requester
self._record_selector = record_selector
super().__init__(self._requester.get_authenticator())
self._iterator = stream_slicer
self._state: State = (state or DictState()).deep_copy()
self._stream_slicer = stream_slicer
self._last_response = None
self._last_records = None

@@ -300,12 +296,14 @@ def read_records(
stream_state: Optional[StreamState] = None,
) -> Iterable[Mapping[str, Any]]:
# Warning: use self.state instead of the stream_state passed as argument!
stream_slice = stream_slice or {} # None-check
records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state)
for r in records_generator:
self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_response=self._last_response, last_record=r)
self._stream_slicer.update_cursor(stream_slice, last_record=r)
yield r
else:
self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_reponse=self._last_response)
last_record = self._last_records[-1] if self._last_records else None
self._stream_slicer.update_cursor(stream_slice, last_record=last_record)
yield from []

def stream_slices(
Expand All @@ -320,13 +318,13 @@ def stream_slices(
:return:
"""
# Warning: use self.state instead of the stream_state passed as argument!
return self._iterator.stream_slices(sync_mode, self.state)
return self._stream_slicer.stream_slices(sync_mode, self.state)

@property
def state(self) -> StreamState:
return self._state.get_stream_state()
def state(self) -> MutableMapping[str, Any]:
return self._stream_slicer.get_stream_state()

@state.setter
def state(self, value: StreamState):
"""State setter, accept state serialized by state getter."""
self._state.set_state(value)
self._stream_slicer.update_cursor(value)
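The hunks above delete the DictState/State classes and make the stream slicer the owner of cursor state: read_records feeds each record to update_cursor, and the stream's state getter delegates to get_stream_state. A small sketch of a slicer that manages its cursor this way — CursorSlicer is a hypothetical illustration, not CDK code:

```python
from typing import Any, Mapping, Optional


class CursorSlicer:
    """Hypothetical slicer owning its cursor state, mirroring the pattern
    in this commit: the retriever delegates state get/set to the slicer."""

    def __init__(self, cursor_field: str):
        self._cursor_field = cursor_field
        self._cursor: Optional[str] = None

    def update_cursor(self, stream_slice: Mapping[str, Any], last_record: Optional[Mapping[str, Any]] = None):
        # Advance the cursor from the latest record, falling back to the slice
        value = (last_record or {}).get(self._cursor_field) or stream_slice.get(self._cursor_field)
        if value and (self._cursor is None or value > self._cursor):
            self._cursor = value

    def get_stream_state(self) -> Mapping[str, Any]:
        return {self._cursor_field: self._cursor} if self._cursor else {}


slicer = CursorSlicer("updated_at")
slicer.update_cursor({"updated_at": "2022-07-01"}, last_record={"updated_at": "2022-07-15"})
print(slicer.get_stream_state())  # {'updated_at': '2022-07-15'}
```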

This file was deleted.

This file was deleted.

19 changes: 0 additions & 19 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/states/state.py

This file was deleted.

@@ -4,7 +4,7 @@

import itertools
from collections import ChainMap
from typing import Any, Iterable, List, Mapping
from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
@@ -28,8 +28,34 @@ class CartesianProductStreamSlicer(StreamSlicer):
"""

def __init__(self, stream_slicers: List[StreamSlicer]):
"""
:param stream_slicers: Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
"""
self._stream_slicers = stream_slicers

def update_cursor(self, stream_slice: Mapping[str, Any], last_record: Optional[Mapping[str, Any]] = None):
for slicer in self._stream_slicers:
slicer.update_cursor(stream_slice, last_record)

def request_params(self) -> Mapping[str, Any]:
return dict(ChainMap(*[s.request_params() for s in self._stream_slicers]))

def request_headers(self) -> Mapping[str, Any]:
return dict(ChainMap(*[s.request_headers() for s in self._stream_slicers]))

def request_body_data(self) -> Mapping[str, Any]:
return dict(ChainMap(*[s.request_body_data() for s in self._stream_slicers]))

def request_body_json(self) -> Optional[Mapping]:
return dict(ChainMap(*[s.request_body_json() for s in self._stream_slicers]))

def request_kwargs(self) -> Mapping[str, Any]:
# Never update kwargs
return {}

def get_stream_state(self) -> Mapping[str, Any]:
return dict(ChainMap(*[slicer.get_stream_state() for slicer in self._stream_slicers]))

def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
sub_slices = (s.stream_slices(sync_mode, stream_state) for s in self._stream_slicers)
return (ChainMap(*a) for a in itertools.product(*sub_slices))
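The ChainMap calls above merge per-slicer request options and produce the cartesian product of slices. A standalone sketch of the merge semantics — the sample dicts are made up; note that ChainMap resolves key conflicts in favor of the first mapping, as the new docstring states:

```python
import itertools
from collections import ChainMap

# Request params from two underlying slicers; "limit" conflicts
slicer_params = [{"start_date": "2022-01-01", "limit": 10}, {"limit": 50, "region": "eu"}]
merged = dict(ChainMap(*slicer_params))
print(merged["limit"])  # 10 -- the first slicer wins on conflict

# Cartesian product of each slicer's stream slices
slices_a = [{"date": "2022-01-01"}, {"date": "2022-01-02"}]
slices_b = [{"parent_id": "p1"}, {"parent_id": "p2"}]
combined = [dict(ChainMap(*s)) for s in itertools.product(slices_a, slices_b)]
print(len(combined))  # 4 combined slices
```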
