Skip to content

Commit

Permalink
Low code connectors: core structure (#12850)
Browse files Browse the repository at this point in the history
* checkout from alex/cac

* doc

* doc

* remove broken test

* rename

* rename file

* delete unused file

* rename

* abstract property

* isort

* update state

* Update comment

* remove incremental mixin

* delete comment

* update comments

* update comments

* remove no_state

* rename package

* pass parameters through kwargs

* update interface to pass source in interface

* update interface to pass source in interface

* rename to stream_slicer

* Low code connectors: string interpolation with jinja (#12852)

* checkout from alex/cac

* Add missing tests

* Add missing files

* missing file

* rename

* jinja dependency

* Add comment

* comment

* comment

* Revert "delete unused file"

This reverts commit 758e939.

* delete unused field

* delete unused field

* rename

* pass kwargs directly

* isort

* Revert "isort"

This reverts commit 4a79223.

* format

* decoder

* better error handling

* remove nostate

* isort

* delete dead code

* Update mapping type to [str, Any]

* add comment

* Add comment

* pass parameters through kwargs

* move test to right module

* Add missing test

* Use authbase instead of deprecated class

* leverage generator

* rename to declarative

* rename the classes too
  • Loading branch information
girarda committed May 26, 2022
1 parent ddf529c commit 150ab59
Show file tree
Hide file tree
Showing 54 changed files with 1,452 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from abc import ABC, abstractmethod
from typing import Any, Mapping, Tuple

from airbyte_cdk.sources.source import Source


class ConnectionChecker(ABC):
"""
Abstract base class for checking a connection
"""

@abstractmethod
def check_connection(self, source: Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
"""
:param source: source
:param logger: source logger
:param config: The user-provided configuration as specified by the source's spec.
This usually contains information required to check connection e.g. tokens, secrets and keys etc.
:return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
and we can connect to the underlying data source using the provided configuration.
Otherwise, the input config cannot be used to connect to the underlying data source,
and the "error" object should describe what went wrong.
The error object will be cast to string to display the problem to the user.
"""
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from abc import abstractmethod
from typing import Tuple

from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker


class ConfigurableSource(AbstractSource):
"""
Base class for declarative Source. Concrete sources need to define the connection_checker to use
"""

@property
@abstractmethod
def connection_checker(self) -> ConnectionChecker:
pass

def check_connection(self, logger, config) -> Tuple[bool, any]:
return self.connection_checker.check_connection(self, logger, config)
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.streams.core import Stream


class DeclarativeStream(Stream):
"""
DeclarativeStream is a Stream that delegates most of its logic to its schema_load and retriever
"""

def __init__(self, name, primary_key, cursor_field, schema_loader: SchemaLoader, retriever):
self._name = name
self._primary_key = primary_key
self._cursor_field = cursor_field
self._schema_loader = schema_loader
self._retriever: Retriever = retriever

@property
def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
return self._primary_key

@property
def name(self) -> str:
"""
:return: Stream name. By default this is the implementing class name, but it can be overridden as needed.
"""
return self._name

@property
def state(self) -> MutableMapping[str, Any]:
return self._retriever.get_state()

@property
def cursor_field(self) -> Union[str, List[str]]:
"""
Override to return the default cursor field used by this stream e.g: an API entity might always use created_at as the cursor field.
:return: The name of the field used as a cursor. If the cursor is nested, return an array consisting of the path to the cursor.
"""
return self._cursor_field

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
return self._retriever.read_records(sync_mode, cursor_field, stream_slice, stream_state)

def get_json_schema(self) -> Mapping[str, Any]:
"""
:return: A dict of the JSON schema representing this stream.
The default implementation of this method looks for a JSONSchema file with the same name as this stream's "name" property.
Override as needed.
"""
# TODO show an example of using pydantic to define the JSON schema, or reading an OpenAPI spec
return self._schema_loader.get_json_schema()

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
"""
Override to define the slices for this stream. See the stream slicing section of the docs for more information.
:param sync_mode:
:param cursor_field:
:param stream_state:
:return:
"""
# this is not passing the cursor field because it is known at init time
return self._retriever.stream_slices(sync_mode=sync_mode, stream_state=stream_state)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, Mapping

import requests


class Decoder(ABC):
@abstractmethod
def decode(self, response: requests.Response) -> Mapping[str, Any]:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder


class JsonDecoder(Decoder):
def decode(self, response: requests.Response) -> Mapping[str, Any]:
return response.json()
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import List

import requests
from airbyte_cdk.sources.declarative.types import Record


class HttpExtractor(ABC):
@abstractmethod
def extract_records(self, response: requests.Response) -> List[Record]:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping

from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation


class InterpolatedMapping:
def __init__(self, mapping: Mapping[str, Any], interpolation: Interpolation = JinjaInterpolation()):
self._mapping = mapping
self._interpolation = interpolation

def eval(self, config, **kwargs):
interpolated_values = {
self._interpolation.eval(name, config, **kwargs): self._eval(value, config, **kwargs) for name, value in self._mapping.items()
}
return interpolated_values

def _eval(self, value, config, **kwargs):
# The values in self._mapping can be of Any type
# We only want to interpolate them if they are strings
if type(value) == str:
return self._interpolation.eval(value, config, **kwargs)
else:
return value
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import Optional

from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation


class InterpolatedString:
def __init__(self, string: str, default: Optional[str] = None):
self._string = string
self._default = default or string
self._interpolation = JinjaInterpolation()

def eval(self, config, **kwargs):
return self._interpolation.eval(self._string, config, self._default, **kwargs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod

from airbyte_cdk.sources.declarative.types import Config


class Interpolation(ABC):
@abstractmethod
def eval(self, input_str: str, config: Config, **kwargs) -> str:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import datetime

from airbyte_cdk.sources.declarative.interpolation.interpolation import Interpolation
from jinja2 import Environment
from jinja2.exceptions import UndefinedError


class JinjaInterpolation(Interpolation):
def __init__(self):
self._environment = Environment()
# Defines some utility methods that can be called from template strings
# eg "{{ today_utc() }}
self._environment.globals["now_local"] = datetime.datetime.now
self._environment.globals["now_utc"] = lambda: datetime.datetime.now(datetime.timezone.utc)
self._environment.globals["today_utc"] = lambda: datetime.datetime.now(datetime.timezone.utc).date()

def eval(self, input_str: str, config, default=None, **kwargs):
context = {"config": config, **kwargs}
try:
if isinstance(input_str, str):
result = self._eval(input_str, context)
if result:
return result
else:
# If input is not a string, return it as is
raise Exception(f"Expected a string. got {input_str}")
except UndefinedError:
pass
# If result is empty or resulted in an undefined error, evaluate and return the default string
return self._eval(default, context)

def _eval(self, s: str, context):
try:
return self._environment.from_string(s).render(context)
except TypeError:
# The string is a static value, not a jinja template
# It can be returned as is
return s
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator


class InterpolatedPaginator(Paginator):
def __init__(self, next_page_token_template: Mapping[str, str], decoder: Decoder, config):
self._next_page_token_template = InterpolatedMapping(next_page_token_template, JinjaInterpolation())
self._decoder = decoder
self._config = config

def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
decoded_response = self._decoder.decode(response)
headers = response.headers
interpolated_values = self._next_page_token_template.eval(
self._config, decoded_response=decoded_response, headers=headers, last_records=last_records
)

non_null_tokens = {k: v for k, v in interpolated_values.items() if v}

return non_null_tokens if non_null_tokens else None
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping, Optional

import requests
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator


class NoPagination(Paginator):
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
return None
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from typing import Any, List, Mapping, Optional

import requests


class Paginator(ABC):
@abstractmethod
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from typing import Any, Mapping, MutableMapping

from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.requesters.request_params.request_parameters_provider import RequestParameterProvider


class InterpolatedRequestParameterProvider(RequestParameterProvider):
def __init__(self, *, config, request_parameters=None):
if request_parameters is None:
request_parameters = dict()
self._interpolation = InterpolatedMapping(request_parameters, JinjaInterpolation())
self._config = config

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
interpolated_values = self._interpolation.eval(self._config, **kwargs)
non_null_tokens = {k: v for k, v in interpolated_values.items() if v}
return non_null_tokens

0 comments on commit 150ab59

Please sign in to comment.