-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
airbyte-lib base implementation #33409
Changes from all commits
f76a0d3
19eba07
a960851
17c17b9
6dd8815
81cc214
585631c
73b5b18
7a96d61
796ca18
24b7df6
44bf767
8734cfc
4f349cc
aae69e5
90cbc7a
ceea28d
22f9504
e35b0c6
9e003a9
a5869e3
89059c8
8418dca
6d11890
babf999
d2e1e46
8c86f9f
e090490
4ca3a6b
c6ec803
e69c568
19f675e
6d4278e
966fe43
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
.venv* | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
# airbyte-lib | ||
|
||
airbyte-lib is a library that allows running Airbyte syncs embedded in any Python application, without the need to run an Airbyte server. | ||
|
||
## Development | ||
|
||
* Make sure [Poetry is installed](https://python-poetry.org/docs/#). | ||
* Run `poetry install` | ||
* For examples, check out the `examples` folder. They can be run via `poetry run python examples/<example file>` | ||
* Unit tests and type checks can be run via `poetry run pytest` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
|
||
from .factories import (get_connector, get_in_memory_cache) | ||
from .sync_result import (Dataset, SyncResult) | ||
from .source import (Source) | ||
|
||
__all__ = [ | ||
"get_connector", | ||
"get_in_memory_cache", | ||
"Dataset", | ||
"SyncResult", | ||
"Source", | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
|
||
|
||
from abc import ABC, abstractmethod | ||
from typing import Any, Dict, Iterable, List | ||
|
||
from airbyte_protocol.models import AirbyteRecordMessage | ||
|
||
|
||
class Cache(ABC):
    """Abstract interface for a store that receives record messages and exposes them per stream.

    Concrete caches must implement every method below; the return types of the
    pandas/SQL accessors are intentionally left open (``Any``).
    """

    @abstractmethod
    def write(self, messages: Iterable[AirbyteRecordMessage]) -> None:
        """Store the given record messages in the cache."""

    @abstractmethod
    def get_iterable(self, stream: str) -> Iterable[Dict[str, Any]]:
        """Return an iterable over the record dictionaries of ``stream``."""

    @abstractmethod
    def get_pandas(self, stream: str) -> Any:
        """Return the data of ``stream`` for use with pandas (exact type is up to the implementation)."""

    @abstractmethod
    def get_sql_table(self, stream: str) -> Any:
        """Return a SQL table handle for ``stream`` (exact type is up to the implementation)."""

    @abstractmethod
    def get_sql_engine(self) -> Any:
        """Return the SQL engine backing this cache (exact type is up to the implementation)."""
|
||
|
||
class InMemoryCache(Cache):
    """Cache that keeps all records in process memory.

    ``streams`` maps each stream name to the list of record payloads written
    for it, in the order they were received.
    """

    def __init__(self) -> None:
        self.streams: Dict[str, List[Dict[str, Any]]] = {}

    def write(self, messages: Iterable[AirbyteRecordMessage]) -> None:
        """Append the ``data`` of every message to the list of its stream."""
        for record in messages:
            # The per-stream list is created the first time a stream is seen.
            self.streams.setdefault(record.stream, []).append(record.data)

    def get_iterable(self, stream: str) -> Iterable[Dict[str, Any]]:
        """Return an iterator over the records of ``stream`` (``KeyError`` if the stream is unknown)."""
        records = self.streams[stream]
        return iter(records)

    def get_pandas(self, stream: str) -> Any:
        """Not supported by the in-memory cache."""
        raise NotImplementedError()

    def get_sql_table(self, stream: str) -> Any:
        """Not supported by the in-memory cache."""
        raise NotImplementedError()

    def get_sql_engine(self) -> Any:
        """Not supported by the in-memory cache."""
        raise NotImplementedError()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
|
||
import os | ||
import subprocess | ||
import sys | ||
from abc import ABC, abstractmethod | ||
from contextlib import contextmanager | ||
from pathlib import Path | ||
from typing import IO, Generator, Iterable, List | ||
|
||
from airbyte_lib.registry import ConnectorMetadata | ||
|
||
|
||
class Executor(ABC):
    """Base class for strategies that install and run a connector.

    :param metadata: registry metadata of the connector to run
    :param target_version: requested version; "latest" is resolved to ``metadata.latest_available_version``
    """

    def __init__(self, metadata: ConnectorMetadata, target_version: str = "latest"):
        self.metadata = metadata
        if target_version == "latest":
            # Resolve the symbolic "latest" to the concrete version from the registry.
            target_version = metadata.latest_available_version
        self.target_version = target_version

    @abstractmethod
    def execute(self, args: List[str]) -> Iterable[str]:
        """Run the connector with ``args`` and return its output lines."""

    @abstractmethod
    def ensure_installation(self):
        """Check that the connector is available to this executor."""

    @abstractmethod
    def install(self):
        """Install the connector."""
|
||
|
||
@contextmanager
def _stream_from_subprocess(args: List[str]) -> Generator[Iterable[str], None, None]:
    """Run ``args`` as a subprocess and yield an iterable over its output lines.

    stderr is merged into stdout and the output is read as text. When the
    ``with`` block is left, the pipe is closed and the process is terminated if
    it is still running (terminate first, kill after a 10 second grace period).

    Raises:
        Exception: if the output pipe is not available, or if the ``with``
            block finished normally but the process exited with a code other
            than 0 or -15 (SIGTERM). If the ``with`` block itself raised, that
            exception is propagated unchanged instead of being replaced by the
            exit-code error.
    """
    process = subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )

    def _stream_from_file(file: IO[str]) -> Iterable[str]:
        # readline() returns "" only at end of file, which ends the iteration.
        yield from iter(file.readline, "")

    if process.stdout is None:
        raise Exception("Failed to start subprocess")

    body_failed = False
    try:
        yield _stream_from_file(process.stdout)
    except BaseException:
        # Remember that the caller's block failed so the exit-code check below
        # does not mask the real error.
        body_failed = True
        raise
    finally:
        # Close the stdout stream
        process.stdout.close()

        # Terminate the process if it is still running
        if process.poll() is None:
            process.terminate()
            try:
                # Wait for a short period to allow process to terminate gracefully
                process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                # If the process does not terminate within the timeout, force kill it
                process.kill()

        # Now, the process is either terminated or killed. Check the exit code.
        exit_code = process.wait()

        # If the exit code is not 0 or -15 (SIGTERM), raise an exception
        if not body_failed and exit_code != 0 and exit_code != -15:
            raise Exception(f"Process exited with code {exit_code}")
|
||
|
||
class VenvExecutor(Executor):
    """Executor that runs a connector out of a dedicated ``.venv-<connector name>`` virtual environment.

    :param install_if_missing: stored on the instance; consulted when checking the installation
    """

    def __init__(self, metadata: ConnectorMetadata, target_version: str = "latest", install_if_missing: bool = False):
        super().__init__(metadata, target_version)
        self.install_if_missing = install_if_missing

    def _get_venv_name(self):
        """Return the (relative) directory name of the connector's virtual environment."""
        connector_name = self.metadata.name
        return f".venv-{connector_name}"

    def _get_connector_path(self):
        """Return the path of the connector executable inside the venv's ``bin`` directory."""
        return Path(self._get_venv_name()) / "bin" / self.metadata.name

    def _run_subprocess_and_raise_on_failure(self, args: List[str]):
        """Run ``args`` to completion and raise if the exit code is non-zero."""
        exit_code = subprocess.run(args).returncode
        if exit_code != 0:
            raise Exception(f"Install process exited with code {exit_code}")

    def install(self):
        """Create the virtual environment and pip-install the connector into it in editable mode."""
        venv_name = self._get_venv_name()
        create_venv_command = [sys.executable, "-m", "venv", venv_name]
        self._run_subprocess_and_raise_on_failure(create_venv_command)

        # TODO this is a temporary install path that will be replaced with a proper package name once they are published.
        package_to_install = f"../airbyte-integrations/connectors/{self.metadata.name}"
        pip_install_command = [os.path.join(venv_name, "bin", "pip"), "install", "-e", package_to_install]
        self._run_subprocess_and_raise_on_failure(pip_install_command)
Comment on lines
+97
to
+99
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have an updated version of this in a new PR: |
||
|
||
def _get_installed_version(self):
    """
    Return the version of the connector package installed in the venv.

    Runs the following with the venv's interpreter:
    python -c "from importlib.metadata import version; print(version('<connector-name>'))"
    """
    python_path = os.path.join(self._get_venv_name(), "bin", "python")
    connector_name = self.metadata.name
    version_snippet = f"from importlib.metadata import version; print(version('{connector_name}'))"
    raw_output = subprocess.check_output([python_path, "-c", version_snippet], universal_newlines=True)
    # The printed version ends with a newline; strip surrounding whitespace.
    return raw_output.strip()
|
||
def ensure_installation(self):
    """Make sure the connector is installed in its venv at the target version.

    Steps:
      1. If the venv directory does not exist, install it when
         ``install_if_missing`` is set, otherwise fail.
      2. Fail if the connector executable is not present in the venv.
      3. If the installed version differs from ``target_version``, reinstall
         once and check the version again.

    Raises:
        Exception: if the venv is missing and may not be installed, if the
            connector executable cannot be found, or if the version still
            does not match after reinstalling.
    """
    # Use the shared helper so this check looks at the same directory that
    # install() and _get_connector_path() operate on.
    venv_name = self._get_venv_name()
    if not Path(venv_name).exists():
        if not self.install_if_missing:
            raise Exception(f"Connector {self.metadata.name} is not available - venv {venv_name} does not exist")
        self.install()

    if not self._get_connector_path().exists():
        raise Exception(f"Could not find connector {self.metadata.name} in venv {venv_name}")

    if self._get_installed_version() == self.target_version:
        return

    # If the version doesn't match, reinstall
    self.install()

    # Check the version again
    version_after_install = self._get_installed_version()
    if version_after_install != self.target_version:
        raise Exception(
            f"Failed to install connector {self.metadata.name} version {self.target_version}. Installed version is {version_after_install}"
        )
Comment on lines
+124
to
+134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In a follow-on PR, I suggest a flag to make version verification an opt-in behavior, where version would not be checked by default, but could be checked/auto-fixed as needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will get less relevant once we publish properly to pypi, but I can see how the current state might be too restrictive or take too much control from the user. What some tools are doing is to log a warning if the version doesn't match, but don't fail right away. Seems suitable here, but nothing we can't adjust before the first release. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Totally! Yeah, I think a warning is a nice balance - gives the user information but doesn't necessarily cause direct failure. In that case, we could always check version, and the "opt in" part would be whether to hard-fail, rather than warn. I can imagine a hard-failure might be preferred in specific instances where we think we are explicitly requesting a particular version number. |
||
|
||
def execute(self, args: List[str]) -> Iterable[str]:
    """Run the connector executable from the venv with ``args`` and yield its output line by line."""
    command = [str(self._get_connector_path())]
    command = command + args

    with _stream_from_subprocess(command) as output_lines:
        yield from output_lines
|
||
|
||
class PathExecutor(Executor):
    """Executor for a connector whose executable is already available on the PATH.

    The connector is invoked by its name (``metadata.name``); this executor
    never installs anything itself.
    """

    def ensure_installation(self):
        """Verify that the connector can be run by executing its ``spec`` command.

        Raises:
            Exception: if running the connector fails for any reason; the
                original error is attached as the cause.
        """
        try:
            # execute() is a generator function: nothing runs until it is
            # iterated, so drain it to actually start the process and surface
            # any failure.
            for _ in self.execute(["spec"]):
                pass
        except Exception as e:
            raise Exception(f"Connector {self.metadata.name} is not available - executing it failed: {e}") from e

    def install(self):
        """Always fails: a connector on the PATH cannot be installed by this executor."""
        raise Exception(f"Connector {self.metadata.name} is not available - cannot install it")

    def execute(self, args: List[str]) -> Iterable[str]:
        """Run the connector by name with ``args`` and yield its output line by line."""
        with _stream_from_subprocess([self.metadata.name] + args) as stream:
            yield from stream
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
|
||
|
||
from typing import Any, Dict, Optional | ||
|
||
from airbyte_lib.cache import InMemoryCache | ||
from airbyte_lib.executor import PathExecutor, VenvExecutor | ||
from airbyte_lib.registry import get_connector_metadata | ||
from airbyte_lib.source import Source | ||
|
||
|
||
def get_in_memory_cache():
    """Create and return a new, empty in-memory cache."""
    cache = InMemoryCache()
    return cache
|
||
|
||
def get_connector(
    name: str,
    version: str = "latest",
    config: Optional[Dict[str, Any]] = None,
    use_local_install: bool = False,
    install_if_missing: bool = False,
):
    """
    Get a connector by name and version.
    :param name: connector name; it must be present in the connector registry, otherwise an exception is raised
    :param version: connector version - if not provided, the most recent version will be used
    :param config: connector config - if not provided, you need to set it later via the set_config method
    :param use_local_install: whether to use a connector that is already installed locally. If True, the connector is expected to be available on the path (e.g. installed via pip) and is run from there. If False, the connector is run from a dedicated virtual environment.
    :param install_if_missing: whether to install the connector into its virtual environment if that environment does not exist yet. This parameter is ignored if use_local_install is True.
    :return: a Source that runs the connector through the selected executor
    """
    metadata = get_connector_metadata(name)
    if use_local_install:
        executor = PathExecutor(metadata, version)
    else:
        executor = VenvExecutor(metadata, version, install_if_missing)
    return Source(executor, name, config)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
# Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
|
||
import importlib.metadata | ||
import json | ||
import os | ||
from dataclasses import dataclass | ||
from typing import Dict, Optional | ||
|
||
import requests | ||
|
||
|
||
@dataclass
class ConnectorMetadata:
    """Registry information about a single connector."""

    # Connector name, e.g. the registry's docker repository without the "airbyte/" prefix.
    name: str
    # Most recent version of the connector listed in the registry.
    latest_available_version: str
|
||
|
||
_cache: Optional[Dict[str, ConnectorMetadata]] = None | ||
airbyte_lib_version = importlib.metadata.version("airbyte-lib") | ||
|
||
REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json" | ||
|
||
|
||
def _update_cache() -> None:
    """Reload the module-level connector registry cache.

    The registry is read from the JSON file named by the
    ``AIRBYTE_LOCAL_REGISTRY`` environment variable when it is set (and
    non-empty); otherwise it is downloaded from ``REGISTRY_URL``. Every entry
    of the registry's ``sources`` list is stored under its docker repository
    name with the ``airbyte/`` part removed, together with its docker image tag
    as the latest available version.

    Raises:
        requests.HTTPError: if the registry download returns an error status.
        requests.Timeout: if the registry server does not answer in time.
    """
    global _cache
    local_registry_path = os.environ.get("AIRBYTE_LOCAL_REGISTRY")
    if local_registry_path:
        # JSON files are UTF-8; do not depend on the platform's default encoding.
        with open(local_registry_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    else:
        # requests has no default timeout; without one a stalled server would
        # block the caller forever.
        response = requests.get(
            REGISTRY_URL,
            headers={"User-Agent": f"airbyte-lib-{airbyte_lib_version}"},
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
    _cache = {}
    for connector in data["sources"]:
        name = connector["dockerRepository"].replace("airbyte/", "")
        _cache[name] = ConnectorMetadata(name, connector["dockerImageTag"])
|
||
|
||
def get_connector_metadata(name: str):
    """
    Look up the registry metadata of a connector by name.

    The registry cache is populated via _update_cache on first use (or whenever it is empty).
    Raises an exception if the connector is not in the registry.
    """
    if not _cache:
        _update_cache()

    # The cache may still be empty after the update, e.g. for a registry without sources.
    known_connectors = _cache if _cache else {}
    if name in known_connectors:
        return known_connectors[name]
    raise Exception(f"Connector {name} not found")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not that we have to move it, but I'd also be fine with adding to the root
.gitignore
: