Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

airbyte-lib base implementation #33409

Merged
merged 34 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f76a0d3
base implementation
Dec 13, 2023
19eba07
Merge remote-tracking branch 'origin/master' into flash1293/airbyte-lib
Dec 13, 2023
a960851
implement peek
Dec 13, 2023
17c17b9
prepare
Dec 13, 2023
6dd8815
various things
Dec 13, 2023
81cc214
tests and stuff
Dec 13, 2023
585631c
format
Dec 13, 2023
73b5b18
Merge remote-tracking branch 'origin/master' into flash1293/airbyte-lib
Dec 14, 2023
7a96d61
hook into CI
Dec 14, 2023
796ca18
format
Dec 14, 2023
24b7df6
run integration_tests via airbyte-ci
alafanechere Dec 14, 2023
44bf767
Merge remote-tracking branch 'origin/master' into flash1293/airbyte-lib
Dec 14, 2023
8734cfc
Merge branch 'flash1293/airbyte-lib' of github.com:airbytehq/airbyte …
Dec 14, 2023
4f349cc
fix CI invocation
Dec 14, 2023
aae69e5
clean up
Dec 14, 2023
90cbc7a
Merge branch 'master' into flash1293/airbyte-lib
Dec 15, 2023
ceea28d
Merge branch 'master' into flash1293/airbyte-lib
Dec 18, 2023
22f9504
Merge branch 'master' into flash1293/airbyte-lib
Dec 18, 2023
e35b0c6
Merge branch 'master' into flash1293/airbyte-lib
aaronsteers Dec 19, 2023
9e003a9
airbyte-lib: Add path executor (#33600)
Dec 19, 2023
a5869e3
Merge branch 'flash1293/airbyte-lib' of github.com:airbytehq/airbyte …
Dec 19, 2023
89059c8
Merge remote-tracking branch 'origin/master' into flash1293/airbyte-lib
Dec 19, 2023
8418dca
code format
Dec 19, 2023
6d11890
Merge remote-tracking branch 'origin/master' into flash1293/airbyte-lib
Dec 19, 2023
babf999
mypy
Dec 19, 2023
d2e1e46
always show last log messages
Dec 19, 2023
8c86f9f
Merge remote-tracking branch 'origin/master' into flash1293/airbyte-lib
Dec 20, 2023
e090490
add py.typed
Dec 20, 2023
4ca3a6b
add mypy checks
Dec 20, 2023
c6ec803
rename peek to read_stream
Dec 20, 2023
e69c568
refactor to match new API
Dec 20, 2023
19f675e
more refactoring
Dec 20, 2023
6d4278e
add header to registry request and fix test
Dec 20, 2023
966fe43
fix wrong variable used
Dec 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/airbyte-ci-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ jobs:
- 'airbyte-ci/connectors/metadata_service/lib/**'
- 'airbyte-ci/connectors/metadata_service/orchestrator/**'
- '!**/*.md'
airbyte_lib:
- 'airbyte_lib/**'
- '!**/*.md'
- name: Run airbyte-ci/connectors/connector_ops tests
if: steps.changes.outputs.ops_any_changed == 'true'
Expand Down Expand Up @@ -132,3 +135,18 @@ jobs:
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
airbyte_ci_binary_url: ${{ inputs.airbyte_ci_binary_url || 'https://connectors.airbyte.com/airbyte-ci/releases/ubuntu/latest/airbyte-ci' }}
tailscale_auth_key: ${{ secrets.TAILSCALE_AUTH_KEY }}

- name: Run airbyte-lib tests
if: steps.changes.outputs.airbyte_lib_any_changed == 'true'
id: run-airbyte-lib-tests
uses: ./.github/actions/run-dagger-pipeline
with:
context: "pull_request"
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
subcommand: "test airbyte-lib"
airbyte_ci_binary_url: ${{ inputs.airbyte_ci_binary_url || 'https://connectors.airbyte.com/airbyte-ci/releases/ubuntu/latest/airbyte-ci' }}
tailscale_auth_key: ${{ secrets.TAILSCALE_AUTH_KEY }}
1 change: 1 addition & 0 deletions airbyte-lib/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.venv*
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that we have to move it, but I'd also be fine with adding to the root .gitignore:

.venv
venv

10 changes: 10 additions & 0 deletions airbyte-lib/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# airbyte-lib

airbyte-lib is a library that allows to run Airbyte syncs embedded into any Python application, without the need to run Airbyte server.

## Development

* Make sure [Poetry is installed](https://python-poetry.org/docs/#).
* Run `poetry install`
* For examples, check out the `examples` folder. They can be run via `poetry run python examples/<example file>`
* Unit tests and type checks can be run via `poetry run pytest`
12 changes: 12 additions & 0 deletions airbyte-lib/airbyte_lib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

from .factories import (get_connector, get_in_memory_cache)
from .sync_result import (Dataset, SyncResult)
from .source import (Source)

__all__ = [
"get_connector",
"get_in_memory_cache",
"Dataset",
"SyncResult",
"Source",
]
54 changes: 54 additions & 0 deletions airbyte-lib/airbyte_lib/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.


from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, List

from airbyte_protocol.models import AirbyteRecordMessage


class Cache(ABC):
@abstractmethod
def write(self, messages: Iterable[AirbyteRecordMessage]):
pass

@abstractmethod
def get_iterable(self, stream: str) -> Iterable[Dict[str, Any]]:
pass

@abstractmethod
def get_pandas(self, stream: str) -> Any:
pass

@abstractmethod
def get_sql_table(self, stream: str) -> Any:
pass

@abstractmethod
def get_sql_engine(self) -> Any:
pass


class InMemoryCache(Cache):
"""The in-memory cache is accepting airbyte messages and stores them in a dictionary for streams (one list of dicts per stream)."""

def __init__(self) -> None:
self.streams: Dict[str, List[Dict[str, Any]]] = {}

def write(self, messages: Iterable[AirbyteRecordMessage]) -> None:
for message in messages:
if message.stream not in self.streams:
self.streams[message.stream] = []
self.streams[message.stream].append(message.data)

def get_iterable(self, stream: str) -> Iterable[Dict[str, Any]]:
return iter(self.streams[stream])

def get_pandas(self, stream: str) -> Any:
raise NotImplementedError()

def get_sql_table(self, stream: str) -> Any:
raise NotImplementedError()

def get_sql_engine(self) -> Any:
raise NotImplementedError()
155 changes: 155 additions & 0 deletions airbyte-lib/airbyte_lib/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import os
import subprocess
import sys
from abc import ABC, abstractmethod
from contextlib import contextmanager
from pathlib import Path
from typing import IO, Generator, Iterable, List

from airbyte_lib.registry import ConnectorMetadata


class Executor(ABC):
def __init__(self, metadata: ConnectorMetadata, target_version: str = "latest"):
self.metadata = metadata
self.target_version = target_version if target_version != "latest" else metadata.latest_available_version

@abstractmethod
def execute(self, args: List[str]) -> Iterable[str]:
pass

@abstractmethod
def ensure_installation(self):
pass

@abstractmethod
def install(self):
pass


@contextmanager
def _stream_from_subprocess(args: List[str]) -> Generator[Iterable[str], None, None]:
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
)

def _stream_from_file(file: IO[str]):
while True:
line = file.readline()
if not line:
break
yield line

if process.stdout is None:
raise Exception("Failed to start subprocess")
try:
yield _stream_from_file(process.stdout)
finally:
# Close the stdout stream
if process.stdout:
process.stdout.close()

# Terminate the process if it is still running
if process.poll() is None: # Check if the process is still running
process.terminate()
try:
# Wait for a short period to allow process to terminate gracefully
process.wait(timeout=10)
except subprocess.TimeoutExpired:
# If the process does not terminate within the timeout, force kill it
process.kill()

# Now, the process is either terminated or killed. Check the exit code.
exit_code = process.wait()

# If the exit code is not 0 or -15 (SIGTERM), raise an exception
if exit_code != 0 and exit_code != -15:
raise Exception(f"Process exited with code {exit_code}")


class VenvExecutor(Executor):
def __init__(self, metadata: ConnectorMetadata, target_version: str = "latest", install_if_missing: bool = False):
super().__init__(metadata, target_version)
self.install_if_missing = install_if_missing

def _get_venv_name(self):
return f".venv-{self.metadata.name}"

def _get_connector_path(self):
return Path(self._get_venv_name(), "bin", self.metadata.name)

def _run_subprocess_and_raise_on_failure(self, args: List[str]):
result = subprocess.run(args)
if result.returncode != 0:
raise Exception(f"Install process exited with code {result.returncode}")

def install(self):
venv_name = self._get_venv_name()
self._run_subprocess_and_raise_on_failure([sys.executable, "-m", "venv", venv_name])

pip_path = os.path.join(venv_name, "bin", "pip")

# TODO this is a temporary install path that will be replaced with a proper package name once they are published. At this point we are also using the version
package_to_install = f"../airbyte-integrations/connectors/{self.metadata.name}"
self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", package_to_install])
Comment on lines +97 to +99
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an updated version of this in a new PR:


def _get_installed_version(self):
"""
In the venv, run the following: python -c "from importlib.metadata import version; print(version('<connector-name>'))"
"""
venv_name = self._get_venv_name()
connector_name = self.metadata.name
return subprocess.check_output(
[os.path.join(venv_name, "bin", "python"), "-c", f"from importlib.metadata import version; print(version('{connector_name}'))"],
universal_newlines=True,
).strip()

def ensure_installation(self):
venv_name = f".venv-{self.metadata.name}"
venv_path = Path(venv_name)
if not venv_path.exists():
if not self.install_if_missing:
raise Exception(f"Connector {self.metadata.name} is not available - venv {venv_name} does not exist")
self.install()

connector_path = self._get_connector_path()
if not connector_path.exists():
raise Exception(f"Could not find connector {self.metadata.name} in venv {venv_name}")

installed_version = self._get_installed_version()
if installed_version != self.target_version:
# If the version doesn't match, reinstall
self.install()

# Check the version again
version_after_install = self._get_installed_version()
if version_after_install != self.target_version:
raise Exception(
f"Failed to install connector {self.metadata.name} version {self.target_version}. Installed version is {version_after_install}"
)
Comment on lines +124 to +134
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a follow-on PR, I suggest a flag to make version verification an opt-in behavior, where version would not be checked by default, but could be checked/auto-fixed as needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will get less relevant once we publish properly to pypi, but I can see how the current state might be too restrictive or take too much control from the user.

What some tools are doing is to log a warning if the version doesn't match, but don't fail right away. Seems suitable here, but nothing we can't adjust before the first release.

Copy link
Collaborator

@aaronsteers aaronsteers Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What some tools are doing is to log a warning if the version doesn't match, but don't fail right away. Seems suitable here, but nothing we can't adjust before the first release.

Totally! Yeah, I think a warning is a nice balance - gives the user information but doesn't necessary cause direct failure. In that case, we could always check version, and the "opt in" part would be whether to hard-fail, rather than warn. I can imagine a hard-failure might be preferred in specific instances where we think we are explicitly requesting a particular version number.


def execute(self, args: List[str]) -> Iterable[str]:
connector_path = self._get_connector_path()

with _stream_from_subprocess([str(connector_path)] + args) as stream:
yield from stream


class PathExecutor(Executor):
def ensure_installation(self):
try:
self.execute(["spec"])
except Exception as e:
raise Exception(f"Connector {self.metadata.name} is not available - executing it failed: {e}")

def install(self):
raise Exception(f"Connector {self.metadata.name} is not available - cannot install it")

def execute(self, args: List[str]) -> Iterable[str]:
with _stream_from_subprocess([self.metadata.name] + args) as stream:
yield from stream
34 changes: 34 additions & 0 deletions airbyte-lib/airbyte_lib/factories.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.


from typing import Any, Dict, Optional

from airbyte_lib.cache import InMemoryCache
from airbyte_lib.executor import PathExecutor, VenvExecutor
from airbyte_lib.registry import get_connector_metadata
from airbyte_lib.source import Source


def get_in_memory_cache():
return InMemoryCache()


def get_connector(
name: str,
version: str = "latest",
config: Optional[Dict[str, Any]] = None,
use_local_install: bool = False,
install_if_missing: bool = False,
):
"""
Get a connector by name and version.
:param name: connector name
:param version: connector version - if not provided, the most recent version will be used
:param config: connector config - if not provided, you need to set it later via the set_config method
:param use_local_install: whether to use a virtual environment to run the connector. If True, the connector is expected to be available on the path (e.g. installed via pip). If False, the connector will be installed automatically in a virtual environment.
:param install_if_missing: whether to install the connector if it is not available locally. This parameter is ignored if use_local_install is True.
"""
metadata = get_connector_metadata(name)
return Source(
PathExecutor(metadata, version) if use_local_install else VenvExecutor(metadata, version, install_if_missing), name, config
)
Empty file.
47 changes: 47 additions & 0 deletions airbyte-lib/airbyte_lib/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import importlib.metadata
import json
import os
from dataclasses import dataclass
from typing import Dict, Optional

import requests


@dataclass
class ConnectorMetadata:
name: str
latest_available_version: str


_cache: Optional[Dict[str, ConnectorMetadata]] = None
airbyte_lib_version = importlib.metadata.version("airbyte-lib")

REGISTRY_URL = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"


def _update_cache() -> None:
global _cache
if os.environ.get("AIRBYTE_LOCAL_REGISTRY"):
with open(str(os.environ.get("AIRBYTE_LOCAL_REGISTRY")), "r") as f:
data = json.load(f)
else:
response = requests.get(REGISTRY_URL, headers={"User-Agent": f"airbyte-lib-{airbyte_lib_version}"})
response.raise_for_status()
data = response.json()
_cache = {}
for connector in data["sources"]:
name = connector["dockerRepository"].replace("airbyte/", "")
_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"])


def get_connector_metadata(name: str):
"""
check the cache for the connector. If the cache is empty, populate by calling update_cache
"""
if not _cache:
_update_cache()
if not _cache or name not in _cache:
raise Exception(f"Connector {name} not found")
return _cache[name]
Loading
Loading