Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

airbyte-lib: Add pip_url option and enforce source versions in a more consistent manner #33967

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 58 additions & 21 deletions airbyte-lib/airbyte_lib/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@

from airbyte_lib.registry import ConnectorMetadata

_LATEST_VERSION = "latest"


class Executor(ABC):
def __init__(self, metadata: ConnectorMetadata, target_version: str = "latest"):
def __init__(
self,
metadata: ConnectorMetadata,
target_version: str | None = None,
) -> None:
self.metadata = metadata
self.target_version = target_version if target_version != "latest" else metadata.latest_available_version
self.enforce_version = target_version is not None
if target_version is None or target_version == _LATEST_VERSION:
self.target_version = metadata.latest_available_version
else:
self.target_version = target_version

@abstractmethod
def execute(self, args: List[str]) -> Iterable[str]:
Expand Down Expand Up @@ -73,10 +83,21 @@ def _stream_from_file(file: IO[str]):


class VenvExecutor(Executor):
def __init__(self, metadata: ConnectorMetadata, target_version: str = "latest", install_if_missing: bool = False):
def __init__(
self,
metadata: ConnectorMetadata,
target_version: str | None = None,
install_if_missing: bool = False,
pip_url: str | None = None,
) -> None:
super().__init__(metadata, target_version)
self.install_if_missing = install_if_missing

# This is a temporary install path that will be replaced with a proper package
# name once they are published.
# TODO: Replace with `f"airbyte-{self.metadata.name}"`
self.pip_url = pip_url or f"../airbyte-integrations/connectors/{self.metadata.name}"

def _get_venv_name(self):
return f".venv-{self.metadata.name}"

Expand All @@ -94,9 +115,7 @@ def install(self):

pip_path = os.path.join(venv_name, "bin", "pip")

# TODO this is a temporary install path that will be replaced with a proper package name once they are published. At this point we are also using the version
package_to_install = f"../airbyte-integrations/connectors/{self.metadata.name}"
self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", package_to_install])
self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", self.pip_url])

def _get_installed_version(self):
"""
Expand All @@ -105,11 +124,26 @@ def _get_installed_version(self):
venv_name = self._get_venv_name()
connector_name = self.metadata.name
return subprocess.check_output(
[os.path.join(venv_name, "bin", "python"), "-c", f"from importlib.metadata import version; print(version('{connector_name}'))"],
[
os.path.join(venv_name, "bin", "python"),
"-c",
f"from importlib.metadata import version; print(version('{connector_name}'))",
],
universal_newlines=True,
).strip()

def ensure_installation(self):
def ensure_installation(
self,
):
"""
Ensure that the connector is installed in a virtual environment.
If not yet installed and if install_if_missing is True, then install.

Optionally, verify that the installed version matches the target version.

Note: Version verification is not supported for connectors installed from a
local path.
"""
venv_name = f".venv-{self.metadata.name}"
venv_path = Path(venv_name)
if not venv_path.exists():
Expand All @@ -119,19 +153,22 @@ def ensure_installation(self):

connector_path = self._get_connector_path()
if not connector_path.exists():
raise Exception(f"Could not find connector {self.metadata.name} in venv {venv_name}")

installed_version = self._get_installed_version()
if installed_version != self.target_version:
# If the version doesn't match, reinstall
self.install()

# Check the version again
version_after_install = self._get_installed_version()
if version_after_install != self.target_version:
raise Exception(
f"Failed to install connector {self.metadata.name} version {self.target_version}. Installed version is {version_after_install}"
)
raise FileNotFoundError(
f"Could not find connector '{self.metadata.name}' " f"in venv '{venv_name}' with connector path '{connector_path}'."
)

if self.enforce_version:
installed_version = self._get_installed_version()
if installed_version != self.target_version:
# If the version doesn't match, reinstall
self.install()
Comment on lines +162 to +164
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only a suggestion, but I seem to run into a lot of Python cases where I get '0.0.0' back - such as when installing from a local path. This might not be necessary, but I think it's worth considering. We can always implement as you've written and just keep an eye out for edge cases. It may already be solved in the latest handling (ignoring when a specific version is not requested), but I want to mention anyway while I'm here.

Suggested change
if installed_version != self.target_version:
# If the version doesn't match, reinstall
self.install()
if installed_version == '0.0.0':
# pass or warn; version is either not declared or not detectable
pass
elif installed_version != self.target_version:
# If the version doesn't match, reinstall
self.install()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered this in the interim, but I'm hoping that we don't need to hardcode this with the current scheme - if you are working with local / pip-url installed connectors you would just not worry about the version (and thus not specify it) and airbyte-lib won't worry either, so it won't hit this.

I'll go ahead and merge and we can add the 0.0.0 exception if it keeps coming up in some scenarios.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense! Thanks!


# Check the version again
version_after_install = self._get_installed_version()
if version_after_install != self.target_version:
raise Exception(
f"Failed to install connector {self.metadata.name} version {self.target_version}. Installed version is {version_after_install}"
)

def execute(self, args: List[str]) -> Iterable[str]:
connector_path = self._get_connector_path()
Expand Down
35 changes: 28 additions & 7 deletions airbyte-lib/airbyte_lib/factories.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.


from typing import Any, Dict, Optional
from typing import Any

from airbyte_lib.cache import InMemoryCache
from airbyte_lib.executor import PathExecutor, VenvExecutor
from airbyte_lib.executor import Executor, PathExecutor, VenvExecutor
from airbyte_lib.registry import get_connector_metadata
from airbyte_lib.source import Source

Expand All @@ -15,20 +15,41 @@ def get_in_memory_cache():

def get_connector(
name: str,
version: str = "latest",
config: Optional[Dict[str, Any]] = None,
version: str | None = None,
pip_url: str | None = None,
config: dict[str, Any] | None = None,
use_local_install: bool = False,
install_if_missing: bool = False,
):
"""
Get a connector by name and version.
:param name: connector name
:param version: connector version - if not provided, the most recent version will be used
:param config: connector config - if not provided, you need to set it later via the set_config method
:param version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
:param pip_url: connector pip URL - if not provided, the pip URL will be inferred from the connector name.
:param config: connector config - if not provided, you need to set it later via the set_config method.
:param use_local_install: whether to use a locally installed connector instead of a virtual environment. If True, the connector is expected to be available on the path (e.g. installed via pip). If False, the connector is run from a virtual environment (see install_if_missing).
:param install_if_missing: whether to install the connector if it is not available locally. This parameter is ignored if use_local_install is True.
"""
metadata = get_connector_metadata(name)
if use_local_install:
if pip_url:
raise ValueError("Param 'pip_url' is not supported when 'use_local_install' is True")
if version:
raise ValueError("Param 'version' is not supported when 'use_local_install' is True")
executor: Executor = PathExecutor(
metadata=metadata,
target_version=version,
)

else:
executor = VenvExecutor(
metadata=metadata,
target_version=version,
install_if_missing=install_if_missing,
pip_url=pip_url,
)
return Source(
PathExecutor(metadata, version) if use_local_install else VenvExecutor(metadata, version, install_if_missing), name, config
executor=executor,
name=name,
config=config,
)
34 changes: 31 additions & 3 deletions airbyte-lib/tests/integration_tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import airbyte_lib as ab
import pytest
from airbyte_lib.registry import _update_cache


@pytest.fixture(scope="module", autouse=True)
Expand Down Expand Up @@ -40,9 +41,36 @@ def test_non_existing_connector():
ab.get_connector("source-not-existing", config={"apiKey": "abc"})


def test_wrong_version():
with pytest.raises(Exception):
ab.get_connector("source-test", version="1.2.3", config={"apiKey": "abc"})
@pytest.mark.parametrize(
    "latest_available_version, requested_version, raises",
    [
        ("0.0.1", None, False),
        ("1.2.3", None, False),
        ("0.0.1", "latest", False),
        ("1.2.3", "latest", True),
        ("0.0.1", "0.0.1", False),
        ("1.2.3", "1.2.3", True),
    ],
)
def test_version_enforcement(raises, latest_available_version, requested_version):
    """
    Ensures version enforcement works as expected:
    * If no version is specified, the current version is accepted
    * If the version is specified as "latest", only the latest available version is accepted
    * If the version is specified as a semantic version, only the exact version is accepted

    In this test, the actually installed version is 0.0.1

    :param raises: whether ``ab.get_connector`` is expected to raise for this case
    :param latest_available_version: value patched into the registry entry for "source-test"
    :param requested_version: value passed as ``version`` to ``ab.get_connector``
    """
    _update_cache()
    # Imported only after `_update_cache()` has run, as in the original test.
    # NOTE(review): presumably `_update_cache` rebinds the module-level `_cache`,
    # so an import at module top would see a stale object - confirm in registry.py.
    from airbyte_lib.registry import _cache

    _cache["source-test"].latest_available_version = latest_available_version
    try:
        if raises:
            with pytest.raises(Exception):
                ab.get_connector("source-test", version=requested_version, config={"apiKey": "abc"})
        else:
            ab.get_connector("source-test", version=requested_version, config={"apiKey": "abc"})
    finally:
        # Reset even when the assertion above fails, so the patched registry
        # entry cannot leak into tests that run afterwards.
        _cache["source-test"].latest_available_version = "0.0.1"


def test_check():
Expand Down
Loading