Skip to content

Commit

Permalink
Merge branch 'alex/selectNoRecords' into alex/configbasedsendgrid
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed Aug 11, 2022
2 parents e10d6b9 + 3dadc32 commit ac92374
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Union

import dpath.util
import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config, Record
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class DpathExtractor(RecordExtractor, JsonSchemaMixin):
"""
Record extractor that searches a decoded response over a path defined as an array of fields.
Examples of instantiating this transform:
```
extractor:
type: DpathExtractor
transform:
- "root"
- "data"
```
```
extractor:
type: DpathExtractor
transform:
- "root"
- "{{ options['field'] }}"
```
Attributes:
transform (Union[InterpolatedString, str]): Pointer to the field that should be extracted
config (Config): The user-provided configuration as specified by the source's spec
decoder (Decoder): The decoder responsible to transfom the response in a Mapping
"""

field_pointer: List[Union[InterpolatedString, str]]
config: Config
options: InitVar[Mapping[str, Any]]
decoder: Decoder = JsonDecoder(options={})

def __post_init__(self, options: Mapping[str, Any]):
for pointer_index in range(len(self.field_pointer)):
if isinstance(self.field_pointer[pointer_index], str):
self.field_pointer[pointer_index] = InterpolatedString.create(self.field_pointer[pointer_index], options=options)

def extract_records(self, response: requests.Response) -> List[Record]:
response_body = self.decoder.decode(response)
return dpath.util.get(response_body, [pointer.eval(self.config) for pointer in self.field_pointer], default=[])
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config, Record
from dataclasses_jsonschema import JsonSchemaMixin
from jello import lib as jello_lib


@dataclass
class JelloExtractor(JsonSchemaMixin):
class JelloExtractor(RecordExtractor, JsonSchemaMixin):
"""
Record extractor that evaluates a Jello query to extract records from a decoded response.
Expand All @@ -40,4 +41,7 @@ def __post_init__(self, options: Mapping[str, Any]):
def extract_records(self, response: requests.Response) -> List[Record]:
response_body = self.decoder.decode(response)
script = self.transform.eval(self.config)
return jello_lib.pyquery(response_body, script)
try:
return jello_lib.pyquery(response_body, script)
except KeyError:
return []
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List

import requests
from airbyte_cdk.sources.declarative.types import Record


@dataclass
class RecordExtractor(ABC):
"""
Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering
records based on a heuristic.
"""

@abstractmethod
def extract_records(
self,
response: requests.Response,
) -> List[Record]:
"""
Selects records from the response
:param response: The response to extract the records from
:return: List of Records extracted from the response
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import requests
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
Expand All @@ -20,11 +20,11 @@ class RecordSelector(HttpSelector, JsonSchemaMixin):
records based on a heuristic.
Attributes:
extractor (JelloExtractor): The record extractor responsible for extracting records from a response
extractor (RecordExtractor): The record extractor responsible for extracting records from a response
record_filter (RecordFilter): The record filter responsible for filtering extracted records
"""

extractor: JelloExtractor
extractor: RecordExtractor
options: InitVar[Mapping[str, Any]]
record_filter: RecordFilter = None

Expand All @@ -39,6 +39,8 @@ def select_records(
next_page_token: Optional[Mapping[str, Any]] = None,
) -> List[Record]:
all_records = self.extractor.extract_records(response)
if not all_records:
return []
# Some APIs don't wrap single records in a list
if not isinstance(all_records, list):
all_records = [all_records]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from airbyte_cdk.sources.declarative.auth.token import ApiKeyAuthenticator, BasicHttpAuthenticator, BearerAuthenticator
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
Expand Down Expand Up @@ -46,6 +47,7 @@
"DatetimeStreamSlicer": DatetimeStreamSlicer,
"DeclarativeStream": DeclarativeStream,
"DefaultErrorHandler": DefaultErrorHandler,
"DpathExtractor": DpathExtractor,
"ExponentialBackoffStrategy": ExponentialBackoffStrategy,
"HttpRequester": HttpRequester,
"InterpolatedBoolean": InterpolatedBoolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.jello import JelloExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
Expand Down Expand Up @@ -50,11 +52,12 @@
InterpolatedString: InterpolatedString,
MinMaxDatetime: MinMaxDatetime,
Paginator: NoPagination,
ParentStreamConfig: ParentStreamConfig,
RecordExtractor: JelloExtractor,
RequestOption: RequestOption,
RequestOptionsProvider: InterpolatedRequestOptionsProvider,
Requester: HttpRequester,
Retriever: SimpleRetriever,
ParentStreamConfig: ParentStreamConfig,
SchemaLoader: JsonSchema,
Stream: DeclarativeStream,
StreamSlicer: SingleSlice,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import json

import pytest
import requests
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor

config = {"field": "record_array"}
options = {"options_field": "record_array"}

decoder = JsonDecoder(options={})


@pytest.mark.parametrize(
"test_name, field_pointer, body, expected_records",
[
("test_extract_from_array", ["data"], {"data": [{"id": 1}, {"id": 2}]}, [{"id": 1}, {"id": 2}]),
("test_nested_field", ["data", "records"], {"data": {"records": [{"id": 1}, {"id": 2}]}}, [{"id": 1}, {"id": 2}]),
("test_field_in_config", ["{{ config['field'] }}"], {"record_array": [{"id": 1}, {"id": 2}]}, [{"id": 1}, {"id": 2}]),
("test_field_in_options", ["{{ options['options_field'] }}"], {"record_array": [{"id": 1}, {"id": 2}]}, [{"id": 1}, {"id": 2}]),
("test_field_does_not_exist", ["record"], {"id": 1}, []),
],
)
def test_dpath_extractor(test_name, field_pointer, body, expected_records):
extractor = DpathExtractor(field_pointer=field_pointer, config=config, decoder=decoder, options=options)

response = create_response(body)
actual_records = extractor.extract_records(response)

assert actual_records == expected_records


def create_response(body):
response = requests.Response()
response._content = json.dumps(body).encode("utf-8")
return response
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
("test_field_in_config", "_.{{ config['field'] }}", {"record_array": [{"id": 1}, {"id": 2}]}, [{"id": 1}, {"id": 2}]),
("test_field_in_options", "_.{{ options['options_field'] }}", {"record_array": [{"id": 1}, {"id": 2}]}, [{"id": 1}, {"id": 2}]),
("test_default", "_{{kwargs['field']}}", [{"id": 1}, {"id": 2}], [{"id": 1}, {"id": 2}]),
("test_field_does_not_exist", "_.record", {"id": 1}, []),
(
"test_remove_fields_from_records",
"[{k:v for k,v in d.items() if k != 'value_to_remove'} for d in _.data]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@
{"data": {"id": 1, "created_at": "06-06-21"}},
[{"id": 1, "created_at": "06-06-21"}],
),
(
"test_no_record",
"_.data",
None,
{"data": []},
[],
),
(
"test_no_record_from_root",
"_",
None,
[],
[],
),
],
)
def test_record_filter(test_name, transform_template, filter_template, body, expected_records):
Expand Down

0 comments on commit ac92374

Please sign in to comment.