Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Framework v0.1 #2634

Merged
merged 5 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions airbyte-integrations/bases/base-python/base_python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,35 @@
SOFTWARE.
"""

from .catalog_helpers import CatalogHelper
from .client import BaseClient
from .integration import AirbyteSpec, Destination, Integration, Source
from .logger import AirbyteLogger
from .source import BaseSource
from base_python.catalog_helpers import CatalogHelper
from base_python.client import BaseClient
from base_python.integration import AirbyteSpec, Destination, Integration, Source
from base_python.logger import AirbyteLogger
from base_python.sdk.abstract_source import AbstractSource

# Separate the SDK imports so they can be moved somewhere else more easily
from base_python.sdk.streams.auth.core import HttpAuthenticator
from base_python.sdk.streams.auth.token import TokenAuthenticator
from base_python.sdk.streams.core import Stream
from base_python.sdk.streams.http import HttpStream
from base_python.source import BaseSource

# Must be the last one because the way we load the connector module creates a circular
# dependency and models might not have been loaded yet
from .entrypoint import AirbyteEntrypoint # noqa isort:skip
from base_python.entrypoint import AirbyteEntrypoint # noqa isort:skip

__all__ = [
"AirbyteLogger",
"AirbyteSpec",
"AbstractSource",
"BaseClient",
"BaseSource",
"CatalogHelper",
"Destination",
"HttpAuthenticator",
"HttpStream",
"Integration",
"Source",
"Stream",
"TokenAuthenticator",
]
80 changes: 2 additions & 78 deletions airbyte-integrations/bases/base-python/base_python/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,12 @@
"""

import inspect
import json
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Generator, List, Mapping, Tuple

import pkg_resources
from airbyte_protocol import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, SyncMode
from jsonschema import RefResolver

from .schema_helpers import ResourceSchemaLoader


def package_name_from_class(cls: object) -> str:
Expand All @@ -40,79 +37,6 @@ def package_name_from_class(cls: object) -> str:
return module.__name__.split(".")[0]


class JsonSchemaResolver:
"""Helper class to expand $ref items in json schema"""

def __init__(self, shared_schemas_path: str):
self._shared_refs = self._load_shared_schema_refs(shared_schemas_path)

@staticmethod
def _load_shared_schema_refs(path: str):
shared_file_names = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]

shared_schema_refs = {}
for shared_file in shared_file_names:
with open(os.path.join(path, shared_file)) as data_file:
shared_schema_refs[shared_file] = json.load(data_file)

return shared_schema_refs

def _resolve_schema_references(self, schema: dict, resolver: RefResolver) -> dict:
if "$ref" in schema:
reference_path = schema.pop("$ref", None)
resolved = resolver.resolve(reference_path)[1]
schema.update(resolved)
return self._resolve_schema_references(schema, resolver)

if "properties" in schema:
for k, val in schema["properties"].items():
schema["properties"][k] = self._resolve_schema_references(val, resolver)

if "patternProperties" in schema:
for k, val in schema["patternProperties"].items():
schema["patternProperties"][k] = self._resolve_schema_references(val, resolver)

if "items" in schema:
schema["items"] = self._resolve_schema_references(schema["items"], resolver)

if "anyOf" in schema:
for i, element in enumerate(schema["anyOf"]):
schema["anyOf"][i] = self._resolve_schema_references(element, resolver)

return schema

def resolve(self, schema: dict, refs: Dict[str, dict] = None) -> dict:
"""Resolves and replaces json-schema $refs with the appropriate dict.
Recursively walks the given schema dict, converting every instance
of $ref in a 'properties' structure with a resolved dict.
This modifies the input schema and also returns it.
Arguments:
schema:
the schema dict
refs:
a dict of <string, dict> which forms a store of referenced schemata
Returns:
schema
"""
refs = refs or {}
refs = {**self._shared_refs, **refs}
return self._resolve_schema_references(schema, RefResolver("", schema, store=refs))


class ResourceSchemaLoader:
"""JSONSchema loader from package resources"""

def __init__(self, package_name: str):
self.package_name = package_name

def get_schema(self, name: str) -> dict:
raw_schema = json.loads(pkgutil.get_data(self.package_name, f"schemas/{name}.json"))
shared_schemas_folder = pkg_resources.resource_filename(self.package_name, "schemas/shared/")
if os.path.exists(shared_schemas_folder):
return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema)
return raw_schema


class StreamStateMixin:
def get_stream_state(self, name: str) -> Any:
"""Get state of stream with corresponding name"""
Expand Down
126 changes: 126 additions & 0 deletions airbyte-integrations/bases/base-python/base_python/schema_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""
MIT License

Copyright (c) 2020 Airbyte

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

import json
import os
import pkgutil
from typing import Dict

import pkg_resources
from jsonschema import RefResolver


class JsonSchemaResolver:
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
"""Helper class to expand $ref items in json schema"""

def __init__(self, shared_schemas_path: str):
self._shared_refs = self._load_shared_schema_refs(shared_schemas_path)

@staticmethod
def _load_shared_schema_refs(shared_schemas_path: str):
shared_file_names = [f for f in os.scandir(shared_schemas_path) if f.isfile()]

shared_schema_refs = {}
for shared_file in shared_file_names:
with open(os.path.join(shared_schemas_path, shared_file)) as data_file:
shared_schema_refs[shared_file] = json.load(data_file)

return shared_schema_refs

def _resolve_schema_references(self, schema: dict, resolver: RefResolver) -> dict:
if "$ref" in schema:
reference_path = schema.pop("$ref", None)
resolved = resolver.resolve(reference_path)[1]
schema.update(resolved)
return self._resolve_schema_references(schema, resolver)

if "properties" in schema:
for k, val in schema["properties"].items():
schema["properties"][k] = self._resolve_schema_references(val, resolver)

if "patternProperties" in schema:
for k, val in schema["patternProperties"].items():
schema["patternProperties"][k] = self._resolve_schema_references(val, resolver)

if "items" in schema:
schema["items"] = self._resolve_schema_references(schema["items"], resolver)

if "anyOf" in schema:
for i, element in enumerate(schema["anyOf"]):
schema["anyOf"][i] = self._resolve_schema_references(element, resolver)

return schema

def resolve(self, schema: dict, refs: Dict[str, dict] = None) -> dict:
"""Resolves and replaces json-schema $refs with the appropriate dict.
Recursively walks the given schema dict, converting every instance
of $ref in a 'properties' structure with a resolved dict.
This modifies the input schema and also returns it.
Arguments:
schema:
the schema dict
refs:
a dict of <string, dict> which forms a store of referenced schemata
Returns:
schema
"""
refs = refs or {}
refs = {**self._shared_refs, **refs}
return self._resolve_schema_references(schema, RefResolver("", schema, store=refs))


class ResourceSchemaLoader:
"""JSONSchema loader from package resources"""

def __init__(self, package_name: str):
self.package_name = package_name

def get_schema(self, name: str) -> dict:
"""
This method retrieves a JSON schema from the schemas/ folder.


The expected file structure is to have all top-level schemas (corresponding to streams) in the "schemas/" folder, with any shared $refs
living inside the "schemas/shared/" folder. For example:

schemas/shared/<shared_definition>.json
schemas/<name>.json # contains a $ref to shared_definition
schemas/<name2>.json # contains a $ref to shared_definition
"""

schema_filename = f"schemas/{name}.json"
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
raw_file = pkgutil.get_data(self.package_name, schema_filename)
if not raw_file:
raise IOError(f"Cannot find file {schema_filename}")
try:
raw_schema = json.loads(raw_file)
except ValueError:
# TODO use proper logging
print(f"Invalid JSON file format for file {schema_filename}")
raise

shared_schemas_folder = pkg_resources.resource_filename(self.package_name, "schemas/shared/")
if os.path.exists(shared_schemas_folder):
return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema)
return raw_schema
Loading