diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 7191bad5e0908..b25d00e39b238 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.21 +Resolve nested schema references and move external references to single schema definitions. + ## 0.1.20 - Allow using `requests.auth.AuthBase` as authenticators instead of custom CDK authenticators. - Implement Oauth2Authenticator, MultipleTokenAuthenticator and TokenAuthenticator authenticators. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index 2cc15730342a7..496d416b5b520 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -23,77 +23,20 @@ # +import importlib import json import os import pkgutil from typing import Any, ClassVar, Dict, Mapping, Tuple -import pkg_resources +import jsonref from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ConnectorSpecification -from jsonschema import RefResolver, validate +from jsonschema import validate from jsonschema.exceptions import ValidationError from pydantic import BaseModel, Field -class JsonSchemaResolver: - """Helper class to expand $ref items in json schema""" - - def __init__(self, shared_schemas_path: str): - self._shared_refs = self._load_shared_schema_refs(shared_schemas_path) - - @staticmethod - def _load_shared_schema_refs(shared_schemas_path: str): - shared_file_names = [f.name for f in os.scandir(shared_schemas_path) if f.is_file()] - shared_schema_refs = {} - for shared_file in shared_file_names: - with open(os.path.join(shared_schemas_path, shared_file)) as data_file: - shared_schema_refs[shared_file] = json.load(data_file) - - return shared_schema_refs - - def _resolve_schema_references(self, schema: dict, resolver: RefResolver) -> dict: - if "$ref" in schema: - reference_path = schema.pop("$ref", None) - resolved = resolver.resolve(reference_path)[1] - schema.update(resolved) - return self._resolve_schema_references(schema, resolver) - - if "properties" in schema: - for k, val in schema["properties"].items(): - schema["properties"][k] = self._resolve_schema_references(val, resolver) - - if "patternProperties" in schema: - for k, val in schema["patternProperties"].items(): - schema["patternProperties"][k] = self._resolve_schema_references(val, resolver) - - if "items" in schema: - schema["items"] = self._resolve_schema_references(schema["items"], resolver) - - if "anyOf" in schema: - for i, element in enumerate(schema["anyOf"]): - schema["anyOf"][i] = self._resolve_schema_references(element, resolver) - - return schema - - def resolve(self, schema: dict, refs: Dict[str, dict] = None) -> dict: - """Resolves and replaces json-schema $refs with the appropriate dict. - Recursively walks the given schema dict, converting every instance - of $ref in a 'properties' structure with a resolved dict. - This modifies the input schema and also returns it. - Arguments: - schema: - the schema dict - refs: - a dict of which forms a store of referenced schemata - Returns: - schema - """ - refs = refs or {} - refs = {**self._shared_refs, **refs} - return self._resolve_schema_references(schema, RefResolver("", schema, store=refs)) - - class ResourceSchemaLoader: """JSONSchema loader from package resources""" @@ -124,10 +67,63 @@ def get_schema(self, name: str) -> dict: print(f"Invalid JSON file format for file {schema_filename}") raise - shared_schemas_folder = pkg_resources.resource_filename(self.package_name, "schemas/shared/") - if os.path.exists(shared_schemas_folder): - return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema) - return raw_schema + return self.__resolve_schema_references(raw_schema) + + def __resolve_schema_references(self, raw_schema: dict) -> dict: + """ + Resolve links to external references and move it to local "definitions" map. + :param raw_schema jsonschema to lookup for external links. + :return JSON serializable object with references without external dependencies. + """ + + class JsonFileLoader: + """ + Custom json file loader to resolve references to resources located in "shared" directory. + We need this for compatability with existing schemas cause all of them have references + pointing to shared_schema.json file instead of shared/shared_schema.json + """ + + def __init__(self, uri_base: str, shared: str): + self.shared = shared + self.uri_base = uri_base + + def __call__(self, uri: str) -> Dict[str, Any]: + uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/") + return json.load(open(uri)) + + package = importlib.import_module(self.package_name) + base = os.path.dirname(package.__file__) + "/" + + def create_definitions(obj: dict, definitions: dict) -> Dict[str, Any]: + """ + Scan resolved schema and compose definitions section, also convert + jsonref.JsonRef object to JSON serializable dict. + :param obj - jsonschema object with ref field resovled. + :definitions - object for storing generated definitions. + :return JSON serializable object with references without external dependencies. + """ + if isinstance(obj, jsonref.JsonRef): + def_key = obj.__reference__["$ref"] + def_key = def_key.replace("#/definitions/", "").replace(".json", "_") + definition = create_definitions(obj.__subject__, definitions) + # Omit existance definitions for extenal resource since + # we dont need it anymore. + definition.pop("definitions", None) + definitions[def_key] = definition + return {"$ref": "#/definitions/" + def_key} + elif isinstance(obj, dict): + return {k: create_definitions(v, definitions) for k, v in obj.items()} + elif isinstance(obj, list): + return [create_definitions(item, definitions) for item in obj] + else: + return obj + + resolved = jsonref.JsonRef.replace_refs(raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base) + definitions = {} + resolved = create_definitions(resolved, definitions) + if definitions: + resolved["definitions"] = definitions + return resolved def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: AirbyteLogger): diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 3488ef6928528..8be2e3ce70e6d 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -35,7 +35,7 @@ setup( name="airbyte-cdk", - version="0.1.20", + version="0.1.21", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", @@ -67,6 +67,7 @@ install_requires=[ "backoff", "jsonschema~=3.2.0", + "jsonref~=0.2", "pendulum", "pydantic~=1.6", "PyYAML~=5.4", diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py b/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py index de8d8b1613083..b4713c200ed98 100644 --- a/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py +++ b/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py @@ -31,6 +31,7 @@ from collections.abc import Mapping from pathlib import Path +import jsonref from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit @@ -45,8 +46,6 @@ SCHEMAS_ROOT = "/".join(os.path.abspath(MODULE.__file__).split("/")[:-1]) / Path("schemas") -# TODO (sherif) refactor ResourceSchemaLoader to completely separate the functionality for reading data from the package. See https://github.com/airbytehq/airbyte/issues/3222 -# and the functionality for resolving schemas. See https://github.com/airbytehq/airbyte/issues/3222 @fixture(autouse=True, scope="session") def create_and_teardown_schemas_dir(): os.mkdir(SCHEMAS_ROOT) @@ -117,8 +116,9 @@ def test_shared_schemas_resolves(): "properties": { "str": {"type": "string"}, "int": {"type": "integer"}, - "obj": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}, + "obj": {"$ref": "#/definitions/shared_schema_"}, }, + "definitions": {"shared_schema_": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}}, } partial_schema = { @@ -135,3 +135,43 @@ def test_shared_schemas_resolves(): actual_schema = resolver.get_schema("complex_schema") assert actual_schema == expected_schema + + @staticmethod + def test_shared_schemas_resolves_nested(): + expected_schema = { + "type": ["null", "object"], + "properties": { + "str": {"type": "string"}, + "int": {"type": "integer"}, + "one_of": {"oneOf": [{"type": "string"}, {"$ref": "#/definitions/shared_schema_type_one"}]}, + "obj": {"$ref": "#/definitions/shared_schema_type_one"}, + }, + "definitions": {"shared_schema_type_one": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}}, + } + partial_schema = { + "type": ["null", "object"], + "properties": { + "str": {"type": "string"}, + "int": {"type": "integer"}, + "one_of": {"oneOf": [{"type": "string"}, {"$ref": "shared_schema.json#/definitions/type_one"}]}, + "obj": {"$ref": "shared_schema.json#/definitions/type_one"}, + }, + } + + referenced_schema = { + "definitions": { + "type_one": {"$ref": "shared_schema.json#/definitions/type_nested"}, + "type_nested": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}, + } + } + + create_schema("complex_schema", partial_schema) + create_schema("shared/shared_schema", referenced_schema) + + resolver = ResourceSchemaLoader(MODULE_NAME) + + actual_schema = resolver.get_schema("complex_schema") + assert actual_schema == expected_schema + # Make sure generated schema is JSON serializable + assert json.dumps(actual_schema) + assert jsonref.JsonRef.replace_refs(actual_schema)