Skip to content

Commit

Permalink
[low-code cdk] decouple parsing the yaml manifest from the declarativ…
Browse files Browse the repository at this point in the history
…e source implementation (#19095)

* decouple parsing the yaml manifest from the declarative source implementation

* bump version and changelog
  • Loading branch information
brianjlai authored and akashkulk committed Nov 17, 2022
1 parent 81f1d62 commit 560f4e8
Show file tree
Hide file tree
Showing 6 changed files with 795 additions and 623 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.7.1
Low-code: Decouple yaml manifest parsing from the declarative source implementation

## 0.7.0
Low-code: Allow connector specifications to be defined in the manifest

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import inspect
import json
import logging
import typing
from dataclasses import dataclass, fields
from enum import Enum, EnumMeta
from typing import Any, List, Mapping, Union

from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
from airbyte_cdk.sources.streams.core import Stream
from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema.validators import validate


@dataclass
class ConcreteDeclarativeSource(JsonSchemaMixin):
version: str
checker: CheckStream
streams: List[DeclarativeStream]


class ManifestDeclarativeSource(DeclarativeSource):
"""Declarative source defined by a manifest of low-code components that define source connector behavior"""

VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}

def __init__(self, source_config: ConnectionDefinition):
"""
:param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")
self._source_config = source_config
self._factory = DeclarativeComponentFactory()

self._validate_source()

# Stopgap to protect the top-level namespace until it's validated through the schema
unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS]
if unknown_fields:
raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}")

@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
if "class_name" not in check:
check["class_name"] = "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream"
return self._factory.create_component(check, dict())(source=self)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})

source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
for stream in source_streams:
# make sure the log level is always applied to the stream's logger
self._apply_log_level_to_stream_logger(self.logger, stream)
return source_streams

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
in the project root.
"""
self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})

spec = self._source_config.get("spec")
if spec:
if "class_name" not in spec:
spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec"
spec_component = self._factory.create_component(spec, dict())()
return spec_component.generate_spec()
else:
return super().spec(logger)

def _validate_source(self):
full_config = {}
if "version" in self._source_config:
full_config["version"] = self._source_config["version"]
if "check" in self._source_config:
full_config["checker"] = self._source_config["check"]
streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
if len(streams) > 0:
full_config["streams"] = streams
declarative_source_schema = ConcreteDeclarativeSource.json_schema()
validate(full_config, declarative_source_schema)

def _stream_configs(self):
stream_configs = self._source_config.get("streams", [])
for s in stream_configs:
if "class_name" not in s:
s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
return stream_configs

@staticmethod
def generate_schema() -> str:
expanded_source_definition = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
expanded_schema = expanded_source_definition.json_schema()
return json.dumps(expanded_schema, cls=SchemaEncoder)

@staticmethod
def expand_schema_interfaces(expand_class: type, visited: dict) -> type:
"""
Recursive function that takes in class type that will have its interface fields unpacked and expended and then recursively
attempt the same expansion on all the class' underlying fields that are declarative component. It also performs expansion
with respect to interfaces that are contained within generic data types.
:param expand_class: The declarative component class that will have its interface fields expanded
:param visited: cache used to store a record of already visited declarative classes that have already been seen
:return: The expanded declarative component
"""

# Recursive base case to stop recursion if we have already expanded an interface in case of cyclical components
# like CompositeErrorHandler
if expand_class.__name__ in visited:
return visited[expand_class.__name__]
visited[expand_class.__name__] = expand_class

next_classes = []
class_fields = fields(expand_class)
for field in class_fields:
unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
expand_class.__annotations__[field.name] = unpacked_field_types
next_classes.extend(ManifestDeclarativeSource._get_next_expand_classes(field.type))
for next_class in next_classes:
ManifestDeclarativeSource.expand_schema_interfaces(next_class, visited)
return expand_class

@staticmethod
def _get_next_expand_classes(field_type) -> list[type]:
"""
Parses through a given field type and assembles a list of all underlying declarative components. For a concrete declarative class
it will return itself. For a declarative interface it will return its subclasses. For declarative components in a generic type
it will return the unpacked classes. Any non-declarative types will be skipped.
:param field_type: A field type that
:return:
"""
generic_type = typing.get_origin(field_type)
if generic_type is None:
# We can only continue parsing declarative that inherit from the JsonSchemaMixin class because it is used
# to generate the final json schema
if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin) and not isinstance(field_type, EnumMeta):
subclasses = field_type.__subclasses__()
if subclasses:
return subclasses
else:
return [field_type]
elif generic_type == list or generic_type == Union:
next_classes = []
for underlying_type in typing.get_args(field_type):
next_classes.extend(ManifestDeclarativeSource._get_next_expand_classes(underlying_type))
return next_classes
return []

def _emit_manifest_debug_message(self, extra_args: dict):
self.logger.debug("declarative source created from manifest", extra=extra_args)


class SchemaEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, property) or isinstance(obj, Enum):
return str(obj)
return json.JSONEncoder.default(self, obj)
Original file line number Diff line number Diff line change
Expand Up @@ -2,184 +2,31 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import inspect
import json
import logging
import pkgutil
import typing
from dataclasses import dataclass, fields
from enum import Enum, EnumMeta
from typing import Any, List, Mapping, Union

from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
from airbyte_cdk.sources.streams.core import Stream
from dataclasses_jsonschema import JsonSchemaMixin
from jsonschema.validators import validate
from airbyte_cdk.sources.declarative.types import ConnectionDefinition


@dataclass
class ConcreteDeclarativeSource(JsonSchemaMixin):
version: str
checker: CheckStream
streams: List[DeclarativeStream]


class YamlDeclarativeSource(DeclarativeSource):
class YamlDeclarativeSource(ManifestDeclarativeSource):
"""Declarative source defined by a yaml file"""

VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}

def __init__(self, path_to_yaml):
"""
:param path_to_yaml: Path to the yaml file describing the source
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")
self._factory = DeclarativeComponentFactory()
self._path_to_yaml = path_to_yaml
self._source_config = self._read_and_parse_yaml_file(path_to_yaml)

self._validate_source()

# Stopgap to protect the top-level namespace until it's validated through the schema
unknown_fields = [key for key in self._source_config.keys() if key not in self.VALID_TOP_LEVEL_FIELDS]
if unknown_fields:
raise InvalidConnectorDefinitionException(f"Found unknown top-level fields: {unknown_fields}")

@property
def connection_checker(self) -> ConnectionChecker:
check = self._source_config["check"]
if "class_name" not in check:
check["class_name"] = "airbyte_cdk.sources.declarative.checks.check_stream.CheckStream"
return self._factory.create_component(check, dict())(source=self)

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self.logger.debug(
"parsed YAML into declarative source",
extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)},
)
source_streams = [self._factory.create_component(stream_config, config, True)() for stream_config in self._stream_configs()]
for stream in source_streams:
# make sure the log level is always appied to the stream's logger
self._apply_log_level_to_stream_logger(self.logger, stream)
return source_streams

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
in the project root.
"""

self.logger.debug(
"parsed YAML into declarative source",
extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)},
)

spec = self._source_config.get("spec")
if spec:
if "class_name" not in spec:
spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec"
spec_component = self._factory.create_component(spec, dict())()
return spec_component.generate_spec()
else:
return super().spec(logger)
source_config = self._read_and_parse_yaml_file(path_to_yaml)
super().__init__(source_config)

def _read_and_parse_yaml_file(self, path_to_yaml_file):
def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition:
package = self.__class__.__module__.split(".")[0]

yaml_config = pkgutil.get_data(package, path_to_yaml_file)
decoded_yaml = yaml_config.decode()
return YamlParser().parse(decoded_yaml)

def _validate_source(self):
full_config = {}
if "version" in self._source_config:
full_config["version"] = self._source_config["version"]
if "check" in self._source_config:
full_config["checker"] = self._source_config["check"]
streams = [self._factory.create_component(stream_config, {}, False)() for stream_config in self._stream_configs()]
if len(streams) > 0:
full_config["streams"] = streams
declarative_source_schema = ConcreteDeclarativeSource.json_schema()
validate(full_config, declarative_source_schema)

def _stream_configs(self):
stream_configs = self._source_config.get("streams", [])
for s in stream_configs:
if "class_name" not in s:
s["class_name"] = "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
return stream_configs

@staticmethod
def generate_schema() -> str:
expanded_source_definition = YamlDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
expanded_schema = expanded_source_definition.json_schema()
return json.dumps(expanded_schema, cls=SchemaEncoder)

@staticmethod
def expand_schema_interfaces(expand_class: type, visited: dict) -> type:
"""
Recursive function that takes in class type that will have its interface fields unpacked and expended and then recursively
attempt the same expansion on all the class' underlying fields that are declarative component. It also performs expansion
with respect to interfaces that are contained within generic data types.
:param expand_class: The declarative component class that will have its interface fields expanded
:param visited: cache used to store a record of already visited declarative classes that have already been seen
:return: The expanded declarative component
"""

# Recursive base case to stop recursion if we have already expanded an interface in case of cyclical components
# like CompositeErrorHandler
if expand_class.__name__ in visited:
return visited[expand_class.__name__]
visited[expand_class.__name__] = expand_class

next_classes = []
class_fields = fields(expand_class)
for field in class_fields:
unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
expand_class.__annotations__[field.name] = unpacked_field_types
next_classes.extend(YamlDeclarativeSource._get_next_expand_classes(field.type))
for next_class in next_classes:
YamlDeclarativeSource.expand_schema_interfaces(next_class, visited)
return expand_class

@staticmethod
def _get_next_expand_classes(field_type) -> list[type]:
"""
Parses through a given field type and assembles a list of all underlying declarative components. For a concrete declarative class
it will return itself. For a declarative interface it will return its subclasses. For declarative components in a generic type
it will return the unpacked classes. Any non-declarative types will be skipped.
:param field_type: A field type that
:return:
"""
generic_type = typing.get_origin(field_type)
if generic_type is None:
# We can only continue parsing declarative that inherit from the JsonSchemaMixin class because it is used
# to generate the final json schema
if inspect.isclass(field_type) and issubclass(field_type, JsonSchemaMixin) and not isinstance(field_type, EnumMeta):
subclasses = field_type.__subclasses__()
if subclasses:
return subclasses
else:
return [field_type]
elif generic_type == list or generic_type == Union:
next_classes = []
for underlying_type in typing.get_args(field_type):
next_classes.extend(YamlDeclarativeSource._get_next_expand_classes(underlying_type))
return next_classes
return []


class SchemaEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, property) or isinstance(obj, Enum):
return str(obj)
return json.JSONEncoder.default(self, obj)
def _emit_manifest_debug_message(self, extra_args: dict):
extra_args["path_to_yaml"] = self._path_to_yaml
self.logger.debug("declarative source created from parsed YAML manifest", extra=extra_args)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.7.0",
version="0.7.1",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Loading

0 comments on commit 560f4e8

Please sign in to comment.