Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AirbyteLib: More robust error handling, installation improvements #34572

Merged
merged 30 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
abbb256
new exception type: AirbyteConnectorNotRegisteredError
aaronsteers Jan 26, 2024
3845f5c
make constructors more resilient
aaronsteers Jan 26, 2024
9fccace
print stderr in exception text, cleanup failed install, remove editab…
aaronsteers Jan 26, 2024
a217a6e
move auto-install out of venv constructor, for easier debugging
aaronsteers Jan 26, 2024
6aa85d6
add test to assert that install failure includes pip log text
aaronsteers Jan 26, 2024
dddbc78
update docs
aaronsteers Jan 26, 2024
b1d966b
auto-format
aaronsteers Jan 26, 2024
f61152a
update docs
aaronsteers Jan 26, 2024
d665088
refactor version handling, control for side effects
aaronsteers Jan 26, 2024
809918b
fix exception handling in _get_installed_version()
aaronsteers Jan 26, 2024
4a41ffb
fix tests
aaronsteers Jan 26, 2024
bab5e06
improve thread safety
aaronsteers Jan 27, 2024
10ce077
handle quoted spaces in pip_url
aaronsteers Jan 27, 2024
063bba3
fix import sorts
aaronsteers Jan 27, 2024
ab75be4
standalone validate_config() method
aaronsteers Jan 27, 2024
8880b0b
add Source.yaml_spec property
aaronsteers Jan 27, 2024
3773149
make _yaml_spec a protected member
aaronsteers Jan 27, 2024
90918c8
fix too-limited json package_data glob
aaronsteers Jan 27, 2024
9197728
fix missing copyright str
aaronsteers Jan 28, 2024
f73f288
docstring
aaronsteers Jan 28, 2024
dd9ac99
update docs
aaronsteers Jan 28, 2024
a2bed01
revert source-github change
aaronsteers Jan 28, 2024
2e49154
updated comment
aaronsteers Jan 28, 2024
f975282
remove redundant strings
aaronsteers Jan 28, 2024
ace7208
Merge remote-tracking branch 'origin/master' into aj/airbyte-lib/inst…
aaronsteers Jan 28, 2024
8775c1b
update docs (removes empty cloud page)
aaronsteers Jan 28, 2024
f24226d
remove unused lock
aaronsteers Jan 30, 2024
7370d83
rename AirbyteConnectoNotFoundError to AirbyteConnectorExecutableNotF…
aaronsteers Jan 30, 2024
cfafccc
Merge branch 'master' into aj/airbyte-lib/install-failure-handling
aaronsteers Jan 30, 2024
8446838
allow prereleases in version check
aaronsteers Jan 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 56 additions & 42 deletions airbyte-lib/airbyte_lib/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import subprocess
import sys
from abc import ABC, abstractmethod
from contextlib import contextmanager
from contextlib import contextmanager, suppress
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, NoReturn

Expand All @@ -24,15 +24,25 @@
class Executor(ABC):
def __init__(
self,
metadata: ConnectorMetadata,
*,
name: str | None = None,
metadata: ConnectorMetadata | None = None,
target_version: str | None = None,
) -> None:
self.metadata = metadata
self.enforce_version = target_version is not None
if target_version is None or target_version == _LATEST_VERSION:
self.target_version = metadata.latest_available_version
else:
if name is None and metadata is None:
raise exc.AirbyteLibInternalError(
message="Either name or metadata must be provided."
)

self.name: str = name or metadata.name
self.metadata: ConnectorMetadata | None = metadata
self.enforce_version: bool = target_version is not None

self.target_version: str | None = None
if target_version is not None:
self.target_version = target_version
elif metadata and (target_version is None or target_version == _LATEST_VERSION):
self.target_version = metadata.latest_available_version

@abstractmethod
def execute(self, args: list[str]) -> Iterator[str]:
Expand Down Expand Up @@ -107,31 +117,36 @@ def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]:
class VenvExecutor(Executor):
def __init__(
self,
metadata: ConnectorMetadata,
name: str | None = None,
*,
metadata: ConnectorMetadata | None = None,
target_version: str | None = None,
pip_url: str | None = None,
*,
install_if_missing: bool = False,
) -> None:
super().__init__(metadata, target_version)
self.install_if_missing = install_if_missing
super().__init__(name=name, metadata=metadata, target_version=target_version)

# This is a temporary install path that will be replaced with a proper package
# name once they are published.
# TODO: Replace with `f"airbyte-{self.metadata.name}"`
self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.metadata.name}"
# TODO: Replace with `f"airbyte-{self.name}"`
self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.name}"

def _get_venv_name(self) -> str:
return f".venv-{self.metadata.name}"
return f".venv-{self.name}"

def _get_connector_path(self) -> Path:
return Path(self._get_venv_name(), "bin", self.metadata.name)
return Path(self._get_venv_name(), "bin", self.name)

def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None:
result = subprocess.run(args, check=False)
result = subprocess.run(
args,
check=False,
stderr=subprocess.PIPE,
)
if result.returncode != 0:
raise exc.AirbyteConnectorInstallationError from exc.AirbyteSubprocessFailedError(
exit_code=result.returncode
raise exc.AirbyteSubprocessFailedError(
run_args=args,
exit_code=result.returncode,
log_text=result.stderr.decode("utf-8"),
)

def uninstall(self) -> None:
Expand All @@ -145,7 +160,18 @@ def install(self) -> None:

pip_path = str(Path(venv_name) / "bin" / "pip")

self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", self.pip_url])
try:
self._run_subprocess_and_raise_on_failure(
args=[pip_path, "install", *self.pip_url.split(" ")]
)
except exc.AirbyteSubprocessFailedError as ex:
# If the installation failed, remove the virtual environment
# Otherwise, the connector will be considered as installed and the user may not be able
# to retry the installation.
with suppress(exc.AirbyteSubprocessFailedError):
self.uninstall()

raise exc.AirbyteConnectorInstallationError from ex

def _get_installed_version(self) -> str:
"""Detect the version of the connector installed.
Expand All @@ -154,7 +180,7 @@ def _get_installed_version(self) -> str:
> python -c "from importlib.metadata import version; print(version('<connector-name>'))"
"""
venv_name = self._get_venv_name()
connector_name = self.metadata.name
connector_name = self.name
return subprocess.check_output(
[
Path(venv_name) / "bin" / "python",
Expand All @@ -176,27 +202,15 @@ def ensure_installation(
Note: Version verification is not supported for connectors installed from a
local path.
"""
venv_name = f".venv-{self.metadata.name}"
venv_name = f".venv-{self.name}"
venv_path = Path(venv_name)
if not venv_path.exists():
if not self.install_if_missing:
raise exc.AirbyteConnectorNotFoundError(
message="Connector not available and venv does not exist.",
guidance=(
"Please ensure the connector is pre-installed or consider enabling "
"`install_if_missing=True`."
),
context={
"connector_name": self.metadata.name,
"venv_name": venv_name,
},
)
self.install()

connector_path = self._get_connector_path()
if not connector_path.exists():
raise exc.AirbyteConnectorNotFoundError(
connector_name=self.metadata.name,
connector_name=self.name,
context={
"venv_name": venv_name,
},
Expand All @@ -212,7 +226,7 @@ def ensure_installation(
version_after_install = self._get_installed_version()
if version_after_install != self.target_version:
raise exc.AirbyteConnectorInstallationError(
connector_name=self.metadata.name,
connector_name=self.name,
context={
"venv_name": venv_name,
"target_version": self.target_version,
Expand All @@ -228,7 +242,7 @@ def execute(self, args: list[str]) -> Iterator[str]:
yield from stream

def get_telemetry_info(self) -> SourceTelemetryInfo:
return SourceTelemetryInfo(self.metadata.name, SourceType.VENV, self.target_version)
return SourceTelemetryInfo(self.name, SourceType.VENV, self.target_version)


class PathExecutor(Executor):
Expand All @@ -237,24 +251,24 @@ def ensure_installation(self) -> None:
self.execute(["spec"])
except Exception as e:
raise exc.AirbyteConnectorNotFoundError(
connector_name=self.metadata.name,
connector_name=self.name,
) from e

def install(self) -> NoReturn:
raise exc.AirbyteConnectorInstallationError(
message="Connector cannot be installed because it is not managed by airbyte-lib.",
connector_name=self.metadata.name,
connector_name=self.name,
)

def uninstall(self) -> NoReturn:
raise exc.AirbyteConnectorInstallationError(
message="Connector cannot be uninstalled because it is not managed by airbyte-lib.",
connector_name=self.metadata.name,
connector_name=self.name,
)

def execute(self, args: list[str]) -> Iterator[str]:
with _stream_from_subprocess([self.metadata.name, *args]) as stream:
with _stream_from_subprocess([self.name, *args]) as stream:
yield from stream

def get_telemetry_info(self) -> SourceTelemetryInfo:
return SourceTelemetryInfo(self.metadata.name, SourceType.LOCAL_INSTALL, version=None)
return SourceTelemetryInfo(self.name, SourceType.LOCAL_INSTALL, version=None)
43 changes: 30 additions & 13 deletions airbyte-lib/airbyte_lib/_factories/connector_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

from typing import Any

from airbyte_lib import exceptions as exc
from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor
from airbyte_lib.exceptions import AirbyteLibInputError
from airbyte_lib.registry import get_connector_metadata
from airbyte_lib.registry import ConnectorMetadata, get_connector_metadata
from airbyte_lib.source import Source


Expand Down Expand Up @@ -35,28 +35,45 @@ def get_connector(
install_if_missing: whether to install the connector if it is not available locally. This
parameter is ignored if use_local_install is True.
"""
metadata = get_connector_metadata(name)
if use_local_install and pip_url:
raise exc.AirbyteLibInputError(
message="Param 'pip_url' is not supported when 'use_local_install' is True."
)

if use_local_install and version:
raise exc.AirbyteLibInputError(
message="Param 'version' is not supported when 'use_local_install' is True."
)

if use_local_install and install_if_missing:
raise exc.AirbyteLibInputError(
message="Param 'install_if_missing' is not supported when 'use_local_install' is True."
)

metadata: ConnectorMetadata | None = None
try:
metadata = get_connector_metadata(name)
except exc.AirbyteConnectorNotRegisteredError:
if not pip_url:
raise

if use_local_install:
if pip_url:
raise AirbyteLibInputError(
message="Param 'pip_url' is not supported when 'use_local_install' is True."
)
if version:
raise AirbyteLibInputError(
message="Param 'version' is not supported when 'use_local_install' is True."
)
executor: Executor = PathExecutor(
metadata=metadata,
name=name,
target_version=version,
)

else:
executor = VenvExecutor(
name=name,
metadata=metadata,
target_version=version,
install_if_missing=install_if_missing,
pip_url=pip_url,
)

if install_if_missing:
executor.install()

return Source(
executor=executor,
name=name,
Expand Down
10 changes: 9 additions & 1 deletion airbyte-lib/airbyte_lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ class AirbyteConnectorRegistryError(AirbyteError):
"""Error when accessing the connector registry."""


@dataclass
class AirbyteConnectorNotRegisteredError(AirbyteConnectorRegistryError):
"""Connector not found in registry."""

connector_name: str | None = None
guidance = "Please double check the connector name."

# Connector Errors


Expand All @@ -185,7 +192,8 @@ class AirbyteConnectorError(AirbyteError):


class AirbyteConnectorNotFoundError(AirbyteConnectorError):
"""Connector not found."""
"""Connector name not found in registry."""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the code, by my understanding AirbyteConnectorNotFoundError means the connector is not found locally, and AirbyteConnectorNotRegisteredError means the connector is not found in the registry, but this docstring indicates otherwise.

Could you clarify?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! This was a mistake. I've renamed the error to AirbyteConnectorExecutableNotFoundError so it is more explicit.




class AirbyteConnectorInstallationError(AirbyteConnectorError):
Expand Down
8 changes: 2 additions & 6 deletions airbyte-lib/airbyte_lib/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ def get_connector_metadata(name: str) -> ConnectorMetadata:
if not _cache:
_update_cache()
if not _cache or name not in _cache:
raise exc.AirbyteLibInputError(
message="Connector name not found in registry.",
guidance="Please double check the connector name.",
context={
"connector_name": name,
},
raise exc.AirbyteConnectorNotRegisteredError(
connector_name=name,
)
return _cache[name]
26 changes: 23 additions & 3 deletions airbyte-lib/airbyte_lib/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ def __init__(
name: str,
config: dict[str, Any] | None = None,
streams: list[str] | None = None,
*,
validate: bool = False,
) -> None:
"""Initialize the source.

If config is provided, it will be validated against the spec if validate is True.
"""
self._processed_records = 0
self.executor = executor
self.name = name
Expand All @@ -79,7 +85,7 @@ def __init__(
self._spec: ConnectorSpecification | None = None
self._selected_stream_names: list[str] | None = None
if config is not None:
self.set_config(config)
self.set_config(config, validate=validate)
if streams is not None:
self.set_streams(streams)

Expand All @@ -102,10 +108,24 @@ def set_streams(self, streams: list[str]) -> None:
)
self._selected_stream_names = streams

def set_config(self, config: dict[str, Any]) -> None:
self._validate_config(config)
def set_config(
self,
config: dict[str, Any],
*,
validate: bool = False,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the advantage of deferring the check? I thought of it being quite nice as it will tell you as early as possible if your config won't work, instead of waiting for actually invoking.

What's the workflow you had in mind here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed in DM.

) -> None:
"""Set the config for the connector.

If validate is True, raise an exception if the config fails validation.

If validate is False, validation will be deferred until check() is called.
"""
if validate:
self._validate_config(config)

self._config_dict = config


@property
def _config(self) -> dict[str, Any]:
if self._config_dict is None:
Expand Down
18 changes: 14 additions & 4 deletions airbyte-lib/docs/generated/airbyte_lib.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading