-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AirbyteLib: More robust error handling, installation improvements #34572
Changes from 26 commits
abbb256
3845f5c
9fccace
a217a6e
6aa85d6
dddbc78
b1d966b
f61152a
d665088
809918b
4a41ffb
bab5e06
10ce077
063bba3
ab75be4
8880b0b
3773149
90918c8
9197728
f73f288
dd9ac99
a2bed01
2e49154
f975282
ace7208
8775c1b
f24226d
7370d83
cfafccc
8446838
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,8 @@ | |
|
||
import json | ||
import os | ||
import threading | ||
from copy import copy | ||
from dataclasses import dataclass | ||
from pathlib import Path | ||
|
||
|
@@ -12,47 +14,81 @@ | |
from airbyte_lib.version import get_version | ||
|
||
|
||
__cache: dict[str, ConnectorMetadata] | None = None | ||
_cache_lock = threading.Lock() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It seems like this lock is never actually acquired? Am I missing something? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch. I had it in an earlier version when debugging a race condition, then refactored so it wasn't needed. (Now removed.) |
||
|
||
REGISTRY_ENV_VAR = "AIRBYTE_LOCAL_REGISTRY" | ||
REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" | ||
|
||
|
||
@dataclass | ||
class ConnectorMetadata: | ||
name: str | ||
latest_available_version: str | ||
|
||
|
||
_cache: dict[str, ConnectorMetadata] | None = None | ||
def _get_registry_url() -> str: | ||
if REGISTRY_ENV_VAR in os.environ: | ||
return str(os.environ.get(REGISTRY_ENV_VAR)) | ||
|
||
REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" | ||
return REGISTRY_URL | ||
|
||
|
||
def _update_cache() -> None: | ||
global _cache | ||
if os.environ.get("AIRBYTE_LOCAL_REGISTRY"): | ||
with Path(str(os.environ.get("AIRBYTE_LOCAL_REGISTRY"))).open() as f: | ||
data = json.load(f) | ||
else: | ||
def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]: | ||
"""Return the registry cache.""" | ||
global __cache | ||
if __cache and not force_refresh: | ||
return __cache | ||
|
||
registry_url = _get_registry_url() | ||
if registry_url.startswith("http"): | ||
response = requests.get( | ||
REGISTRY_URL, headers={"User-Agent": f"airbyte-lib-{get_version()}"} | ||
registry_url, headers={"User-Agent": f"airbyte-lib-{get_version()}"} | ||
) | ||
response.raise_for_status() | ||
data = response.json() | ||
_cache = {} | ||
else: | ||
# Assume local file | ||
with Path(registry_url).open() as f: | ||
data = json.load(f) | ||
|
||
new_cache: dict[str, ConnectorMetadata] = {} | ||
|
||
for connector in data["sources"]: | ||
name = connector["dockerRepository"].replace("airbyte/", "") | ||
_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"]) | ||
new_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"]) | ||
|
||
if len(new_cache) == 0: | ||
raise exc.AirbyteLibInternalError( | ||
message="Connector registry is empty.", | ||
context={ | ||
"registry_url": _get_registry_url(), | ||
}, | ||
) | ||
|
||
__cache = new_cache | ||
return __cache | ||
|
||
|
||
def get_connector_metadata(name: str) -> ConnectorMetadata: | ||
"""Check the cache for the connector. | ||
|
||
If the cache is empty, populate by calling update_cache. | ||
""" | ||
if not _cache: | ||
_update_cache() | ||
if not _cache or name not in _cache: | ||
raise exc.AirbyteLibInputError( | ||
message="Connector name not found in registry.", | ||
guidance="Please double check the connector name.", | ||
cache = copy(_get_registry_cache()) | ||
if not cache: | ||
raise exc.AirbyteLibInternalError( | ||
message="Connector registry could not be loaded.", | ||
context={ | ||
"registry_url": _get_registry_url(), | ||
}, | ||
) | ||
if name not in cache: | ||
raise exc.AirbyteConnectorNotRegisteredError( | ||
connector_name=name, | ||
context={ | ||
"connector_name": name, | ||
"registry_url": _get_registry_url(), | ||
"available_connectors": sorted(cache.keys()), | ||
}, | ||
) | ||
return _cache[name] | ||
return cache[name] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
from typing import TYPE_CHECKING, Any | ||
|
||
import jsonschema | ||
import yaml | ||
|
||
from airbyte_protocol.models import ( | ||
AirbyteCatalog, | ||
|
@@ -68,7 +69,13 @@ def __init__( | |
name: str, | ||
config: dict[str, Any] | None = None, | ||
streams: list[str] | None = None, | ||
*, | ||
validate: bool = False, | ||
) -> None: | ||
"""Initialize the source. | ||
|
||
If config is provided, it will be validated against the spec if validate is True. | ||
""" | ||
self._processed_records = 0 | ||
self.executor = executor | ||
self.name = name | ||
|
@@ -79,7 +86,7 @@ def __init__( | |
self._spec: ConnectorSpecification | None = None | ||
self._selected_stream_names: list[str] | None = None | ||
if config is not None: | ||
self.set_config(config) | ||
self.set_config(config, validate=validate) | ||
if streams is not None: | ||
self.set_streams(streams) | ||
|
||
|
@@ -102,8 +109,22 @@ def set_streams(self, streams: list[str]) -> None: | |
) | ||
self._selected_stream_names = streams | ||
|
||
def set_config(self, config: dict[str, Any]) -> None: | ||
self._validate_config(config) | ||
def set_config( | ||
self, | ||
config: dict[str, Any], | ||
*, | ||
validate: bool = False, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What's the advantage of deferring the check? I thought of it as being quite nice, since it tells you as early as possible if your config won't work, instead of waiting until the connector is actually invoked. What's the workflow you had in mind here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Discussed in DM. |
||
) -> None: | ||
"""Set the config for the connector. | ||
|
||
If validate is True, raise an exception if the config fails validation. | ||
|
||
If validate is False, validation will be deferred until check() or validate_config() | ||
is called. | ||
""" | ||
if validate: | ||
self.validate_config(config) | ||
|
||
self._config_dict = config | ||
|
||
@property | ||
|
@@ -131,9 +152,13 @@ def _discover(self) -> AirbyteCatalog: | |
log_text=self._last_log_messages, | ||
) | ||
|
||
def _validate_config(self, config: dict[str, Any]) -> None: | ||
"""Validate the config against the spec.""" | ||
def validate_config(self, config: dict[str, Any] | None = None) -> None: | ||
"""Validate the config against the spec. | ||
|
||
If config is not provided, the already-set config will be validated. | ||
""" | ||
spec = self._get_spec(force_refresh=False) | ||
config = self._config if config is None else config | ||
jsonschema.validate(config, spec.connectionSpecification) | ||
|
||
def get_available_streams(self) -> list[str]: | ||
|
@@ -161,6 +186,21 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: | |
log_text=self._last_log_messages, | ||
) | ||
|
||
@property | ||
def _yaml_spec(self) -> str: | ||
"""Get the spec as a yaml string. | ||
|
||
For now, the primary use case is for writing and debugging a valid config for a source. | ||
|
||
This is private for now because we probably want better polish before exposing this | ||
as a stable interface. This will also get easier when we have docs links with this info | ||
for each connector. | ||
""" | ||
spec_obj: ConnectorSpecification = self._get_spec() | ||
spec_dict = spec_obj.dict(exclude_unset=True) | ||
# convert to a yaml string | ||
return yaml.dump(spec_dict) | ||
|
||
@property | ||
def discovered_catalog(self) -> AirbyteCatalog: | ||
"""Get the raw catalog for the given streams. | ||
|
@@ -248,17 +288,23 @@ def check(self) -> None: | |
* Make sure the subprocess is killed when the function returns. | ||
""" | ||
with as_temp_files([self._config]) as [config_file]: | ||
for msg in self._execute(["check", "--config", config_file]): | ||
if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus: | ||
if msg.connectionStatus.status != Status.FAILED: | ||
return # Success! | ||
|
||
raise exc.AirbyteConnectorCheckFailedError( | ||
context={ | ||
"message": msg.connectionStatus.message, | ||
} | ||
) | ||
raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages) | ||
try: | ||
for msg in self._execute(["check", "--config", config_file]): | ||
if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus: | ||
if msg.connectionStatus.status != Status.FAILED: | ||
return # Success! | ||
|
||
raise exc.AirbyteConnectorCheckFailedError( | ||
context={ | ||
"message": msg.connectionStatus.message, | ||
} | ||
) | ||
raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages) | ||
except exc.AirbyteConnectorReadError as ex: | ||
raise exc.AirbyteConnectorCheckFailedError( | ||
message="The connector failed to check the connection.", | ||
log_text=ex.log_text, | ||
) from ex | ||
|
||
def install(self) -> None: | ||
"""Install the connector if it is not yet installed.""" | ||
|
@@ -338,7 +384,8 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]: | |
* Read the output line by line of the subprocess and serialize them AirbyteMessage objects. | ||
Drop if not valid. | ||
""" | ||
self.executor.ensure_installation() | ||
# Fail early if the connector is not installed. | ||
self.executor.ensure_installation(auto_fix=False) | ||
|
||
try: | ||
self._last_log_messages = [] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the code, by my understanding
AirbyteConnectorNotFoundError
means the connector is not found locally, and AirbyteConnectorNotRegisteredError
means the connector is not found in the registry, but this docstring indicates otherwise. Could you clarify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! This was a mistake. I've renamed the error to
AirbyteConnectorExecutableNotFoundError
so it is more explicit.