Skip to content

Commit

Permalink
CDK: support nested refs resolving
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmytro Rezchykov committed Sep 15, 2021
1 parent 7a6da86 commit c15c93b
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 68 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 0.1.20
Resolve nested schema references and move external references to single schema definitions.

## 0.1.19
No longer prints full config files on validation error to prevent exposing secrets to log file: https://github.com/airbytehq/airbyte/pull/5879

Expand Down
124 changes: 60 additions & 64 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Expand Up @@ -23,77 +23,20 @@
#


import importlib
import json
import os
import pkgutil
from typing import Any, ClassVar, Dict, Mapping, Tuple

import pkg_resources
import jsonref
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import ConnectorSpecification
from jsonschema import RefResolver, validate
from jsonschema import validate
from jsonschema.exceptions import ValidationError
from pydantic import BaseModel, Field


class JsonSchemaResolver:
"""Helper class to expand $ref items in json schema"""

def __init__(self, shared_schemas_path: str):
self._shared_refs = self._load_shared_schema_refs(shared_schemas_path)

@staticmethod
def _load_shared_schema_refs(shared_schemas_path: str):
shared_file_names = [f.name for f in os.scandir(shared_schemas_path) if f.is_file()]
shared_schema_refs = {}
for shared_file in shared_file_names:
with open(os.path.join(shared_schemas_path, shared_file)) as data_file:
shared_schema_refs[shared_file] = json.load(data_file)

return shared_schema_refs

def _resolve_schema_references(self, schema: dict, resolver: RefResolver) -> dict:
if "$ref" in schema:
reference_path = schema.pop("$ref", None)
resolved = resolver.resolve(reference_path)[1]
schema.update(resolved)
return self._resolve_schema_references(schema, resolver)

if "properties" in schema:
for k, val in schema["properties"].items():
schema["properties"][k] = self._resolve_schema_references(val, resolver)

if "patternProperties" in schema:
for k, val in schema["patternProperties"].items():
schema["patternProperties"][k] = self._resolve_schema_references(val, resolver)

if "items" in schema:
schema["items"] = self._resolve_schema_references(schema["items"], resolver)

if "anyOf" in schema:
for i, element in enumerate(schema["anyOf"]):
schema["anyOf"][i] = self._resolve_schema_references(element, resolver)

return schema

def resolve(self, schema: dict, refs: Dict[str, dict] = None) -> dict:
"""Resolves and replaces json-schema $refs with the appropriate dict.
Recursively walks the given schema dict, converting every instance
of $ref in a 'properties' structure with a resolved dict.
This modifies the input schema and also returns it.
Arguments:
schema:
the schema dict
refs:
a dict of <string, dict> which forms a store of referenced schemata
Returns:
schema
"""
refs = refs or {}
refs = {**self._shared_refs, **refs}
return self._resolve_schema_references(schema, RefResolver("", schema, store=refs))


class ResourceSchemaLoader:
"""JSONSchema loader from package resources"""

Expand Down Expand Up @@ -124,10 +67,63 @@ def get_schema(self, name: str) -> dict:
print(f"Invalid JSON file format for file {schema_filename}")
raise

shared_schemas_folder = pkg_resources.resource_filename(self.package_name, "schemas/shared/")
if os.path.exists(shared_schemas_folder):
return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema)
return raw_schema
return self.__resolve_schema_references(raw_schema)

def __resolve_schema_references(self, raw_schema: dict) -> dict:
"""
Resolve links to external references and move it to local "definitions" map.
:param raw_schema jsonschema to lookup for external links.
:return JSON serializable object with references without external dependencies.
"""

class JsonFileLoader:
"""
Custom json file loader to resolve references to resources located in "shared" directory.
We need this for compatability with existing schemas cause all of them have references
pointing to shared_schema.json file instead of shared/shared_schema.json
"""

def __init__(self, uri_base: str, shared: str):
self.shared = shared
self.uri_base = uri_base

def __call__(self, uri: str) -> Dict[str, Any]:
uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/")
return json.load(open(uri))

package = importlib.import_module(self.package_name)
base = os.path.dirname(package.__file__) + "/"

def create_definitions(obj: dict, definitions: dict) -> Dict[str, Any]:
"""
Scan resolved schema and compose definitions section, also convert
jsonref.JsonRef object to JSON serializable dict.
:param obj - jsonschema object with ref field resovled.
:definitions - object for storing generated definitions.
:return JSON serializable object with references without external dependencies.
"""
if isinstance(obj, jsonref.JsonRef):
def_key = obj.__reference__["$ref"]
def_key = def_key.replace("#/definitions/", "").replace(".json", "_")
definition = create_definitions(obj.__subject__, definitions)
# Omit existance definitions for extenal resource since
# we dont need it anymore.
definition.pop("definitions", None)
definitions[def_key] = definition
return {"$ref": "#/definitions/" + def_key}
elif isinstance(obj, dict):
return {k: create_definitions(v, definitions) for k, v in obj.items()}
elif isinstance(obj, list):
return [create_definitions(item, definitions) for item in obj]
else:
return obj

resolved = jsonref.JsonRef.replace_refs(raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base)
definitions = {}
resolved = create_definitions(resolved, definitions)
if definitions:
resolved["definitions"] = definitions
return resolved


def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: AirbyteLogger):
Expand Down
3 changes: 2 additions & 1 deletion airbyte-cdk/python/setup.py
Expand Up @@ -35,7 +35,7 @@

setup(
name="airbyte-cdk",
version="0.1.18",
version="0.1.20",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down Expand Up @@ -67,6 +67,7 @@
install_requires=[
"backoff",
"jsonschema~=3.2.0",
"jsonref~=0.2",
"pendulum",
"pydantic~=1.6",
"PyYAML~=5.4",
Expand Down
46 changes: 43 additions & 3 deletions airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py
Expand Up @@ -31,6 +31,7 @@
from collections.abc import Mapping
from pathlib import Path

import jsonref
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit
Expand All @@ -45,8 +46,6 @@
SCHEMAS_ROOT = "/".join(os.path.abspath(MODULE.__file__).split("/")[:-1]) / Path("schemas")


# TODO (sherif) refactor ResourceSchemaLoader to completely separate the functionality for reading data from the package. See https://github.com/airbytehq/airbyte/issues/3222
# and the functionality for resolving schemas. See https://github.com/airbytehq/airbyte/issues/3222
@fixture(autouse=True, scope="session")
def create_and_teardown_schemas_dir():
os.mkdir(SCHEMAS_ROOT)
Expand Down Expand Up @@ -117,8 +116,9 @@ def test_shared_schemas_resolves():
"properties": {
"str": {"type": "string"},
"int": {"type": "integer"},
"obj": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}},
"obj": {"$ref": "#/definitions/shared_schema_"},
},
"definitions": {"shared_schema_": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}},
}

partial_schema = {
Expand All @@ -135,3 +135,43 @@ def test_shared_schemas_resolves():

actual_schema = resolver.get_schema("complex_schema")
assert actual_schema == expected_schema

@staticmethod
def test_shared_schemas_resolves_nested():
expected_schema = {
"type": ["null", "object"],
"properties": {
"str": {"type": "string"},
"int": {"type": "integer"},
"one_of": {"oneOf": [{"type": "string"}, {"$ref": "#/definitions/shared_schema_type_one"}]},
"obj": {"$ref": "#/definitions/shared_schema_type_one"},
},
"definitions": {"shared_schema_type_one": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}},
}
partial_schema = {
"type": ["null", "object"],
"properties": {
"str": {"type": "string"},
"int": {"type": "integer"},
"one_of": {"oneOf": [{"type": "string"}, {"$ref": "shared_schema.json#/definitions/type_one"}]},
"obj": {"$ref": "shared_schema.json#/definitions/type_one"},
},
}

referenced_schema = {
"definitions": {
"type_one": {"$ref": "shared_schema.json#/definitions/type_nested"},
"type_nested": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}},
}
}

create_schema("complex_schema", partial_schema)
create_schema("shared/shared_schema", referenced_schema)

resolver = ResourceSchemaLoader(MODULE_NAME)

actual_schema = resolver.get_schema("complex_schema")
assert actual_schema == expected_schema
# Make sure generated schema is JSON serializable
assert json.dumps(actual_schema)
assert jsonref.JsonRef.replace_refs(actual_schema)

0 comments on commit c15c93b

Please sign in to comment.