Skip to content

Commit

Permalink
airbyte-lib: Use connector metadata (#34697)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron ("AJ") Steers <aj@airbyte.io>
  • Loading branch information
Joe Reuter and aaronsteers authored Jan 31, 2024
1 parent 8e7196e commit f95e0c8
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 23 deletions.
22 changes: 18 additions & 4 deletions airbyte-lib/airbyte_lib/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,19 @@ def __init__(
"""
super().__init__(name=name, metadata=metadata, target_version=target_version)

# This is a temporary install path that will be replaced with a proper package
# name once they are published.
self.pip_url = pip_url or f"airbyte-{self.name}"
if not pip_url and metadata and not metadata.pypi_package_name:
raise exc.AirbyteConnectorNotPyPiPublishedError(
connector_name=self.name,
context={
"metadata": metadata,
},
)

self.pip_url = pip_url or (
metadata.pypi_package_name
if metadata and metadata.pypi_package_name
else f"airbyte-{self.name}"
)
self.install_root = install_root or Path.cwd()

def _get_venv_name(self) -> str:
Expand Down Expand Up @@ -238,7 +248,11 @@ def _get_installed_version(
return None

try:
package_name = f"airbyte-{connector_name}"
package_name = (
self.metadata.pypi_package_name
if self.metadata and self.metadata.pypi_package_name
else f"airbyte-{connector_name}"
)
return subprocess.check_output(
[
self.interpreter_path,
Expand Down
17 changes: 1 addition & 16 deletions airbyte-lib/airbyte_lib/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
import sqlalchemy
import ulid
from overrides import overrides
from sqlalchemy import Column, String, create_engine, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool
from sqlalchemy.sql.elements import TextClause

Expand Down Expand Up @@ -49,20 +48,6 @@
DEBUG_MODE = False # Set to True to enable additional debug logging.


STREAMS_TABLE_NAME = "_airbytelib_streams"

Base = declarative_base()


class CachedStream(Base): # type: ignore[valid-type,misc]
__tablename__ = STREAMS_TABLE_NAME

stream_name = Column(String)
source_name = Column(String)
table_name = Column(String, primary_key=True)
catalog_metadata = Column(String)


class RecordDedupeMode(enum.Enum):
APPEND = "append"
REPLACE = "replace"
Expand Down
8 changes: 8 additions & 0 deletions airbyte-lib/airbyte_lib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ class AirbyteConnectorNotRegisteredError(AirbyteConnectorRegistryError):
guidance = "Please double check the connector name."


@dataclass
class AirbyteConnectorNotPyPiPublishedError(AirbyteConnectorRegistryError):
"""Connector found, but not published to PyPI."""

connector_name: str | None = None
guidance = "This likely means that the connector is not ready for use with airbyte-lib."


# Connector Errors


Expand Down
18 changes: 16 additions & 2 deletions airbyte-lib/airbyte_lib/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
class ConnectorMetadata:
name: str
latest_available_version: str
pypi_package_name: str | None


def _get_registry_url() -> str:
Expand All @@ -33,6 +34,19 @@ def _get_registry_url() -> str:
return REGISTRY_URL


def _registry_entry_to_connector_metadata(entry: dict) -> ConnectorMetadata:
name = entry["dockerRepository"].replace("airbyte/", "")
remote_registries: dict = entry.get("remoteRegistries", {})
pypi_registry: dict = remote_registries.get("pypi", {})
pypi_package_name: str = pypi_registry.get("packageName", None)
pypi_enabled: bool = pypi_registry.get("enabled", False)
return ConnectorMetadata(
name=name,
latest_available_version=entry["dockerImageTag"],
pypi_package_name=pypi_package_name if pypi_enabled else None,
)


def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMetadata]:
"""Return the registry cache."""
global __cache
Expand All @@ -54,8 +68,8 @@ def _get_registry_cache(*, force_refresh: bool = False) -> dict[str, ConnectorMe
new_cache: dict[str, ConnectorMetadata] = {}

for connector in data["sources"]:
name = connector["dockerRepository"].replace("airbyte/", "")
new_cache[name] = ConnectorMetadata(name, connector["dockerImageTag"])
connector_metadata = _registry_entry_to_connector_metadata(connector)
new_cache[connector_metadata.name] = connector_metadata

if len(new_cache) == 0:
raise exc.AirbyteLibInternalError(
Expand Down
3 changes: 3 additions & 0 deletions airbyte-lib/airbyte_lib/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def validate(connector_dir: str, sample_config: str, *, validate_install_only: b
{
"dockerRepository": f"airbyte/{connector_name}",
"dockerImageTag": "0.0.1",
"remoteRegistries": {
"pypi": {"packageName": "airbyte-{connector_name}", "enabled": True}
},
},
],
}
Expand Down
22 changes: 22 additions & 0 deletions airbyte-lib/examples/run_pokeapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""A simple test of AirbyteLib, using the PokeAPI source connector.
Usage (from airbyte-lib root directory):
> poetry run python ./examples/run_pokeapi.py
No setup is needed, but you may need to delete the .venv-source-pokeapi folder
if your installation gets interrupted or corrupted.
"""
from __future__ import annotations

import airbyte_lib as ab


source = ab.get_connector(
"source-pokeapi",
config={"pokemon_name": "bulbasaur"},
install_if_missing=True,
)
source.check()

print(list(source.get_records("pokemon")))
48 changes: 48 additions & 0 deletions airbyte-lib/tests/integration_tests/fixtures/registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
"icon": "test.svg",
"iconUrl": "https://connectors.airbyte.com/files/metadata/airbyte/source-test/latest/icon.svg",
"sourceType": "api",
"remoteRegistries": {
"pypi": {
"packageName": "airbyte-source-test",
"enabled": true
}
},
"spec": {
"documentationUrl": "https://docs.airbyte.com/integrations/sources/test",
"connectionSpecification": {
Expand All @@ -35,6 +41,48 @@
"tags": ["language:python"],
"githubIssueLabel": "source-test",
"license": "MIT"
},
{
"sourceDefinitionId": "9f32dab3-77cb-45a1-9d33-347aa5fbe333",
"name": "Non-published source",
"dockerRepository": "airbyte/source-non-published",
"dockerImageTag": "0.0.1",
"documentationUrl": "https://docs.airbyte.com/integrations/sources/test",
"icon": "test.svg",
"iconUrl": "https://connectors.airbyte.com/files/metadata/airbyte/source-test/latest/icon.svg",
"sourceType": "api",
"remoteRegistries": {
"pypi": {
"packageName": "airbyte-source-non-published",
"enabled": false
}
},
"spec": {
"documentationUrl": "https://docs.airbyte.com/integrations/sources/test",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"apiKey": {
"type": "string",
"title": "API Key",
"description": "The API key for the service"
}
}
}
},
"tombstone": false,
"public": true,
"custom": false,
"releaseStage": "alpha",
"supportLevel": "community",
"ab_internal": {
"sl": 100,
"ql": 200
},
"tags": ["language:python"],
"githubIssueLabel": "source-source-non-published",
"license": "MIT"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ data:
releaseStage: alpha
supportLevel: community
documentationUrl: https://docs.airbyte.com/integrations/sources/apify-dataset
remoteRegistries:
pypi:
enabled: true
packageName: airbyte-source-broken
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ data:
releaseStage: alpha
supportLevel: community
documentationUrl: https://docs.airbyte.com/integrations/sources/apify-dataset
remoteRegistries:
pypi:
enabled: true
packageName: airbyte-source-test
metadataSpecVersion: "1.0"
5 changes: 4 additions & 1 deletion airbyte-lib/tests/integration_tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def test_non_existing_connector():
with pytest.raises(Exception):
ab.get_connector("source-not-existing", config={"apiKey": "abc"})

def test_non_enabled_connector():
with pytest.raises(exc.AirbyteConnectorNotPyPiPublishedError):
ab.get_connector("source-non-published", config={"apiKey": "abc"})

@pytest.mark.parametrize(
"latest_available_version, requested_version, raises",
Expand All @@ -138,7 +141,7 @@ def test_version_enforcement(raises, latest_available_version, requested_version
In this test, the actually installed version is 0.0.1
"""
patched_entry = registry.ConnectorMetadata(
name="source-test", latest_available_version=latest_available_version
name="source-test", latest_available_version=latest_available_version, pypi_package_name="airbyte-source-test"
)
with patch.dict("airbyte_lib.registry.__cache", {"source-test": patched_entry}, clear=False):
if raises:
Expand Down

0 comments on commit f95e0c8

Please sign in to comment.