Skip to content

Commit

Permalink
SAT: cursor_paths should support custom nested and absolute paths (#4552
Browse files Browse the repository at this point in the history
)

* extend cursor_path logic to support custom nested and absolute paths

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
  • Loading branch information
3 people committed Jul 5, 2021
1 parent b4e55ac commit f4ceebe
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# Changelog

## 0.1.5
## 0.1.8
Fix cursor_path to support nested and absolute paths: https://github.com/airbytehq/airbyte/pull/4552

## 0.1.7
Add: `test_spec` additionally checks that the Dockerfile defines `ENV AIRBYTE_ENTRYPOINT` and that its value equals the space-joined `ENTRYPOINT`

## 0.1.6
Add a test that primary keys are present and not None when `source_defined_primary_key` is defined: https://github.com/airbytehq/airbyte/pull/4140

## 0.1.5
Expand Down
6 changes: 3 additions & 3 deletions airbyte-integrations/bases/source-acceptance-test/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# Slim Python 3.7 base image.
FROM python:3.7-slim

# Install bash, then remove the apt package lists in the same layer.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
ENV CODE_PATH="source_acceptance_test"

WORKDIR /airbyte/source_acceptance_test
# NOTE(review): the next two COPY lines copy the same directory (once via $CODE_PATH,
# once hard-coded); this looks like the old and new side of a diff — confirm which is intended.
COPY $CODE_PATH ./$CODE_PATH
COPY source_acceptance_test ./source_acceptance_test
COPY setup.py ./
COPY pytest.ini ./
# Install the package from the files copied into the working directory.
RUN pip install .

# NOTE(review): io.airbyte.version is set twice below (0.1.6, then 0.1.8);
# presumably only the newer value should remain — confirm.
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-acceptance-test

# Run pytest with the source_acceptance_test plugin loaded via "-p".
ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"]
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,14 @@ def records_with_state(records, state, stream_mapping, state_cursor_paths) -> It
stream_name = record.record.stream
stream = stream_mapping[stream_name]
helper = JsonSchemaHelper(schema=stream.stream.json_schema)
record_value = helper.get_cursor_value(record=record.record.data, cursor_path=stream.cursor_field)
state_value = helper.get_state_value(state=state[stream_name], cursor_path=state_cursor_paths[stream_name])
cursor_field = helper.field(stream.cursor_field)
record_value = cursor_field.parse(record=record.record.data)
try:
# first attempt to parse the state value assuming the state object is namespaced on stream names
state_value = cursor_field.parse(record=state[stream_name], path=state_cursor_paths[stream_name])
except KeyError:
# try second time as an absolute path in state file (i.e. bookmarks -> stream_name -> column -> value)
state_value = cursor_field.parse(record=state, path=state_cursor_paths[stream_name])
yield record_value, state_value


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,54 +24,62 @@


from functools import reduce
from typing import Any, List, Set
from typing import Any, List, Mapping, Optional, Set

import pendulum


class CatalogField:
    """Field class to represent cursor/pk fields.

    Wraps the JSON-schema node of a single field together with the key path
    that leads to it inside a record, and knows how to extract and parse the
    field's value.
    """

    def __init__(self, schema: Mapping[str, Any], path: List[str]):
        """
        :param schema: JSON-schema node describing the field
        :param path: list of keys leading to the field inside a record
        """
        self.schema = schema
        self.path = path
        self.formats = self._detect_formats()

    def _detect_formats(self) -> Set[str]:
        """Extract the set of formats/types for this field.

        ``format`` takes precedence over ``type``; ``type`` is only consulted
        when ``format`` is absent.  A schema that declares neither yields an
        empty set.
        """
        # Look the keys up lazily: a schema that has "format" but no "type"
        # must still report its format instead of failing on the missing "type".
        if "format" in self.schema:
            declared = self.schema["format"]
        elif "type" in self.schema:
            declared = self.schema["type"]
        else:
            return set()
        if not isinstance(declared, list):
            declared = [declared]
        return set(declared)

    def _parse_value(self, value: Any) -> Any:
        """Do the actual parsing of the serialized value.

        Values of date-like fields are parsed with ``pendulum.parse``; every
        other value is returned unchanged.

        :raises ValueError: if a date-like field holds ``None`` and "null" is
            not among the field's formats
        """
        if self.formats.intersection({"datetime", "date-time", "date"}):
            if value is None:
                if "null" not in self.formats:
                    raise ValueError(f"Invalid field format. Value: {value}. Format: {self.formats}")
                # None is explicitly allowed: there is nothing to parse.
                return None
            return pendulum.parse(value)
        return value

    def parse(self, record: Mapping[str, Any], path: Optional[List[str]] = None) -> Any:
        """Extract the field value from the record and cast it to a native type.

        :param record: mapping to read the value from
        :param path: optional key path overriding ``self.path``; an empty or
            missing path falls back to ``self.path``
        :raises KeyError: if a key of the path is missing in the record
        """
        path = path or self.path
        value = record
        for key in path:
            value = value[key]
        return self._parse_value(value)


class JsonSchemaHelper:
    """Navigate a JSON schema and look up the nodes of (possibly nested) properties."""

    def __init__(self, schema):
        self._schema = schema

    def get_ref(self, path: str):
        """Return the schema node addressed by a reference string.

        :param path: reference string such as ``#/definitions/foo``; everything
            before the first ``/`` is ignored and the remaining segments are
            looked up one by one starting at the schema root
        :raises KeyError: if a segment is missing in the schema
        """
        node = self._schema
        for segment in path.split("/")[1:]:
            node = node[segment]
        return node

    def _resolve(self, node):
        """Follow ``$ref`` entries (including chained ones) until a concrete node is reached."""
        seen = set()
        while "$ref" in node:
            ref = node["$ref"]
            if ref in seen:
                # Guard against references that point back at themselves.
                raise ValueError(f"Circular $ref detected: {ref}")
            seen.add(ref)
            node = self.get_ref(ref)
        return node

    def get_property(self, path: List[str]) -> Mapping[str, Any]:
        """Return the schema node of the property located at ``path``.

        References are resolved both while descending and on the final node,
        so a property declared as ``{"$ref": ...}`` yields the referenced
        definition rather than the bare reference.

        :param path: list of property names, outermost first
        :raises KeyError: if a node on the way has no ``properties`` entry or
            the property is missing
        """
        node = self._resolve(self._schema)
        for segment in path:
            node = self._resolve(node["properties"][segment])
        return node

    def get_format_for_key_path(self, path: List[str]) -> Set[str]:
        """Return the set of formats/types declared for the property at ``path``.

        ``format`` takes precedence over ``type``.  An unknown path, or a node
        declaring neither key, yields an empty set.
        """
        try:
            field = self.get_property(path)
        except KeyError:
            return set()
        # Lazy lookup: a node with "format" but no "type" must still report its format.
        if "format" in field:
            format_ = field["format"]
        elif "type" in field:
            format_ = field["type"]
        else:
            return set()
        if not isinstance(format_, list):
            format_ = [format_]
        return set(format_)

    def get_cursor_value(self, record, cursor_path):
        """Read the value at ``cursor_path`` from ``record`` and parse it per the schema format."""
        type_ = self.get_format_for_key_path(path=cursor_path)
        value = reduce(lambda data, key: data[key], cursor_path, record)
        return self.parse_value(value, type_)

    @staticmethod
    def parse_value(value: Any, format_: Set[str]):
        """Parse date-like values with ``pendulum.parse``; return other values unchanged.

        :raises ValueError: if a date-like value is ``None`` and "null" is not in ``format_``
        """
        if format_.intersection({"datetime", "date-time", "date"}):
            if value is None:
                if "null" not in format_:
                    raise ValueError(f"Invalid field format. Value: {value}. Format: {format_}")
                # None is explicitly allowed: there is nothing to parse.
                return None
            return pendulum.parse(value)
        return value

    def get_state_value(self, state, cursor_path):
        """Read the state value stored under the LAST key of ``cursor_path`` and parse it.

        Only ``cursor_path[-1]`` is used as the key into ``state``; the schema
        format is still looked up with the full path.
        """
        format_ = self.get_format_for_key_path(path=cursor_path)
        value = state[cursor_path[-1]]
        return self.parse_value(value, format_)

    def field(self, path: List[str]) -> "CatalogField":
        """Build a ``CatalogField`` for the property located at ``path``."""
        return CatalogField(schema=self.get_property(path), path=path)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import pendulum
import pytest
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteStream,
DestinationSyncMode,
SyncMode,
Type,
)
from source_acceptance_test.tests.test_incremental import records_with_state


@pytest.fixture(name="simple_state")
def simple_state_fixture():
    """State keyed by stream name, holding the cursor values directly under the stream."""
    my_stream_state = {
        "id": 11,
        "ts_created": "2014-01-01T22:03:11",
        "ts_updated": "2015-01-01T22:03:11",
    }
    return {"my_stream": my_stream_state}


@pytest.fixture(name="nested_state")
def nested_state_fixture(simple_state):
    """State whose per-stream values sit one level deeper, under ``some_account_id``."""
    per_stream_values = simple_state["my_stream"]
    return {"my_stream": {"some_account_id": per_stream_values}}


@pytest.fixture(name="singer_state")
def singer_state_fixture(simple_state):
    """State where the whole ``simple_state`` mapping is wrapped under a ``bookmarks`` key."""
    wrapped_state = {}
    wrapped_state["bookmarks"] = simple_state
    return wrapped_state


@pytest.fixture(name="stream_schema")
def stream_schema_fixture():
    """JSON schema with an integer ``id``, a datetime ``ts_created`` and a nested date ``ts_updated``."""
    nested_properties = {"ts_updated": {"type": "string", "format": "date"}}
    top_level_properties = {
        "id": {"type": "integer"},
        "ts_created": {"type": "string", "format": "datetime"},
        "nested": {"type": "object", "properties": nested_properties},
    }
    return {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": top_level_properties,
    }


@pytest.fixture(name="stream_mapping")
def stream_mapping_fixture(stream_schema):
    """Map the stream name ``my_stream`` to a configured stream built on ``stream_schema``."""
    airbyte_stream = AirbyteStream(name="my_stream", json_schema=stream_schema)
    configured_stream = ConfiguredAirbyteStream(
        stream=airbyte_stream,
        sync_mode=SyncMode.full_refresh,
        destination_sync_mode=DestinationSyncMode.append,
    )
    return {"my_stream": configured_stream}


@pytest.fixture(name="records")
def records_fixture():
    """A single RECORD message for ``my_stream`` with top-level and nested cursor candidates."""
    payload = {"id": 1, "ts_created": "2015-11-01T22:03:11", "nested": {"ts_updated": "2015-05-01"}}
    record_message = AirbyteRecordMessage(stream="my_stream", data=payload, emitted_at=0)
    message = AirbyteMessage(type=Type.RECORD, record=record_message)
    return [message]


def test_simple_path(records, stream_mapping, simple_state):
    """A one-key cursor path is found in both the record and the stream-keyed state."""
    stream_mapping["my_stream"].cursor_field = ["id"]
    cursor_paths = {"my_stream": ["id"]}

    pairs = records_with_state(
        records=records,
        state=simple_state,
        stream_mapping=stream_mapping,
        state_cursor_paths=cursor_paths,
    )
    record_value, state_value = next(pairs)

    assert record_value == 1, "record value must be correctly found"
    assert state_value == 11, "state value must be correctly found"


def test_nested_path(records, stream_mapping, nested_state):
    """Nested cursor paths are followed in the record and, separately, in the state."""
    stream_mapping["my_stream"].cursor_field = ["nested", "ts_updated"]
    cursor_paths = {"my_stream": ["some_account_id", "ts_updated"]}

    pairs = records_with_state(
        records=records,
        state=nested_state,
        stream_mapping=stream_mapping,
        state_cursor_paths=cursor_paths,
    )
    record_value, state_value = next(pairs)

    expected_record_value = pendulum.datetime(2015, 5, 1)
    expected_state_value = pendulum.datetime(2015, 1, 1, 22, 3, 11)
    assert record_value == expected_record_value, "record value must be correctly found"
    assert state_value == expected_state_value, "state value must be correctly found"


def test_nested_path_unknown(records, stream_mapping, simple_state):
    """A state cursor path containing a key absent from the state raises ``KeyError``."""
    stream_mapping["my_stream"].cursor_field = ["ts_created"]
    cursor_paths = {"my_stream": ["unknown", "ts_created"]}

    pairs = records_with_state(
        records=records,
        state=simple_state,
        stream_mapping=stream_mapping,
        state_cursor_paths=cursor_paths,
    )
    # The generator is lazy, so the lookup only fails once the first pair is requested.
    with pytest.raises(KeyError):
        next(pairs)


def test_absolute_path(records, stream_mapping, singer_state):
    """A state cursor path given from the root of the state object is resolved."""
    stream_mapping["my_stream"].cursor_field = ["ts_created"]
    cursor_paths = {"my_stream": ["bookmarks", "my_stream", "ts_created"]}

    pairs = records_with_state(
        records=records,
        state=singer_state,
        stream_mapping=stream_mapping,
        state_cursor_paths=cursor_paths,
    )
    record_value, state_value = next(pairs)

    expected_record_value = pendulum.datetime(2015, 11, 1, 22, 3, 11)
    expected_state_value = pendulum.datetime(2014, 1, 1, 22, 3, 11)
    assert record_value == expected_record_value, "record value must be correctly found"
    assert state_value == expected_state_value, "state value must be correctly found"

0 comments on commit f4ceebe

Please sign in to comment.