Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🏗️ CDK: support nested refs resolving #6044

Merged
merged 1 commit into from Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 0.1.21
- Resolve nested schema references and move external references into the schema's local `definitions` section.

## 0.1.20
- Allow using `requests.auth.AuthBase` as authenticators instead of custom CDK authenticators.
- Implement Oauth2Authenticator, MultipleTokenAuthenticator and TokenAuthenticator authenticators.
Expand Down
124 changes: 60 additions & 64 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Expand Up @@ -23,77 +23,20 @@
#


import importlib
import json
import os
import pkgutil
from typing import Any, ClassVar, Dict, Mapping, Tuple

import pkg_resources
import jsonref
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import ConnectorSpecification
from jsonschema import RefResolver, validate
from jsonschema import validate
from jsonschema.exceptions import ValidationError
from pydantic import BaseModel, Field


class JsonSchemaResolver:
    """Helper class to expand $ref items in json schema.

    Every regular file found directly inside ``shared_schemas_path`` is parsed
    as JSON and kept in a store keyed by file name. That store is handed to
    the ``RefResolver`` used by :meth:`resolve`.
    """

    # Keywords whose value is a mapping of name -> subschema.
    _SUBSCHEMA_MAPS = ("properties", "patternProperties")
    # Keywords whose value is a list of subschemas.
    _SUBSCHEMA_LISTS = ("anyOf", "oneOf", "allOf")

    def __init__(self, shared_schemas_path: str):
        self._shared_refs = self._load_shared_schema_refs(shared_schemas_path)

    @staticmethod
    def _load_shared_schema_refs(shared_schemas_path: str):
        """Return ``{file name: parsed JSON}`` for every file in the directory."""
        shared_file_names = [f.name for f in os.scandir(shared_schemas_path) if f.is_file()]
        shared_schema_refs = {}
        for shared_file in shared_file_names:
            with open(os.path.join(shared_schemas_path, shared_file)) as data_file:
                shared_schema_refs[shared_file] = json.load(data_file)

        return shared_schema_refs

    def _resolve_schema_references(self, schema: dict, resolver: "RefResolver") -> dict:
        """Expand every ``$ref`` reachable from ``schema`` in place and return it.

        Descends into ``properties``, ``patternProperties``, ``items`` (both the
        single-schema and the list form) and the ``anyOf`` / ``oneOf`` /
        ``allOf`` combinators. Values that are not dicts (for example boolean
        subschemas) are returned untouched.
        """
        if not isinstance(schema, dict):
            # Nothing to expand in a boolean schema or any other non-mapping value.
            return schema

        if "$ref" in schema:
            reference_path = schema.pop("$ref", None)
            resolved = resolver.resolve(reference_path)[1]
            schema.update(resolved)
            # The referenced document may itself be a reference, so start over.
            return self._resolve_schema_references(schema, resolver)

        for keyword in self._SUBSCHEMA_MAPS:
            if keyword in schema:
                subschemas = schema[keyword]
                for k, val in subschemas.items():
                    subschemas[k] = self._resolve_schema_references(val, resolver)

        if "items" in schema:
            items = schema["items"]
            if isinstance(items, list):
                # Tuple validation: one subschema per array position.
                for i, element in enumerate(items):
                    items[i] = self._resolve_schema_references(element, resolver)
            else:
                schema["items"] = self._resolve_schema_references(items, resolver)

        for keyword in self._SUBSCHEMA_LISTS:
            if keyword in schema:
                alternatives = schema[keyword]
                for i, element in enumerate(alternatives):
                    alternatives[i] = self._resolve_schema_references(element, resolver)

        return schema

    def resolve(self, schema: dict, refs: Dict[str, dict] = None) -> dict:
        """Resolves and replaces json-schema $refs with the appropriate dict.
        Recursively walks the given schema dict, converting every instance
        of $ref it reaches with a resolved dict.
        This modifies the input schema and also returns it.
        Arguments:
            schema:
                the schema dict
            refs:
                a dict of <string, dict> which forms a store of referenced schemata;
                entries here take precedence over the shared files with the same key
        Returns:
            schema
        """
        refs = refs or {}
        refs = {**self._shared_refs, **refs}
        return self._resolve_schema_references(schema, RefResolver("", schema, store=refs))


class ResourceSchemaLoader:
"""JSONSchema loader from package resources"""

Expand Down Expand Up @@ -124,10 +67,63 @@ def get_schema(self, name: str) -> dict:
print(f"Invalid JSON file format for file {schema_filename}")
raise

shared_schemas_folder = pkg_resources.resource_filename(self.package_name, "schemas/shared/")
if os.path.exists(shared_schemas_folder):
return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema)
return raw_schema
return self.__resolve_schema_references(raw_schema)

def __resolve_schema_references(self, raw_schema: dict) -> dict:
    """
    Resolve links to external references and move them to the local "definitions" map.

    :param raw_schema: jsonschema to look up external links in.
    :return: JSON serializable object whose references have no external dependencies.
    """

    class JsonFileLoader:
        """
        Custom json file loader to resolve references to resources located in "shared" directory.
        We need this for compatibility with existing schemas because all of them have references
        pointing to shared_schema.json file instead of shared/shared_schema.json
        """

        def __init__(self, uri_base: str, shared: str):
            self.shared = shared
            self.uri_base = uri_base

        def __call__(self, uri: str) -> Dict[str, Any]:
            # Redirect "<base>name.json" to "<base>/<shared>/name.json".
            uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
            # Use a context manager so the file handle is closed right after parsing.
            with open(uri) as schema_file:
                return json.load(schema_file)

    package = importlib.import_module(self.package_name)
    # Directory of the package, with a trailing slash, used as the base for references.
    base = os.path.dirname(package.__file__) + "/"

    def create_definitions(obj: dict, definitions: dict) -> Dict[str, Any]:
        """
        Scan resolved schema and compose definitions section, also convert
        jsonref.JsonRef object to JSON serializable dict.

        :param obj: jsonschema object with ref fields resolved.
        :param definitions: object for storing generated definitions.
        :return: JSON serializable object with references without external dependencies.
        """
        # NOTE(review): the JsonRef check is kept ahead of the dict/list checks on purpose;
        # JsonRef looks like a proxy over its target, so the order may matter - confirm.
        if isinstance(obj, jsonref.JsonRef):
            def_key = obj.__reference__["$ref"]
            # "shared_schema.json#/definitions/type_one" -> "shared_schema_type_one"
            def_key = def_key.replace("#/definitions/", "").replace(".json", "_")
            definition = create_definitions(obj.__subject__, definitions)
            # Omit existing definitions of the external resource since
            # we don't need them anymore. A reference may also point at a
            # non-object value (list, scalar), which has nothing to drop.
            if isinstance(definition, dict):
                definition.pop("definitions", None)
            definitions[def_key] = definition
            return {"$ref": "#/definitions/" + def_key}
        if isinstance(obj, dict):
            return {k: create_definitions(v, definitions) for k, v in obj.items()}
        if isinstance(obj, list):
            return [create_definitions(item, definitions) for item in obj]
        return obj

    resolved = jsonref.JsonRef.replace_refs(raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base)
    definitions = {}
    resolved = create_definitions(resolved, definitions)
    if definitions:
        resolved["definitions"] = definitions
    return resolved


def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: AirbyteLogger):
Expand Down
3 changes: 2 additions & 1 deletion airbyte-cdk/python/setup.py
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.20",
version="0.1.21",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -67,6 +67,7 @@
install_requires=[
"backoff",
"jsonschema~=3.2.0",
"jsonref~=0.2",
"pendulum",
"pydantic~=1.6",
"PyYAML~=5.4",
Expand Down
Expand Up @@ -31,6 +31,7 @@
from collections.abc import Mapping
from pathlib import Path

import jsonref
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit
Expand All @@ -45,8 +46,6 @@
SCHEMAS_ROOT = "/".join(os.path.abspath(MODULE.__file__).split("/")[:-1]) / Path("schemas")


# TODO (sherif) refactor ResourceSchemaLoader to completely separate the functionality for reading data from the package
# and the functionality for resolving schemas. See https://github.com/airbytehq/airbyte/issues/3222
@fixture(autouse=True, scope="session")
def create_and_teardown_schemas_dir():
os.mkdir(SCHEMAS_ROOT)
Expand Down Expand Up @@ -117,8 +116,9 @@ def test_shared_schemas_resolves():
"properties": {
"str": {"type": "string"},
"int": {"type": "integer"},
"obj": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}},
"obj": {"$ref": "#/definitions/shared_schema_"},
},
"definitions": {"shared_schema_": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}},
}

partial_schema = {
Expand All @@ -135,3 +135,43 @@ def test_shared_schemas_resolves():

actual_schema = resolver.get_schema("complex_schema")
assert actual_schema == expected_schema

@staticmethod
def test_shared_schemas_resolves_nested():
    """A shared definition that is itself a reference is emitted as one local definition."""

    def schema_pointing_at(ref: str) -> dict:
        # Same shape for input and expected output; only the $ref target differs.
        return {
            "type": ["null", "object"],
            "properties": {
                "str": {"type": "string"},
                "int": {"type": "integer"},
                "one_of": {"oneOf": [{"type": "string"}, {"$ref": ref}]},
                "obj": {"$ref": ref},
            },
        }

    schema_with_external_refs = schema_pointing_at("shared_schema.json#/definitions/type_one")

    # In the shared file "type_one" is only a pointer to "type_nested".
    shared_file_content = {
        "definitions": {
            "type_one": {"$ref": "shared_schema.json#/definitions/type_nested"},
            "type_nested": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}},
        }
    }

    create_schema("complex_schema", schema_with_external_refs)
    create_schema("shared/shared_schema", shared_file_content)

    wanted = schema_pointing_at("#/definitions/shared_schema_type_one")
    wanted["definitions"] = {
        "shared_schema_type_one": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}},
    }

    loaded = ResourceSchemaLoader(MODULE_NAME).get_schema("complex_schema")
    assert loaded == wanted
    # Make sure generated schema is JSON serializable
    assert json.dumps(loaded)
    # The generated schema must be accepted by jsonref without any external file.
    assert jsonref.JsonRef.replace_refs(loaded)