From 560f4e87c9b8a5e3c072abf6b6d1b2113e786997 Mon Sep 17 00:00:00 2001 From: Brian Lai <51336873+brianjlai@users.noreply.github.com> Date: Tue, 8 Nov 2022 15:45:01 -0500 Subject: [PATCH] [low-code cdk] decouple parsing the yaml manifest from the declarative source implementation (#19095) * decouple parsing the yaml manifest from the declarative source implementation * bump version and changelog --- airbyte-cdk/python/CHANGELOG.md | 3 + .../manifest_declarative_source.py | 173 +++++ .../declarative/yaml_declarative_source.py | 171 +---- airbyte-cdk/python/setup.py | 2 +- .../test_manifest_declarative_source.py | 601 ++++++++++++++++++ .../test_yaml_declarative_source.py | 468 +------------- 6 files changed, 795 insertions(+), 623 deletions(-) create mode 100644 airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py create mode 100644 airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 4612a87fa4854..a9284fec5ade7 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.7.1 +Low-code: Decouple yaml manifest parsing from the declarative source implementation + ## 0.7.0 Low-code: Allow connector specifications to be defined in the manifest diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py new file mode 100644 index 0000000000000..a6a3db4a5f013 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -0,0 +1,173 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import inspect +import json +import logging +import typing +from dataclasses import dataclass, fields +from enum import Enum, EnumMeta +from typing import Any, List, Mapping, Union + +from airbyte_cdk.models import ConnectorSpecification +from airbyte_cdk.sources.declarative.checks import CheckStream +from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker +from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource +from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream +from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException +from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory +from airbyte_cdk.sources.declarative.types import ConnectionDefinition +from airbyte_cdk.sources.streams.core import Stream +from dataclasses_jsonschema import JsonSchemaMixin +from jsonschema.validators import validate + + +@dataclass +class ConcreteDeclarativeSource(JsonSchemaMixin): + version: str + checker: CheckStream + streams: List[DeclarativeStream] + + +class ManifestDeclarativeSource(DeclarativeSource): + """Declarative source defined by a manifest of low-code components that define source connector behavior""" + + VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"} + + def __init__(self, source_config: ConnectionDefinition): + """ + :param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector + """ + self.logger = logging.getLogger(f"airbyte.{self.name}") + self._source_config = source_config + self._factory = DeclarativeComponentFactory() + + self._validate_source() + + # Stopgap to protect the top-level namespace until it's validated through the schema + unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS] + if unknown_fields: + raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}") + + @property + def connection_checker(self) -> ConnectionChecker: + check = self._source_config["check"] + if "class_name" not in check: + check["class_name"] = "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream" + return self._factory.create_component(check, dict())(source=self) + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}) + + source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()] + for stream in source_streams: + # make sure the log level is always applied to the stream's logger + self._apply_log_level_to_stream_logger(self.logger, stream) + return source_streams + + def spec(self, logger: logging.Logger) -> ConnectorSpecification: + """ + Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible + configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this + will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" + in the project root. + """ + self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)}) + + spec = self._source_config.get("spec") + if spec: + if "class_name" not in spec: + spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec" + spec_component = self._factory.create_component(spec, dict())() + return spec_component.generate_spec() + else: + return super().spec(logger) + + def _validate_source(self): + full_config = {} + if "version" in self._source_config: + full_config["version"] = self._source_config["version"] + if "check" in self._source_config: + full_config["checker"] = self._source_config["check"] + streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()] + if len(streams) > 0: + full_config["streams"] = streams + declarative_source_schema = ConcreteDeclarativeSource.json_schema() + validate(full_config, declarative_source_schema) + + def _stream_configs(self): + stream_configs = self._source_config.get("streams", []) + for s in stream_configs: + if "class_name" not in s: + s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" + return stream_configs + + @staticmethod + def generate_schema() -> str: + expanded_source_definition = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {}) + expanded_schema = expanded_source_definition.json_schema() + return json.dumps(expanded_schema, cls=SchemaEncoder) + + @staticmethod + def expand_schema_interfaces(expand_class: type, visited: dict) -> type: + """ + Recursive function that takes in class type that will have its interface fields unpacked and expended and then recursively + attempt the same expansion on all the class' underlying fields that are declarative component. It also performs expansion + with respect to interfaces that are contained within generic data types. + :param expand_class: The declarative component class that will have its interface fields expanded + :param visited: cache used to store a record of already visited declarative classes that have already been seen + :return: The expanded declarative component + """ + + # Recursive base case to stop recursion if we have already expanded an interface in case of cyclical components + # like CompositeErrorHandler + if expand_class.__name__ in visited: + return visited[expand_class.__name__] + visited[expand_class.__name__] = expand_class + + next_classes = [] + class_fields = fields(expand_class) + for field in class_fields: + unpacked_field_types = DeclarativeComponentFactory.unpack(field.type) + expand_class.__annotations__[field.name] = unpacked_field_types + next_classes.extend(ManifestDeclarativeSource._get_next_expand_classes(field.type)) + for next_class in next_classes: + ManifestDeclarativeSource.expand_schema_interfaces(next_class, visited) + return expand_class + + @staticmethod + def _get_next_expand_classes(field_type) -> list[type]: + """ + Parses through a given field type and assembles a list of all underlying declarative components. For a concrete declarative class + it will return itself. For a declarative interface it will return its subclasses. For declarative components in a generic type + it will return the unpacked classes. Any non-declarative types will be skipped. + :param field_type: A field type that + :return: + """ + generic_type = typing.get_origin(field_type) + if generic_type is None: + # We can only continue parsing declarative that inherit from the JsonSchemaMixin class because it is used + # to generate the final json schema + if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin) and not isinstance(field_type, EnumMeta): + subclasses = field_type.__subclasses__() + if subclasses: + return subclasses + else: + return [field_type] + elif generic_type == list or generic_type == Union: + next_classes = [] + for underlying_type in typing.get_args(field_type): + next_classes.extend(ManifestDeclarativeSource._get_next_expand_classes(underlying_type)) + return next_classes + return [] + + def _emit_manifest_debug_message(self, extra_args: dict): + self.logger.debug("declarative source created from manifest", extra=extra_args) + + +class SchemaEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, property) or isinstance(obj, Enum): + return str(obj) + return json.JSONEncoder.default(self, obj) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py index 440b8cc5c9823..08ee0e7ebebf7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py @@ -2,184 +2,31 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -import inspect -import json -import logging import pkgutil -import typing -from dataclasses import dataclass, fields -from enum import Enum, EnumMeta -from typing import Any, List, Mapping, Union -from airbyte_cdk.models import ConnectorSpecification -from airbyte_cdk.sources.declarative.checks import CheckStream -from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker -from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream -from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException -from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser -from airbyte_cdk.sources.streams.core import Stream -from dataclasses_jsonschema import JsonSchemaMixin -from jsonschema.validators import validate +from airbyte_cdk.sources.declarative.types import ConnectionDefinition -@dataclass -class ConcreteDeclarativeSource(JsonSchemaMixin): - version: str - checker: CheckStream - streams: List[DeclarativeStream] - - -class YamlDeclarativeSource(DeclarativeSource): +class YamlDeclarativeSource(ManifestDeclarativeSource): """Declarative source defined by a yaml file""" - VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"} - def __init__(self, path_to_yaml): """ :param path_to_yaml: Path to the yaml file describing the source """ - self.logger = logging.getLogger(f"airbyte.{self.name}") - self._factory = DeclarativeComponentFactory() self._path_to_yaml = path_to_yaml - self._source_config = self._read_and_parse_yaml_file(path_to_yaml) - - self._validate_source() - - # Stopgap to protect the top-level namespace until it's validated through the schema - unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS] - if unknown_fields: - raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}") - - @property - def connection_checker(self) -> ConnectionChecker: - check = self._source_config["check"] - if "class_name" not in check: - check["class_name"] = "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream" - return self._factory.create_component(check, dict())(source=self) - - def streams(self, config: Mapping[str, Any]) -> List[Stream]: - self.logger.debug( - "parsed YAML into declarative source", - extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)}, - ) - source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()] - for stream in source_streams: - # make sure the log level is always appied to the stream's logger - self._apply_log_level_to_stream_logger(self.logger, stream) - return source_streams - - def spec(self, logger: logging.Logger) -> ConnectorSpecification: - """ - Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible - configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this - will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json" - in the project root. - """ - - self.logger.debug( - "parsed YAML into declarative source", - extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)}, - ) - - spec = self._source_config.get("spec") - if spec: - if "class_name" not in spec: - spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec" - spec_component = self._factory.create_component(spec, dict())() - return spec_component.generate_spec() - else: - return super().spec(logger) + source_config = self._read_and_parse_yaml_file(path_to_yaml) + super().__init__(source_config) - def _read_and_parse_yaml_file(self, path_to_yaml_file): + def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition: package = self.__class__.__module__.split(".")[0] yaml_config = pkgutil.get_data(package, path_to_yaml_file) decoded_yaml = yaml_config.decode() return YamlParser().parse(decoded_yaml) - def _validate_source(self): - full_config = {} - if "version" in self._source_config: - full_config["version"] = self._source_config["version"] - if "check" in self._source_config: - full_config["checker"] = self._source_config["check"] - streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()] - if len(streams) > 0: - full_config["streams"] = streams - declarative_source_schema = ConcreteDeclarativeSource.json_schema() - validate(full_config, declarative_source_schema) - - def _stream_configs(self): - stream_configs = self._source_config.get("streams", []) - for s in stream_configs: - if "class_name" not in s: - s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream" - return stream_configs - - @staticmethod - def generate_schema() -> str: - expanded_source_definition = YamlDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {}) - expanded_schema = expanded_source_definition.json_schema() - return json.dumps(expanded_schema, cls=SchemaEncoder) - - @staticmethod - def expand_schema_interfaces(expand_class: type, visited: dict) -> type: - """ - Recursive function that takes in class type that will have its interface fields unpacked and expended and then recursively - attempt the same expansion on all the class' underlying fields that are declarative component. It also performs expansion - with respect to interfaces that are contained within generic data types. - :param expand_class: The declarative component class that will have its interface fields expanded - :param visited: cache used to store a record of already visited declarative classes that have already been seen - :return: The expanded declarative component - """ - - # Recursive base case to stop recursion if we have already expanded an interface in case of cyclical components - # like CompositeErrorHandler - if expand_class.__name__ in visited: - return visited[expand_class.__name__] - visited[expand_class.__name__] = expand_class - - next_classes = [] - class_fields = fields(expand_class) - for field in class_fields: - unpacked_field_types = DeclarativeComponentFactory.unpack(field.type) - expand_class.__annotations__[field.name] = unpacked_field_types - next_classes.extend(YamlDeclarativeSource._get_next_expand_classes(field.type)) - for next_class in next_classes: - YamlDeclarativeSource.expand_schema_interfaces(next_class, visited) - return expand_class - - @staticmethod - def _get_next_expand_classes(field_type) -> list[type]: - """ - Parses through a given field type and assembles a list of all underlying declarative components. For a concrete declarative class - it will return itself. For a declarative interface it will return its subclasses. For declarative components in a generic type - it will return the unpacked classes. Any non-declarative types will be skipped. - :param field_type: A field type that - :return: - """ - generic_type = typing.get_origin(field_type) - if generic_type is None: - # We can only continue parsing declarative that inherit from the JsonSchemaMixin class because it is used - # to generate the final json schema - if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin) and not isinstance(field_type, EnumMeta): - subclasses = field_type.__subclasses__() - if subclasses: - return subclasses - else: - return [field_type] - elif generic_type == list or generic_type == Union: - next_classes = [] - for underlying_type in typing.get_args(field_type): - next_classes.extend(YamlDeclarativeSource._get_next_expand_classes(underlying_type)) - return next_classes - return [] - - -class SchemaEncoder(json.JSONEncoder): - def default(self, obj): - if isinstance(obj, property) or isinstance(obj, Enum): - return str(obj) - return json.JSONEncoder.default(self, obj) + def _emit_manifest_debug_message(self, extra_args: dict): + extra_args["path_to_yaml"] = self._path_to_yaml + self.logger.debug("declarative source created from parsed YAML manifest", extra=extra_args) diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 46a84500bc661..8a402661c2e8e 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.7.0", + version="0.7.1", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py new file mode 100644 index 0000000000000..5485fab782b94 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -0,0 +1,601 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import json +import logging +import os +import sys + +import pytest +import yaml +from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from jsonschema.exceptions import ValidationError + +logger = logging.getLogger("airbyte") + +EXTERNAL_CONNECTION_SPECIFICATION = { + "type": "object", + "required": ["api_token"], + "additionalProperties": False, + "properties": {"api_token": {"type": "string"}}, +} + + +class MockManifestDeclarativeSource(ManifestDeclarativeSource): + """ + Mock test class that is needed to monkey patch how we read from various files that make up a declarative source because of how our + tests write configuration files during testing. It is also used to properly namespace where files get written in specific + cases like when we temporarily write files like spec.yaml to the package unit_tests, which is the directory where it will + be read in during the tests. + """ + + +class TestYamlDeclarativeSource: + @pytest.fixture + def use_external_yaml_spec(self): + # Our way of resolving the absolute path to root of the airbyte-cdk unit test directory where spec.yaml files should + # be written to (i.e. ~/airbyte/airbyte-cdk/python/unit-tests) because that is where they are read from during testing. + module = sys.modules[__name__] + module_path = os.path.abspath(module.__file__) + test_path = os.path.dirname(module_path) + spec_root = test_path.split("/sources/declarative")[0] + + spec = {"documentationUrl": "https://airbyte.com/#yaml-from-external", "connectionSpecification": EXTERNAL_CONNECTION_SPECIFICATION} + + yaml_path = os.path.join(spec_root, "spec.yaml") + with open(yaml_path, "w") as f: + f.write(yaml.dump(spec)) + yield + os.remove(yaml_path) + + def test_valid_manifest(self): + manifest = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"}, + "schema_loader": { + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + } + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + ManifestDeclarativeSource(source_config=manifest) + + def test_manifest_with_spec(self): + manifest = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"}, + "schema_loader": { + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + } + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + "spec": { + "type": "Spec", + "documentation_url": "https://airbyte.com/#yaml-from-manifest", + "connection_specification": { + "title": "Test Spec", + "type": "object", + "required": ["api_key"], + "additionalProperties": False, + "properties": { + "api_key": {"type": "string", "airbyte_secret": True, "title": "API Key", "description": "Test API Key", "order": 0} + }, + }, + }, + } + source = ManifestDeclarativeSource(source_config=manifest) + connector_specification = source.spec(logger) + assert connector_specification is not None + assert connector_specification.documentationUrl == "https://airbyte.com/#yaml-from-manifest" + assert connector_specification.connectionSpecification["title"] == "Test Spec" + assert connector_specification.connectionSpecification["required"][0] == "api_key" + assert connector_specification.connectionSpecification["additionalProperties"] is False + assert connector_specification.connectionSpecification["properties"]["api_key"] == { + "type": "string", + "airbyte_secret": True, + "title": "API Key", + "description": "Test API Key", + "order": 0, + } + + def test_manifest_with_external_spec(self, use_external_yaml_spec): + manifest = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"}, + "schema_loader": { + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + } + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + source = MockManifestDeclarativeSource(source_config=manifest) + + connector_specification = source.spec(logger) + + assert connector_specification.documentationUrl == "https://airbyte.com/#yaml-from-external" + assert connector_specification.connectionSpecification == EXTERNAL_CONNECTION_SPECIFICATION + + def test_source_is_not_created_if_toplevel_fields_are_unknown(self): + manifest = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"}, + "schema_loader": { + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + } + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + "not_a_valid_field": "error", + } + with pytest.raises(InvalidConnectorDefinitionException): + ManifestDeclarativeSource(manifest) + + def test_source_missing_checker_fails_validation(self): + manifest = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"}, + "schema_loader": { + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + } + ], + "check": {"type": "CheckStream"}, + } + with pytest.raises(ValidationError): + ManifestDeclarativeSource(source_config=manifest) + + def test_source_with_missing_streams_fails(self): + manifest = {"version": "version", "definitions": None, "check": {"type": "CheckStream", "stream_names": ["lists"]}} + with pytest.raises(ValidationError): + ManifestDeclarativeSource(manifest) + + def test_source_with_missing_version_fails(self): + manifest = { + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"}, + "schema_loader": { + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + } + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + with pytest.raises(ValidationError): + ManifestDeclarativeSource(manifest) + + def test_source_with_invalid_stream_config_fails_validation(self): + manifest = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"} + }, + "streams": [ + { + "type": "DeclarativeStream", + "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"}, + "schema_loader": { + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + }, + } + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + with pytest.raises(ValidationError): + ManifestDeclarativeSource(manifest) + + def test_source_with_no_external_spec_and_no_in_yaml_spec_fails(self): + manifest = { + "version": "version", + "definitions": { + "schema_loader": {"name": "{{ options.stream_name }}", "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml"}, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "$options": {"name": "lists", "primary_key": "id", "url_base": "https://api.sendgrid.com"}, + "schema_loader": { + "name": "{{ options.stream_name }}", + "file_path": "./source_sendgrid/schemas/{{ options.name }}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": {"inject_into": "request_parameter", "field_name": "page_size"}, + "page_token_option": {"inject_into": "path"}, + "pagination_strategy": {"type": "CursorPagination", "cursor_value": "{{ response._metadata.next }}"}, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": {"type": "BearerAuthenticator", "api_token": "{{ config.apikey }}"}, + "request_parameters": {"page_size": 10}, + }, + "record_selector": {"extractor": {"field_pointer": ["result"]}}, + }, + } + ], + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + source = ManifestDeclarativeSource(source_config=manifest) + + # We expect to fail here because we have not created a temporary spec.yaml file + with pytest.raises(FileNotFoundError): + source.spec(logger) + + +def test_generate_schema(): + schema_str = ManifestDeclarativeSource.generate_schema() + schema = json.loads(schema_str) + + assert "version" in schema["required"] + assert "checker" in schema["required"] + assert "streams" in schema["required"] + assert schema["properties"]["checker"]["$ref"] == "#/definitions/CheckStream" + assert schema["properties"]["streams"]["items"]["$ref"] == "#/definitions/DeclarativeStream" + + check_stream = schema["definitions"]["CheckStream"] + assert {"stream_names"}.issubset(check_stream["required"]) + assert check_stream["properties"]["stream_names"]["type"] == "array" + assert check_stream["properties"]["stream_names"]["items"]["type"] == "string" + + declarative_stream = schema["definitions"]["DeclarativeStream"] + assert {"retriever", "config"}.issubset(declarative_stream["required"]) + assert {"$ref": "#/definitions/DefaultSchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"] + assert {"$ref": "#/definitions/JsonFileSchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"] + assert declarative_stream["properties"]["retriever"]["$ref"] == "#/definitions/SimpleRetriever" + assert declarative_stream["properties"]["name"]["type"] == "string" + assert {"type": "array", "items": {"type": "string"}} in declarative_stream["properties"]["primary_key"]["anyOf"] + assert {"type": "array", "items": {"type": "array", "items": {"type": "string"}}} in declarative_stream["properties"]["primary_key"][ + "anyOf" + ] + assert {"type": "string"} in declarative_stream["properties"]["primary_key"]["anyOf"] + assert {"type": "array", "items": {"type": "string"}} in declarative_stream["properties"]["stream_cursor_field"]["anyOf"] + assert {"type": "string"} in declarative_stream["properties"]["stream_cursor_field"]["anyOf"] + assert declarative_stream["properties"]["transformations"]["type"] == "array" + assert {"$ref": "#/definitions/AddFields"} in declarative_stream["properties"]["transformations"]["items"]["anyOf"] + assert {"$ref": "#/definitions/RemoveFields"} in declarative_stream["properties"]["transformations"]["items"]["anyOf"] + assert declarative_stream["properties"]["checkpoint_interval"]["type"] == "integer" + + simple_retriever = schema["definitions"]["SimpleRetriever"]["allOf"][1] + assert {"requester", "record_selector"}.issubset(simple_retriever["required"]) + assert simple_retriever["properties"]["requester"]["$ref"] == "#/definitions/HttpRequester" + assert simple_retriever["properties"]["record_selector"]["$ref"] == "#/definitions/RecordSelector" + assert simple_retriever["properties"]["name"]["type"] == "string" + assert {"type": "array", "items": {"type": "string"}} in declarative_stream["properties"]["primary_key"]["anyOf"] + assert {"type": "array", "items": {"type": "array", "items": {"type": "string"}}} in declarative_stream["properties"]["primary_key"][ + "anyOf" + ] + assert {"type": "string"} in declarative_stream["properties"]["primary_key"]["anyOf"] + assert {"$ref": "#/definitions/DefaultPaginator"} in simple_retriever["properties"]["paginator"]["anyOf"] + assert {"$ref": "#/definitions/NoPagination"} in simple_retriever["properties"]["paginator"]["anyOf"] + assert {"$ref": "#/definitions/CartesianProductStreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"] + assert {"$ref": "#/definitions/DatetimeStreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"] + assert {"$ref": "#/definitions/ListStreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"] + assert {"$ref": "#/definitions/SingleSlice"} in simple_retriever["properties"]["stream_slicer"]["anyOf"] + assert {"$ref": "#/definitions/SubstreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"] + + http_requester = schema["definitions"]["HttpRequester"]["allOf"][1] + assert {"name", "url_base", "path", "config"}.issubset(http_requester["required"]) + assert http_requester["properties"]["name"]["type"] == "string" + assert {"$ref": "#/definitions/InterpolatedString"} in http_requester["properties"]["url_base"]["anyOf"] + assert {"type": "string"} in http_requester["properties"]["path"]["anyOf"] + assert {"$ref": "#/definitions/InterpolatedString"} in http_requester["properties"]["url_base"]["anyOf"] + assert {"type": "string"} in http_requester["properties"]["path"]["anyOf"] + assert {"type": "string"} in http_requester["properties"]["http_method"]["anyOf"] + assert {"type": "string", "enum": ["GET", "POST"]} in http_requester["properties"]["http_method"]["anyOf"] + assert http_requester["properties"]["request_options_provider"]["$ref"] == "#/definitions/InterpolatedRequestOptionsProvider" + assert {"$ref": "#/definitions/DeclarativeOauth2Authenticator"} in http_requester["properties"]["authenticator"]["anyOf"] + assert {"$ref": "#/definitions/ApiKeyAuthenticator"} in http_requester["properties"]["authenticator"]["anyOf"] + assert {"$ref": "#/definitions/BearerAuthenticator"} in http_requester["properties"]["authenticator"]["anyOf"] + assert {"$ref": "#/definitions/BasicHttpAuthenticator"} in http_requester["properties"]["authenticator"]["anyOf"] + assert {"$ref": "#/definitions/CompositeErrorHandler"} in http_requester["properties"]["error_handler"]["anyOf"] + assert {"$ref": "#/definitions/DefaultErrorHandler"} in http_requester["properties"]["error_handler"]["anyOf"] + + api_key_authenticator = schema["definitions"]["ApiKeyAuthenticator"]["allOf"][1] + assert {"header", "api_token", "config"}.issubset(api_key_authenticator["required"]) + assert {"$ref": "#/definitions/InterpolatedString"} in api_key_authenticator["properties"]["header"]["anyOf"] + assert {"type": "string"} in api_key_authenticator["properties"]["header"]["anyOf"] + assert {"$ref": "#/definitions/InterpolatedString"} in api_key_authenticator["properties"]["api_token"]["anyOf"] + assert {"type": "string"} in api_key_authenticator["properties"]["api_token"]["anyOf"] + + default_error_handler = schema["definitions"]["DefaultErrorHandler"]["allOf"][1] + assert default_error_handler["properties"]["response_filters"]["type"] == "array" + assert default_error_handler["properties"]["response_filters"]["items"]["$ref"] == "#/definitions/HttpResponseFilter" + assert default_error_handler["properties"]["max_retries"]["type"] == "integer" + assert default_error_handler["properties"]["backoff_strategies"]["type"] == "array" + + default_paginator = schema["definitions"]["DefaultPaginator"]["allOf"][1] + assert {"page_token_option", "pagination_strategy", "config", "url_base"}.issubset(default_paginator["required"]) + assert default_paginator["properties"]["page_size_option"]["$ref"] == "#/definitions/RequestOption" + assert default_paginator["properties"]["page_token_option"]["$ref"] == "#/definitions/RequestOption" + assert {"$ref": "#/definitions/CursorPaginationStrategy"} in default_paginator["properties"]["pagination_strategy"]["anyOf"] + assert {"$ref": "#/definitions/OffsetIncrement"} in default_paginator["properties"]["pagination_strategy"]["anyOf"] + assert {"$ref": "#/definitions/PageIncrement"} in default_paginator["properties"]["pagination_strategy"]["anyOf"] + assert default_paginator["properties"]["decoder"]["$ref"] == "#/definitions/JsonDecoder" + assert {"$ref": "#/definitions/InterpolatedString"} in http_requester["properties"]["url_base"]["anyOf"] + assert {"type": "string"} in http_requester["properties"]["path"]["anyOf"] + + cursor_pagination_strategy = schema["definitions"]["CursorPaginationStrategy"]["allOf"][1] + assert {"cursor_value", "config"}.issubset(cursor_pagination_strategy["required"]) + assert {"$ref": "#/definitions/InterpolatedString"} in cursor_pagination_strategy["properties"]["cursor_value"]["anyOf"] + assert {"type": "string"} in cursor_pagination_strategy["properties"]["cursor_value"]["anyOf"] + assert {"$ref": "#/definitions/InterpolatedBoolean"} in cursor_pagination_strategy["properties"]["stop_condition"]["anyOf"] + assert {"type": "string"} in cursor_pagination_strategy["properties"]["stop_condition"]["anyOf"] + assert cursor_pagination_strategy["properties"]["decoder"]["$ref"] == "#/definitions/JsonDecoder" + + list_stream_slicer = schema["definitions"]["ListStreamSlicer"]["allOf"][1] + assert {"slice_values", "cursor_field", "config"}.issubset(list_stream_slicer["required"]) + assert {"type": "array", "items": {"type": "string"}} in list_stream_slicer["properties"]["slice_values"]["anyOf"] + assert {"type": "string"} in list_stream_slicer["properties"]["slice_values"]["anyOf"] + assert {"$ref": "#/definitions/InterpolatedString"} in list_stream_slicer["properties"]["cursor_field"]["anyOf"] + assert {"type": "string"} in list_stream_slicer["properties"]["cursor_field"]["anyOf"] + assert list_stream_slicer["properties"]["request_option"]["$ref"] == "#/definitions/RequestOption" + + added_field_definition = schema["definitions"]["AddedFieldDefinition"] + assert {"path", "value"}.issubset(added_field_definition["required"]) + assert added_field_definition["properties"]["path"]["type"] == "array" + assert added_field_definition["properties"]["path"]["items"]["type"] == "string" + assert {"$ref": "#/definitions/InterpolatedString"} in added_field_definition["properties"]["value"]["anyOf"] + assert {"type": "string"} in added_field_definition["properties"]["value"]["anyOf"] + + # There is something very strange about JsonSchemaMixin.json_schema(). For some reason, when this test is called independently + # it will pass. However, when it is invoked with the entire test file, certain components won't get generated in the schema. Since + # the generate_schema() method is invoked by independently so this doesn't happen under normal circumstance when we generate the + # complete schema. It only happens when the tests are all called together. + # One way to replicate this is to add DefaultErrorHandler.json_schema() to the start of this test and uncomment the assertions below + + # assert {"$ref": "#/definitions/ConstantBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"]["items"]["anyOf"] + # assert {"$ref": "#/definitions/ExponentialBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"]["items"][ + # "anyOf" + # ] + # assert {"$ref": "#/definitions/WaitTimeFromHeaderBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"][ + # "items" + # ]["anyOf"] + # assert {"$ref": "#/definitions/WaitUntilTimeFromHeaderBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"][ + # "items" + # ]["anyOf"] + # + # exponential_backoff_strategy = schema["definitions"]["ExponentialBackoffStrategy"]["allOf"][1] + # assert exponential_backoff_strategy["properties"]["factor"]["type"] == "number" diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py index adceb433191e3..0b3f0c2e45ee3 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py @@ -2,21 +2,19 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -import json import logging import os -import sys import tempfile import pytest -import yaml -from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException +from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource -from jsonschema import ValidationError +from yaml.parser import ParserError logger = logging.getLogger("airbyte") + EXTERNAL_CONNECTION_SPECIFICATION = { "type": "object", "required": ["api_token"], @@ -46,23 +44,6 @@ def _read_and_parse_yaml_file(self, path_to_yaml_file): class TestYamlDeclarativeSource: - @pytest.fixture - def use_external_yaml_spec(self): - # Our way of resolving the absolute path to root of the airbyte-cdk unit test directory where spec.yaml files should - # be written to (i.e. ~/airbyte/airbyte-cdk/python/unit-tests) because that is where they are read from during testing. - module = sys.modules[__name__] - module_path = os.path.abspath(module.__file__) - test_path = os.path.dirname(module_path) - spec_root = test_path.split("/sources/declarative")[0] - - spec = {"documentationUrl": "https://airbyte.com/#yaml-from-external", "connectionSpecification": EXTERNAL_CONNECTION_SPECIFICATION} - - yaml_path = os.path.join(spec_root, "spec.yaml") - with open(yaml_path, "w") as f: - f.write(yaml.dump(spec)) - yield - os.remove(yaml_path) - def test_source_is_created_if_toplevel_fields_are_known(self): content = """ version: "version" @@ -107,204 +88,32 @@ def test_source_is_created_if_toplevel_fields_are_known(self): temporary_file = TestFileContent(content) MockYamlDeclarativeSource(temporary_file.filename) - def test_source_with_spec_in_yaml(self): + def test_source_fails_for_invalid_yaml(self): content = """ version: "version" definitions: - schema_loader: - name: "{{ options.stream_name }}" - file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" - retriever: - paginator: - type: "DefaultPaginator" - page_size: 10 - page_size_option: - inject_into: request_parameter - field_name: page_size - page_token_option: - inject_into: path - pagination_strategy: - type: "CursorPagination" - cursor_value: "{{ response._metadata.next }}" - requester: - path: "/v3/marketing/lists" - authenticator: - type: "BearerAuthenticator" - api_token: "{{ config.apikey }}" - request_parameters: - page_size: 10 - record_selector: - extractor: - field_pointer: ["result"] + this is not parsable yaml: " at all streams: - type: DeclarativeStream $options: name: "lists" primary_key: id url_base: "https://api.sendgrid.com" - schema_loader: "*ref(definitions.schema_loader)" - retriever: "*ref(definitions.retriever)" check: type: CheckStream stream_names: ["lists"] - spec: - type: Spec - documentation_url: https://airbyte.com/#yaml-from-manifest - connection_specification: - title: Test Spec - type: object - required: - - api_key - additionalProperties: false - properties: - api_key: - type: string - airbyte_secret: true - title: API Key - description: Test API Key - order: 0 """ temporary_file = TestFileContent(content) - source = MockYamlDeclarativeSource(temporary_file.filename) - - connector_specification = source.spec(logger) - assert connector_specification is not None - assert connector_specification.documentationUrl == "https://airbyte.com/#yaml-from-manifest" - assert connector_specification.connectionSpecification["title"] == "Test Spec" - assert connector_specification.connectionSpecification["required"][0] == "api_key" - assert connector_specification.connectionSpecification["additionalProperties"] is False - assert connector_specification.connectionSpecification["properties"]["api_key"] == { - "type": "string", - "airbyte_secret": True, - "title": "API Key", - "description": "Test API Key", - "order": 0, - } - - def test_source_with_external_spec(self, use_external_yaml_spec): - content = """ - version: "version" - definitions: - schema_loader: - name: "{{ options.stream_name }}" - file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" - retriever: - paginator: - type: "DefaultPaginator" - page_size: 10 - page_size_option: - inject_into: request_parameter - field_name: page_size - page_token_option: - inject_into: path - pagination_strategy: - type: "CursorPagination" - cursor_value: "{{ response._metadata.next }}" - requester: - path: "/v3/marketing/lists" - authenticator: - type: "BearerAuthenticator" - api_token: "{{ config.apikey }}" - request_parameters: - page_size: 10 - record_selector: - extractor: - field_pointer: ["result"] - streams: - - type: DeclarativeStream - $options: - name: "lists" - primary_key: id - url_base: "https://api.sendgrid.com" - schema_loader: "*ref(definitions.schema_loader)" - retriever: "*ref(definitions.retriever)" - check: - type: CheckStream - stream_names: ["lists"] - """ - temporary_file = TestFileContent(content) - source = MockYamlDeclarativeSource(temporary_file.filename) - - connector_specification = source.spec(logger) - - assert connector_specification.documentationUrl == "https://airbyte.com/#yaml-from-external" - assert connector_specification.connectionSpecification == EXTERNAL_CONNECTION_SPECIFICATION - - def test_source_is_not_created_if_toplevel_fields_are_unknown(self): - content = """ - version: "version" - definitions: - schema_loader: - name: "{{ options.stream_name }}" - file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" - retriever: - paginator: - type: "DefaultPaginator" - page_size: 10 - page_size_option: - inject_into: request_parameter - field_name: page_size - page_token_option: - inject_into: path - pagination_strategy: - type: "CursorPagination" - cursor_value: "{{ response._metadata.next }}" - requester: - path: "/v3/marketing/lists" - authenticator: - type: "BearerAuthenticator" - api_token: "{{ config.apikey }}" - request_parameters: - page_size: 10 - record_selector: - extractor: - field_pointer: ["result"] - streams: - - type: DeclarativeStream - $options: - name: "lists" - primary_key: id - url_base: "https://api.sendgrid.com" - schema_loader: "*ref(definitions.schema_loader)" - retriever: "*ref(definitions.retriever)" - check: - type: CheckStream - stream_names: ["lists"] - not_a_valid_field: "error" - """ - temporary_file = TestFileContent(content) - with pytest.raises(InvalidConnectorDefinitionException): + with pytest.raises(ParserError): MockYamlDeclarativeSource(temporary_file.filename) - def test_source_missing_checker_fails_validation(self): + def test_source_with_missing_reference_fails(self): content = """ version: "version" definitions: schema_loader: name: "{{ options.stream_name }}" file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" - retriever: - paginator: - type: "DefaultPaginator" - page_size: 10 - page_size_option: - inject_into: request_parameter - field_name: page_size - page_token_option: - inject_into: path - pagination_strategy: - type: "CursorPagination" - cursor_value: "{{ response._metadata.next }}" - requester: - path: "/v3/marketing/lists" - authenticator: - type: "BearerAuthenticator" - api_token: "{{ config.apikey }}" - request_parameters: - page_size: 10 - record_selector: - extractor: - field_pointer: ["result"] streams: - type: DeclarativeStream $options: @@ -313,139 +122,14 @@ def test_source_missing_checker_fails_validation(self): url_base: "https://api.sendgrid.com" schema_loader: "*ref(definitions.schema_loader)" retriever: "*ref(definitions.retriever)" - check: - type: CheckStream - """ - temporary_file = TestFileContent(content) - with pytest.raises(ValidationError): - MockYamlDeclarativeSource(temporary_file.filename) - - def test_source_with_missing_streams_fails(self): - content = """ - version: "version" - definitions: - check: - type: CheckStream - stream_names: ["lists"] - """ - temporary_file = TestFileContent(content) - with pytest.raises(ValidationError): - MockYamlDeclarativeSource(temporary_file.filename) - - def test_source_with_missing_version_fails(self): - content = """ - definitions: - schema_loader: - name: "{{ options.stream_name }}" - file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" - retriever: - paginator: - type: "DefaultPaginator" - page_size: 10 - page_size_option: - inject_into: request_parameter - field_name: page_size - page_token_option: - inject_into: path - pagination_strategy: - type: "CursorPagination" - cursor_value: "{{ response._metadata.next }}" - requester: - path: "/v3/marketing/lists" - authenticator: - type: "BearerAuthenticator" - api_token: "{{ config.apikey }}" - request_parameters: - page_size: 10 - record_selector: - extractor: - field_pointer: ["result"] - streams: - - type: DeclarativeStream - $options: - name: "lists" - primary_key: id - url_base: "https://api.sendgrid.com" - schema_loader: "*ref(definitions.schema_loader)" - retriever: "*ref(definitions.retriever)" - check: - type: CheckStream - stream_names: ["lists"] - """ - temporary_file = TestFileContent(content) - with pytest.raises(ValidationError): - MockYamlDeclarativeSource(temporary_file.filename) - - def test_source_with_invalid_stream_config_fails_validation(self): - content = """ - version: "version" - definitions: - schema_loader: - name: "{{ options.stream_name }}" - file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" - streams: - - type: DeclarativeStream - $options: - name: "lists" - primary_key: id - url_base: "https://api.sendgrid.com" - schema_loader: "*ref(definitions.schema_loader)" check: type: CheckStream stream_names: ["lists"] """ temporary_file = TestFileContent(content) - with pytest.raises(ValidationError): + with pytest.raises(UndefinedReferenceException): MockYamlDeclarativeSource(temporary_file.filename) - def test_source_with_no_external_spec_and_no_in_yaml_spec_fails(self): - content = """ - version: "version" - definitions: - schema_loader: - name: "{{ options.stream_name }}" - file_path: "./source_sendgrid/schemas/{{ options.name }}.yaml" - retriever: - paginator: - type: "DefaultPaginator" - page_size: 10 - page_size_option: - inject_into: request_parameter - field_name: page_size - page_token_option: - inject_into: path - pagination_strategy: - type: "CursorPagination" - cursor_value: "{{ response._metadata.next }}" - requester: - path: "/v3/marketing/lists" - authenticator: - type: "BearerAuthenticator" - api_token: "{{ config.apikey }}" - request_parameters: - page_size: 10 - record_selector: - extractor: - field_pointer: ["result"] - streams: - - type: DeclarativeStream - $options: - name: "lists" - primary_key: id - url_base: "https://api.sendgrid.com" - schema_loader: "*ref(definitions.schema_loader)" - retriever: "*ref(definitions.retriever)" - check: - type: CheckStream - stream_names: ["lists"] - """ - temporary_file = TestFileContent(content) - source = MockYamlDeclarativeSource(temporary_file.filename) - - # We expect to fail here because we have not created a temporary spec.yaml file - with pytest.raises(FileNotFoundError): - source.spec(logger) - class TestFileContent: def __init__(self, content): @@ -463,139 +147,3 @@ def __enter__(self): def __exit__(self, type, value, traceback): os.unlink(self.filename) - - -def test_generate_schema(): - schema_str = YamlDeclarativeSource.generate_schema() - schema = json.loads(schema_str) - - assert "version" in schema["required"] - assert "checker" in schema["required"] - assert "streams" in schema["required"] - assert schema["properties"]["checker"]["$ref"] == "#/definitions/CheckStream" - assert schema["properties"]["streams"]["items"]["$ref"] == "#/definitions/DeclarativeStream" - - check_stream = schema["definitions"]["CheckStream"] - assert {"stream_names"}.issubset(check_stream["required"]) - assert check_stream["properties"]["stream_names"]["type"] == "array" - assert check_stream["properties"]["stream_names"]["items"]["type"] == "string" - - declarative_stream = schema["definitions"]["DeclarativeStream"] - assert {"retriever", "config"}.issubset(declarative_stream["required"]) - assert {"$ref": "#/definitions/DefaultSchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"] - assert {"$ref": "#/definitions/JsonFileSchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"] - assert declarative_stream["properties"]["retriever"]["$ref"] == "#/definitions/SimpleRetriever" - assert declarative_stream["properties"]["name"]["type"] == "string" - assert {"type": "array", "items": {"type": "string"}} in declarative_stream["properties"]["primary_key"]["anyOf"] - assert {"type": "array", "items": {"type": "array", "items": {"type": "string"}}} in declarative_stream["properties"]["primary_key"][ - "anyOf" - ] - assert {"type": "string"} in declarative_stream["properties"]["primary_key"]["anyOf"] - assert {"type": "array", "items": {"type": "string"}} in declarative_stream["properties"]["stream_cursor_field"]["anyOf"] - assert {"type": "string"} in declarative_stream["properties"]["stream_cursor_field"]["anyOf"] - assert declarative_stream["properties"]["transformations"]["type"] == "array" - assert {"$ref": "#/definitions/AddFields"} in declarative_stream["properties"]["transformations"]["items"]["anyOf"] - assert {"$ref": "#/definitions/RemoveFields"} in declarative_stream["properties"]["transformations"]["items"]["anyOf"] - assert declarative_stream["properties"]["checkpoint_interval"]["type"] == "integer" - - simple_retriever = schema["definitions"]["SimpleRetriever"]["allOf"][1] - assert {"requester", "record_selector"}.issubset(simple_retriever["required"]) - assert simple_retriever["properties"]["requester"]["$ref"] == "#/definitions/HttpRequester" - assert simple_retriever["properties"]["record_selector"]["$ref"] == "#/definitions/RecordSelector" - assert simple_retriever["properties"]["name"]["type"] == "string" - assert {"type": "array", "items": {"type": "string"}} in declarative_stream["properties"]["primary_key"]["anyOf"] - assert {"type": "array", "items": {"type": "array", "items": {"type": "string"}}} in declarative_stream["properties"]["primary_key"][ - "anyOf" - ] - assert {"type": "string"} in declarative_stream["properties"]["primary_key"]["anyOf"] - assert {"$ref": "#/definitions/DefaultPaginator"} in simple_retriever["properties"]["paginator"]["anyOf"] - assert {"$ref": "#/definitions/NoPagination"} in simple_retriever["properties"]["paginator"]["anyOf"] - assert {"$ref": "#/definitions/CartesianProductStreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"] - assert {"$ref": "#/definitions/DatetimeStreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"] - assert {"$ref": "#/definitions/ListStreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"] - assert {"$ref": "#/definitions/SingleSlice"} in simple_retriever["properties"]["stream_slicer"]["anyOf"] - assert {"$ref": "#/definitions/SubstreamSlicer"} in simple_retriever["properties"]["stream_slicer"]["anyOf"] - - http_requester = schema["definitions"]["HttpRequester"]["allOf"][1] - assert {"name", "url_base", "path", "config"}.issubset(http_requester["required"]) - assert http_requester["properties"]["name"]["type"] == "string" - assert {"$ref": "#/definitions/InterpolatedString"} in http_requester["properties"]["url_base"]["anyOf"] - assert {"type": "string"} in http_requester["properties"]["path"]["anyOf"] - assert {"$ref": "#/definitions/InterpolatedString"} in http_requester["properties"]["url_base"]["anyOf"] - assert {"type": "string"} in http_requester["properties"]["path"]["anyOf"] - assert {"type": "string"} in http_requester["properties"]["http_method"]["anyOf"] - assert {"type": "string", "enum": ["GET", "POST"]} in http_requester["properties"]["http_method"]["anyOf"] - assert http_requester["properties"]["request_options_provider"]["$ref"] == "#/definitions/InterpolatedRequestOptionsProvider" - assert {"$ref": "#/definitions/DeclarativeOauth2Authenticator"} in http_requester["properties"]["authenticator"]["anyOf"] - assert {"$ref": "#/definitions/ApiKeyAuthenticator"} in http_requester["properties"]["authenticator"]["anyOf"] - assert {"$ref": "#/definitions/BearerAuthenticator"} in http_requester["properties"]["authenticator"]["anyOf"] - assert {"$ref": "#/definitions/BasicHttpAuthenticator"} in http_requester["properties"]["authenticator"]["anyOf"] - assert {"$ref": "#/definitions/CompositeErrorHandler"} in http_requester["properties"]["error_handler"]["anyOf"] - assert {"$ref": "#/definitions/DefaultErrorHandler"} in http_requester["properties"]["error_handler"]["anyOf"] - - api_key_authenticator = schema["definitions"]["ApiKeyAuthenticator"]["allOf"][1] - assert {"header", "api_token", "config"}.issubset(api_key_authenticator["required"]) - assert {"$ref": "#/definitions/InterpolatedString"} in api_key_authenticator["properties"]["header"]["anyOf"] - assert {"type": "string"} in api_key_authenticator["properties"]["header"]["anyOf"] - assert {"$ref": "#/definitions/InterpolatedString"} in api_key_authenticator["properties"]["api_token"]["anyOf"] - assert {"type": "string"} in api_key_authenticator["properties"]["api_token"]["anyOf"] - - default_error_handler = schema["definitions"]["DefaultErrorHandler"]["allOf"][1] - assert default_error_handler["properties"]["response_filters"]["type"] == "array" - assert default_error_handler["properties"]["response_filters"]["items"]["$ref"] == "#/definitions/HttpResponseFilter" - assert default_error_handler["properties"]["max_retries"]["type"] == "integer" - assert default_error_handler["properties"]["backoff_strategies"]["type"] == "array" - - default_paginator = schema["definitions"]["DefaultPaginator"]["allOf"][1] - assert {"page_token_option", "pagination_strategy", "config", "url_base"}.issubset(default_paginator["required"]) - assert default_paginator["properties"]["page_size_option"]["$ref"] == "#/definitions/RequestOption" - assert default_paginator["properties"]["page_token_option"]["$ref"] == "#/definitions/RequestOption" - assert {"$ref": "#/definitions/CursorPaginationStrategy"} in default_paginator["properties"]["pagination_strategy"]["anyOf"] - assert {"$ref": "#/definitions/OffsetIncrement"} in default_paginator["properties"]["pagination_strategy"]["anyOf"] - assert {"$ref": "#/definitions/PageIncrement"} in default_paginator["properties"]["pagination_strategy"]["anyOf"] - assert default_paginator["properties"]["decoder"]["$ref"] == "#/definitions/JsonDecoder" - assert {"$ref": "#/definitions/InterpolatedString"} in http_requester["properties"]["url_base"]["anyOf"] - assert {"type": "string"} in http_requester["properties"]["path"]["anyOf"] - - cursor_pagination_strategy = schema["definitions"]["CursorPaginationStrategy"]["allOf"][1] - assert {"cursor_value", "config"}.issubset(cursor_pagination_strategy["required"]) - assert {"$ref": "#/definitions/InterpolatedString"} in cursor_pagination_strategy["properties"]["cursor_value"]["anyOf"] - assert {"type": "string"} in cursor_pagination_strategy["properties"]["cursor_value"]["anyOf"] - assert {"$ref": "#/definitions/InterpolatedBoolean"} in cursor_pagination_strategy["properties"]["stop_condition"]["anyOf"] - assert {"type": "string"} in cursor_pagination_strategy["properties"]["stop_condition"]["anyOf"] - assert cursor_pagination_strategy["properties"]["decoder"]["$ref"] == "#/definitions/JsonDecoder" - - list_stream_slicer = schema["definitions"]["ListStreamSlicer"]["allOf"][1] - assert {"slice_values", "cursor_field", "config"}.issubset(list_stream_slicer["required"]) - assert {"type": "array", "items": {"type": "string"}} in list_stream_slicer["properties"]["slice_values"]["anyOf"] - assert {"type": "string"} in list_stream_slicer["properties"]["slice_values"]["anyOf"] - assert {"$ref": "#/definitions/InterpolatedString"} in list_stream_slicer["properties"]["cursor_field"]["anyOf"] - assert {"type": "string"} in list_stream_slicer["properties"]["cursor_field"]["anyOf"] - assert list_stream_slicer["properties"]["request_option"]["$ref"] == "#/definitions/RequestOption" - - added_field_definition = schema["definitions"]["AddedFieldDefinition"] - assert {"path", "value"}.issubset(added_field_definition["required"]) - assert added_field_definition["properties"]["path"]["type"] == "array" - assert added_field_definition["properties"]["path"]["items"]["type"] == "string" - assert {"$ref": "#/definitions/InterpolatedString"} in added_field_definition["properties"]["value"]["anyOf"] - assert {"type": "string"} in added_field_definition["properties"]["value"]["anyOf"] - - # There is something very strange about JsonSchemaMixin.json_schema(). For some reason, when this test is called independently - # it will pass. However, when it is invoked with the entire test file, certain components won't get generated in the schema. Since - # the generate_schema() method is invoked by independently so this doesn't happen under normal circumstance when we generate the - # complete schema. It only happens when the tests are all called together. - # One way to replicate this is to add DefaultErrorHandler.json_schema() to the start of this test and uncomment the assertions below - - # assert {"$ref": "#/definitions/ConstantBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"]["items"]["anyOf"] - # assert {"$ref": "#/definitions/ExponentialBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"]["items"][ - # "anyOf" - # ] - # assert {"$ref": "#/definitions/WaitTimeFromHeaderBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"][ - # "items" - # ]["anyOf"] - # assert {"$ref": "#/definitions/WaitUntilTimeFromHeaderBackoffStrategy"} in default_error_handler["properties"]["backoff_strategies"][ - # "items" - # ]["anyOf"] - # - # exponential_backoff_strategy = schema["definitions"]["ExponentialBackoffStrategy"]["allOf"][1] - # assert exponential_backoff_strategy["properties"]["factor"]["type"] == "number"