Skip to content

Commit

Permalink
[low-code cdk] break resolving reference preprocessing into its own c…
Browse files Browse the repository at this point in the history
…lass so it can be reused (#19517)

* break resolving reference preprocessing into its own class so it can be reused

* move reference resolution into the ManifestDeclarativeSource and deprecate the parser

* formatting

* last formatting i promise

* rename

* bump version

Co-authored-by: Alexandre Girard <alexandre@airbyte.io>
  • Loading branch information
brianjlai and girarda committed Nov 17, 2022
1 parent 1dbad96 commit e97ece5
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 218 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.9.4
Low-code: Fix reference resolution for connector builder

## 0.9.3
Low-code: Avoid duplicate HTTP query in `simple_retriever`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.streams.core import Stream
from dataclasses_jsonschema import JsonSchemaMixin
Expand All @@ -47,7 +48,10 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False):
:param debug(bool): True if debug mode is enabled
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")
self._source_config = source_config

evaluated_manifest = {}
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config, evaluated_manifest, "")
self._source_config = resolved_source_config
self._debug = debug
self._factory = DeclarativeComponentFactory()

Expand Down Expand Up @@ -135,8 +139,8 @@ def _stream_configs(self):

@staticmethod
def generate_schema() -> str:
expanded_source_definition = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
expanded_schema = expanded_source_definition.json_schema()
expanded_source_manifest = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
expanded_schema = expanded_source_manifest.json_schema()
return json.dumps(expanded_schema, cls=SchemaEncoder)

@staticmethod
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,12 @@
from copy import deepcopy
from typing import Any, Mapping, Tuple, Union

import yaml
from airbyte_cdk.sources.declarative.parsers.config_parser import ConnectionDefinitionParser
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
from airbyte_cdk.sources.declarative.types import ConnectionDefinition


class YamlParser(ConnectionDefinitionParser):
class ManifestReferenceResolver:
"""
Parses a Yaml string to a ConnectionDefinition
In addition to standard Yaml parsing, the input_string can contain references to values previously defined.
An incoming manifest can contain references to values previously defined.
This parser will dereference these values to produce a complete ConnectionDefinition.
References can be defined using a *ref(<arg>) string.
Expand Down Expand Up @@ -101,31 +96,20 @@ class YamlParser(ConnectionDefinitionParser):

ref_tag = "$ref"

def parse(self, connection_definition_str: str) -> ConnectionDefinition:
"""
Parses a yaml file and dereferences string in the form "*ref({reference)"
to {reference}
:param connection_definition_str: yaml string to parse
:return: The ConnectionDefinition parsed from connection_definition_str
"""
input_mapping = yaml.safe_load(connection_definition_str)
evaluated_definition = {}
return self._preprocess_dict(input_mapping, evaluated_definition, "")

def _preprocess_dict(self, input_mapping: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]):
def preprocess_manifest(self, manifest: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]):

"""
:param input_mapping: mapping produced by parsing yaml
:param manifest: incoming manifest that could have references to previously defined components
:param evaluated_mapping: mapping produced by dereferencing the content of manifest
:param path: current path in configuration traversal
:return:
"""
d = {}
if self.ref_tag in input_mapping:
partial_ref_string = input_mapping[self.ref_tag]
if self.ref_tag in manifest:
partial_ref_string = manifest[self.ref_tag]
d = deepcopy(self._preprocess(partial_ref_string, evaluated_mapping, path))

for key, value in input_mapping.items():
for key, value in manifest.items():
if key == self.ref_tag:
continue
full_path = self._resolve_value(key, path)
Expand Down Expand Up @@ -180,7 +164,7 @@ def _preprocess(self, value, evaluated_config: Mapping[str, Any], path):
key = *key[:-1], split[0], ".".join(split[1:])
raise UndefinedReferenceException(path, ref_key)
elif isinstance(value, dict):
return self._preprocess_dict(value, evaluated_config, path)
return self.preprocess_manifest(value, evaluated_config, path)
elif type(value) == list:
evaluated_list = [
# pass in elem's path instead of the list's path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

import pkgutil

import yaml
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.declarative.types import ConnectionDefinition


Expand All @@ -25,8 +25,18 @@ def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition:

yaml_config = pkgutil.get_data(package, path_to_yaml_file)
decoded_yaml = yaml_config.decode()
return YamlParser().parse(decoded_yaml)
return self._parse(decoded_yaml)

def _emit_manifest_debug_message(self, extra_args: dict):
    """
    Log a debug message saying the source was created from a parsed YAML manifest.

    The ``path_to_yaml`` entry is written into ``extra_args`` itself (the dict passed by the
    caller is modified) before it is handed to the logger as ``extra``.

    :param extra_args: extra fields to attach to the debug log record
    """
    extra_args.update({"path_to_yaml": self._path_to_yaml})
    self.logger.debug("declarative source created from parsed YAML manifest", extra=extra_args)

@staticmethod
def _parse(connection_definition_str: str) -> ConnectionDefinition:
    """
    Load a yaml string into a manifest.

    Only yaml loading happens here: the result of ``yaml.safe_load`` is returned unchanged, so
    component references are still present in the manifest. They are resolved later, during the
    creation of the DeclarativeSource.

    :param connection_definition_str: yaml string to parse
    :return: The ConnectionDefinition parsed from connection_definition_str
    """
    manifest = yaml.safe_load(connection_definition_str)
    return manifest
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.9.3",
version="0.9.4",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pytest
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException

# Single resolver instance shared by the tests in this module.
resolver = ManifestReferenceResolver()


def test_get_ref():
    """The key extracted from ``*ref(limit)`` is ``limit``."""
    assert resolver._get_ref_key("*ref(limit)") == "limit"


def test_get_ref_no_ref():
    """A string that is not a ``*ref(...)`` expression produces no reference key."""
    plain_string = "limit: 50"
    assert resolver._get_ref_key(plain_string) is None


def test_refer():
    """A top-level ``*ref(limit)`` value is replaced by the value stored under ``limit``."""
    manifest = {"limit": 50, "limit_ref": "*ref(limit)"}
    resolved = resolver.preprocess_manifest(manifest, {}, "")
    assert resolved["limit_ref"] == 50


def test_refer_to_inner():
    """A dotted reference (``dict.limit``) reaches a value nested inside another mapping."""
    nested = {"limit": 50}
    manifest = {"dict": nested, "limit_ref": "*ref(dict.limit)"}
    resolved = resolver.preprocess_manifest(manifest, {}, "")
    assert resolved["limit_ref"] == 50


def test_refer_to_non_existant_struct():
    """Referencing a key that is not defined in the manifest raises UndefinedReferenceException."""
    manifest = {
        "dict": {"limit": 50},
        "limit_ref": "*ref(not_dict)",
    }
    with pytest.raises(UndefinedReferenceException):
        resolver.preprocess_manifest(manifest, {}, "")


def test_refer_in_dict():
    """A reference inside a nested mapping is resolved; sibling non-reference strings are kept."""
    offset_template = "{{ next_page_token['offset'] }}"
    manifest = {
        "limit": 50,
        "offset_request_parameters": {
            "offset": offset_template,
            "limit": "*ref(limit)",
        },
    }
    resolved = resolver.preprocess_manifest(manifest, {}, "")
    request_parameters = resolved["offset_request_parameters"]
    assert request_parameters["offset"] == offset_template
    assert request_parameters["limit"] == 50


def test_refer_to_dict():
    """A reference to a whole mapping yields that mapping with its own references resolved."""
    offset_template = "{{ next_page_token['offset'] }}"
    manifest = {
        "limit": 50,
        "offset_request_parameters": {
            "offset": offset_template,
            "limit": "*ref(limit)",
        },
        "offset_pagination_request_parameters": {
            "class": "InterpolatedRequestParameterProvider",
            "request_parameters": "*ref(offset_request_parameters)",
        },
    }
    resolved = resolver.preprocess_manifest(manifest, {}, "")

    assert resolved["limit"] == 50
    assert resolved["offset_request_parameters"]["limit"] == 50

    pagination = resolved["offset_pagination_request_parameters"]
    assert len(pagination) == 2
    # The referenced mapping must arrive with both of its entries intact.
    referenced = pagination["request_parameters"]
    assert referenced["limit"] == 50
    assert referenced["offset"] == offset_template


def test_refer_and_overwrite():
    """Keys set next to a ``$ref`` override the referenced mapping's values; the rest are inherited."""
    offset_template = "{{ next_page_token['offset'] }}"
    manifest = {
        "limit": 50,
        "custom_limit": 25,
        "offset_request_parameters": {
            "offset": offset_template,
            "limit": "*ref(limit)",
        },
        "custom_request_parameters": {
            "$ref": "*ref(offset_request_parameters)",
            "limit": "*ref(custom_limit)",
        },
    }
    resolved = resolver.preprocess_manifest(manifest, {}, "")
    base_parameters = resolved["offset_request_parameters"]
    custom_parameters = resolved["custom_request_parameters"]

    # "limit" is overridden in the custom mapping only.
    assert base_parameters["limit"] == 50
    assert custom_parameters["limit"] == 25

    # "offset" is not overridden, so both mappings carry the same value.
    assert base_parameters["offset"] == offset_template
    assert custom_parameters["offset"] == offset_template


def test_collision():
    """
    Pins which value wins when a dotted reference matches both a nested path
    (``example`` -> ``nested`` -> ``path``) and a literal dotted key (``example`` -> ``nested.path``).
    """
    manifest = {
        "example": {
            "nested": {
                "path": "first one",
                "more_nested": {"value": "found it!"},
            },
            "nested.path": "uh oh",
        },
        "reference_to_nested_path": {"$ref": "*ref(example.nested.path)"},
        "reference_to_nested_nested_value": {"$ref": "*ref(example.nested.more_nested.value)"},
    }
    resolved = resolver.preprocess_manifest(manifest, {}, "")
    example = resolved["example"]

    assert example["nested"]["path"] == "first one"
    assert example["nested.path"] == "uh oh"
    # The colliding reference resolves to the literal dotted key's value.
    assert resolved["reference_to_nested_path"] == "uh oh"
    assert example["nested"]["more_nested"]["value"] == "found it!"
    assert resolved["reference_to_nested_nested_value"] == "found it!"


def test_list():
    """An indexed reference (``list[0]``) resolves to the first element of the referenced list."""
    manifest = {"list": ["A", "B"], "elem_ref": "*ref(list[0])"}
    resolved = resolver.preprocess_manifest(manifest, {}, "")
    assert resolved["elem_ref"] == "A"
Loading

0 comments on commit e97ece5

Please sign in to comment.