Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[low-code cdk] Allow for spec file to be defined in the yaml manifest instead of an external file #18411

Merged
merged 12 commits into from
Nov 7, 2022
Merged
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.7.0
Low-code: Allow connector specifications to be defined in the manifest

## 0.6.0
Low-code: Add support for monthly and yearly incremental updates for `DatetimeStreamSlicer`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
Expand Down Expand Up @@ -75,6 +76,7 @@
"RemoveFields": RemoveFields,
"SimpleRetriever": SimpleRetriever,
"SingleSlice": SingleSlice,
"Spec": Spec,
"SubstreamSlicer": SubstreamSlicer,
"WaitUntilTimeFromHeader": WaitUntilTimeFromHeaderBackoffStrategy,
"WaitTimeFromHeader": WaitTimeFromHeaderBackoffStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,11 @@ def is_object_definition_with_class_name(definition):

@staticmethod
def is_object_definition_with_type(definition):
return isinstance(definition, dict) and "type" in definition
# The `type` field is an overloaded term in the context of the low-code manifest. As part of the language, `type` is shorthand
# for convenience to avoid defining the entire classpath. For the connector specification, `type` is a part of the spec schema.
# For spec parsing, as part of this check, when the type is set to object, we want it to remain a mapping. But when type is
# defined any other way, then it should be parsed as a declarative component in the manifest.
return isinstance(definition, dict) and "type" in definition and definition["type"] != "object"
Copy link
Contributor Author

@brianjlai brianjlai Oct 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes me slightly nervous, but the connection_specification is effectively just a mapping/dict/object so when we are generating the spec component, it gets read in as an object and type: object gets added the component. Then, when because it has a type we attempt to resolve it in the CLASS_TYPES_REGISTRY which will correctly fail because type isn't in that map.

I'm not sure why we never saw this before, but I spot checked some components and didn't see any fields that were of mapping type so that might be why.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting. I assume the request options provider's field work because they're RequestInput, not raw Mapping?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the name of the method is_object... should this check be !=?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with how are connector specifications are defined, the type field is an overloaded term. In the manifest, type is just shorthand so we don't have to include the full classpath. But for the connector specification object, type: object is actually part of the spec schema. So the factory gets confused and thinks we need to parse the spec as another declarative component, but we really rant this to remain a Mapping. Without this line, it looks for object in the CLASS_TYPES_REGISTRY

It's a little bit of a hack, but it allows us to make the manifest defined and external defined specs interchangable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brianjlai should we rename our type field to something less overloaded as part of lowcode to beta?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add this context as a comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup will add!


@staticmethod
def get_default_type(parameter_name, parent_class):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.spec.spec import Spec

__all__ = ["Spec"]
34 changes: 34 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/sources/declarative/spec/spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Mapping

from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class Spec(JsonSchemaMixin):
"""
Returns a connection specification made up of information about the connector and how it can be configured

Attributes:
documentation_url (str): The link the Airbyte documentation about this connector
connection_specification (Mapping[str, Any]): information related to how a connector can be configured
"""

documentation_url: str
connection_specification: Mapping[str, Any]
options: InitVar[Mapping[str, Any]]

def generate_spec(self) -> ConnectorSpecification:
"""
Returns the connector specification according the spec block defined in the low code connector manifest.
"""

# We remap these keys to camel case because that's the existing format expected by the rest of the platform
return ConnectorSpecification.parse_obj(
{"documentationUrl": self.documentation_url, "connectionSpecification": self.connection_specification}
Comment on lines +32 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brianjlai how this will affect existing low-code connectors using the previous format?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not, in the event that a spec is not defined in the manifest itself, it will default to using the original way of retrieving and generating the spec from the spec.yaml or spec.json file.

)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import datetime
import re
from dataclasses import InitVar, dataclass, field
from dateutil.relativedelta import relativedelta
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.models import SyncMode
Expand All @@ -17,6 +16,7 @@
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
from dateutil.relativedelta import relativedelta


@dataclass
Expand Down Expand Up @@ -71,7 +71,9 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
stream_state_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None

timedelta_regex = re.compile(r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")
timedelta_regex = re.compile(
r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$"
)

def __post_init__(self, options: Mapping[str, Any]):
if not isinstance(self.start_datetime, MinMaxDatetime):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from enum import Enum, EnumMeta
from typing import Any, List, Mapping, Union

from airbyte_cdk.models import ConnectorSpecification
from airbyte_cdk.sources.declarative.checks import CheckStream
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
Expand All @@ -33,7 +34,7 @@ class ConcreteDeclarativeSource(JsonSchemaMixin):
class YamlDeclarativeSource(DeclarativeSource):
"""Declarative source defined by a yaml file"""

VALID_TOP_LEVEL_FIELDS = {"definitions", "streams", "check", "version"}
VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}

def __init__(self, path_to_yaml):
"""
Expand Down Expand Up @@ -69,6 +70,28 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
self._apply_log_level_to_stream_logger(self.logger, stream)
return source_streams

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
"""
Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
in the project root.
"""

self.logger.debug(
"parsed YAML into declarative source",
extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)},
)

spec = self._source_config.get("spec")
if spec:
if "class_name" not in spec:
spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the developer uses a type field instead of class_name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All three formats should work. If they define the class_name then we'll just use that. We define Spec in the class_types_registry so it should work with type, and if it's omitted here then we'll insert class name here.

spec_component = self._factory.create_component(spec, dict())()
return spec_component.generate_spec()
else:
return super().spec(logger)

def _read_and_parse_yaml_file(self, path_to_yaml_file):
package = self.__class__.__module__.split(".")[0]

Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.6.0",
version="0.7.0",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#

import datetime
from dateutil.relativedelta import relativedelta
from typing import List, Optional, Union

import pytest
Expand Down Expand Up @@ -41,6 +40,7 @@
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from dateutil.relativedelta import relativedelta
from jsonschema import ValidationError

factory = DeclarativeComponentFactory()
Expand Down Expand Up @@ -345,6 +345,22 @@ def test_full_config():
check:
class_name: airbyte_cdk.sources.declarative.checks.check_stream.CheckStream
stream_names: ["list_stream"]
spec:
class_name: airbyte_cdk.sources.declarative.spec.Spec
documentation_url: https://airbyte.com/#yaml-from-manifest
connection_specification:
title: Test Spec
type: object
required:
- api_key
additionalProperties: false
properties:
api_key:
type: string
airbyte_secret: true
title: API Key
description: Test API Key
order: 0
"""
config = parser.parse(content)

Expand Down Expand Up @@ -377,6 +393,20 @@ def test_full_config():
assert len(streams_to_check) == 1
assert list(streams_to_check)[0] == "list_stream"

spec = factory.create_component(config["spec"], input_config)()
documentation_url = spec.documentation_url
connection_specification = spec.connection_specification
assert documentation_url == "https://airbyte.com/#yaml-from-manifest"
assert connection_specification["title"] == "Test Spec"
assert connection_specification["required"] == ["api_key"]
assert connection_specification["properties"]["api_key"] == {
"type": "string",
"airbyte_secret": True,
"title": "API Key",
"description": "Test API Key",
"order": 0,
}

assert stream.retriever.requester.path.default == "marketing/lists"


Expand Down
Loading