Skip to content

Commit

Permalink
AirbyteLib: Formalized Exception Handling (#34488)
Browse files: browse the repository at this point in the history
  • Loading branch information
aaronsteers committed Jan 24, 2024
1 parent c42c5e6 commit 1268c37
Show file tree
Hide file tree
Showing 28 changed files with 468 additions and 93 deletions.
1 change: 1 addition & 0 deletions airbyte-lib/airbyte_lib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""AirbyteLib brings Airbyte ELT to every Python developer."""
from __future__ import annotations

from airbyte_lib._factories.cache_factories import get_default_cache, new_local_cache
from airbyte_lib._factories.connector_factories import get_connector
Expand Down
64 changes: 45 additions & 19 deletions airbyte-lib/airbyte_lib/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, NoReturn

from airbyte_lib import exceptions as exc
from airbyte_lib.telemetry import SourceTelemetryInfo, SourceType


Expand Down Expand Up @@ -71,7 +72,13 @@ def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]:
yield line

if process.stdout is None:
raise Exception("Failed to start subprocess")
raise exc.AirbyteSubprocessError(
message="Subprocess did not return a stdout stream.",
context={
"args": args,
"returncode": process.returncode,
},
)
try:
yield _stream_from_file(process.stdout)
finally:
Expand All @@ -94,7 +101,7 @@ def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]:

# If the exit code is not 0 or -15 (SIGTERM), raise an exception
if exit_code not in (0, -15):
raise Exception(f"Process exited with code {exit_code}")
raise exc.AirbyteSubprocessFailedError(exit_code=exit_code)


class VenvExecutor(Executor):
Expand Down Expand Up @@ -123,7 +130,9 @@ def _get_connector_path(self) -> Path:
def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None:
result = subprocess.run(args, check=False)
if result.returncode != 0:
raise Exception(f"Install process exited with code {result.returncode}")
raise exc.AirbyteConnectorInstallationError from exc.AirbyteSubprocessFailedError(
exit_code=result.returncode
)

def uninstall(self) -> None:
venv_name = self._get_venv_name()
Expand Down Expand Up @@ -171,18 +180,27 @@ def ensure_installation(
venv_path = Path(venv_name)
if not venv_path.exists():
if not self.install_if_missing:
raise Exception(
f"Connector {self.metadata.name} is not available - "
f"venv {venv_name} does not exist"
raise exc.AirbyteConnectorNotFoundError(
message="Connector not available and venv does not exist.",
guidance=(
"Please ensure the connector is pre-installed or consider enabling "
"`install_if_missing=True`."
),
context={
"connector_name": self.metadata.name,
"venv_name": venv_name,
},
)
self.install()

connector_path = self._get_connector_path()
if not connector_path.exists():
raise FileNotFoundError(
f"Could not find connector '{self.metadata.name}' in venv '{venv_name}' with "
f"connector path '{connector_path}'.",
)
raise exc.AirbyteConnectorNotFoundError(
connector_name=self.metadata.name,
context={
"venv_name": venv_name,
},
) from FileNotFoundError(connector_path)

if self.enforce_version:
installed_version = self._get_installed_version()
Expand All @@ -193,9 +211,14 @@ def ensure_installation(
# Check the version again
version_after_install = self._get_installed_version()
if version_after_install != self.target_version:
raise Exception(
f"Failed to install connector {self.metadata.name} version "
f"{self.target_version}. Installed version is {version_after_install}",
raise exc.AirbyteConnectorInstallationError(
connector_name=self.metadata.name,
context={
"venv_name": venv_name,
"target_version": self.target_version,
"installed_version": installed_version,
"version_after_install": version_after_install,
},
)

def execute(self, args: list[str]) -> Iterator[str]:
Expand All @@ -213,17 +236,20 @@ def ensure_installation(self) -> None:
try:
self.execute(["spec"])
except Exception as e:
raise Exception(
f"Connector {self.metadata.name} is not available - executing it failed"
raise exc.AirbyteConnectorNotFoundError(
connector_name=self.metadata.name,
) from e

def install(self) -> NoReturn:
raise Exception(f"Connector {self.metadata.name} is not available - cannot install it")
raise exc.AirbyteConnectorInstallationError(
message="Connector cannot be installed because it is not managed by airbyte-lib.",
connector_name=self.metadata.name,
)

def uninstall(self) -> NoReturn:
raise Exception(
f"Connector {self.metadata.name} is installed manually and not managed by airbyte-lib -"
" please remove it manually"
raise exc.AirbyteConnectorInstallationError(
message="Connector cannot be uninstalled because it is not managed by airbyte-lib.",
connector_name=self.metadata.name,
)

def execute(self, args: list[str]) -> Iterator[str]:
Expand Down
12 changes: 8 additions & 4 deletions airbyte-lib/airbyte_lib/_factories/cache_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import ulid

from airbyte_lib import exceptions as exc
from airbyte_lib.caches.duckdb import DuckDBCache, DuckDBCacheConfig


Expand Down Expand Up @@ -38,12 +39,15 @@ def new_local_cache(
"""
if cache_name:
if " " in cache_name:
raise ValueError(f"Cache name '{cache_name}' cannot contain spaces")
raise exc.AirbyteLibInputError(
message="Cache name cannot contain spaces.",
input_value=cache_name,
)

if not cache_name.replace("_", "").isalnum():
raise ValueError(
f"Cache name '{cache_name}' can only contain alphanumeric "
"characters and underscores."
raise exc.AirbyteLibInputError(
message="Cache name can only contain alphanumeric characters and underscores.",
input_value=cache_name,
)

cache_name = cache_name or str(ulid.ULID())
Expand Down
9 changes: 7 additions & 2 deletions airbyte-lib/airbyte_lib/_factories/connector_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any

from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor
from airbyte_lib.exceptions import AirbyteLibInputError
from airbyte_lib.registry import get_connector_metadata
from airbyte_lib.source import Source

Expand Down Expand Up @@ -37,9 +38,13 @@ def get_connector(
metadata = get_connector_metadata(name)
if use_local_install:
if pip_url:
raise ValueError("Param 'pip_url' is not supported when 'use_local_install' is True")
raise AirbyteLibInputError(
message="Param 'pip_url' is not supported when 'use_local_install' is True."
)
if version:
raise ValueError("Param 'version' is not supported when 'use_local_install' is True")
raise AirbyteLibInputError(
message="Param 'version' is not supported when 'use_local_install' is True."
)
executor: Executor = PathExecutor(
metadata=metadata,
target_version=version,
Expand Down
2 changes: 2 additions & 0 deletions airbyte-lib/airbyte_lib/_file_writers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from .base import FileWriterBase, FileWriterBatchHandle, FileWriterConfigBase
from .parquet import ParquetWriter, ParquetWriterConfig

Expand Down
8 changes: 7 additions & 1 deletion airbyte-lib/airbyte_lib/_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
Type,
)

from airbyte_lib import exceptions as exc
from airbyte_lib._util import protocol_util # Internal utility functions


Expand Down Expand Up @@ -171,7 +172,12 @@ def process_airbyte_messages(
pass

else:
raise ValueError(f"Unexpected message type: {message.type}")
raise exc.AirbyteConnectorError(
message="Unexpected message type.",
context={
"message_type": message.type,
},
)

# We are at the end of the stream. Process whatever else is queued.
for stream_name, batch in stream_batches.items():
Expand Down
8 changes: 7 additions & 1 deletion airbyte-lib/airbyte_lib/_util/protocol_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
Type,
)

from airbyte_lib import exceptions as exc


if TYPE_CHECKING:
from collections.abc import Iterable, Iterator
Expand Down Expand Up @@ -66,6 +68,10 @@ def get_primary_keys_from_stream(
None,
)
if stream is None:
raise KeyError(f"Stream {stream_name} not found in catalog.")
raise exc.AirbyteStreamNotFoundError(
stream_name=stream_name,
connector_name=configured_catalog.connection.configuration["name"],
available_streams=[stream.stream.name for stream in configured_catalog.streams],
)

return set(stream.stream.source_defined_primary_key or [])
1 change: 1 addition & 0 deletions airbyte-lib/airbyte_lib/caches/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Base module for all caches."""
from __future__ import annotations

from airbyte_lib.caches.base import SQLCacheBase
from airbyte_lib.caches.duckdb import DuckDBCache, DuckDBCacheConfig
Expand Down
44 changes: 31 additions & 13 deletions airbyte-lib/airbyte_lib/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from sqlalchemy.pool import StaticPool
from sqlalchemy.sql.elements import TextClause

from airbyte_lib import exceptions as exc
from airbyte_lib._file_writers.base import FileWriterBase, FileWriterBatchHandle
from airbyte_lib._processors import BatchHandle, RecordProcessor
from airbyte_lib.config import CacheConfigBase
Expand Down Expand Up @@ -374,8 +375,11 @@ def _ensure_compatible_table_schema(
missing_columns: set[str] = set(stream_column_names) - set(table_column_names)
if missing_columns:
if raise_on_error:
raise RuntimeError(
f"Table {table_name} is missing columns: {missing_columns}",
raise exc.AirbyteLibCacheTableValidationError(
violation="Cache table is missing expected columns.",
context={
"missing_columns": missing_columns,
},
)
return False # Some columns are missing.

Expand Down Expand Up @@ -440,16 +444,25 @@ def _get_stream_config(
) -> ConfiguredAirbyteStream:
"""Return the column definitions for the given stream."""
if not self.source_catalog:
raise RuntimeError("Cannot get stream JSON schema without a catalog.")
raise exc.AirbyteLibInternalError(
message="Cannot get stream JSON schema without a catalog.",
)

matching_streams: list[ConfiguredAirbyteStream] = [
stream for stream in self.source_catalog.streams if stream.stream.name == stream_name
]
if not matching_streams:
raise RuntimeError(f"Stream '{stream_name}' not found in catalog.")
raise exc.AirbyteStreamNotFoundError(
stream_name=stream_name,
)

if len(matching_streams) > 1:
raise RuntimeError(f"Multiple streams found with name '{stream_name}'.")
raise exc.AirbyteLibInternalError(
message="Multiple streams found with same name.",
context={
"stream_name": stream_name,
},
)

return matching_streams[0]

Expand Down Expand Up @@ -525,12 +538,12 @@ def _finalize_batches(self, stream_name: str) -> dict[str, BatchHandle]:
raise_on_error=True,
)

temp_table_name = self._write_files_to_new_table(
files,
stream_name,
max_batch_id,
)
try:
temp_table_name = self._write_files_to_new_table(
files,
stream_name,
max_batch_id,
)
self._write_temp_table_to_final_table(
stream_name,
temp_table_name,
Expand Down Expand Up @@ -592,7 +605,12 @@ def _write_files_to_new_table(

# Pandas will auto-create the table if it doesn't exist, which we don't want.
if not self._table_exists(temp_table_name):
raise RuntimeError(f"Table {temp_table_name} does not exist after creation.")
raise exc.AirbyteLibInternalError(
message="Table does not exist after creation.",
context={
"temp_table_name": temp_table_name,
},
)

dataframe.to_sql(
temp_table_name,
Expand Down Expand Up @@ -685,9 +703,9 @@ def _swap_temp_table_with_final_table(
Databases that do not support this syntax can override this method.
"""
if final_table_name is None:
raise ValueError("Arg 'final_table_name' cannot be None.")
raise exc.AirbyteLibInternalError(message="Arg 'final_table_name' cannot be None.")
if temp_table_name is None:
raise ValueError("Arg 'temp_table_name' cannot be None.")
raise exc.AirbyteLibInternalError(message="Arg 'temp_table_name' cannot be None.")

_ = stream_name
deletion_name = f"{final_table_name}_deleteme"
Expand Down
21 changes: 14 additions & 7 deletions airbyte-lib/airbyte_lib/caches/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from overrides import overrides

from airbyte_lib import exceptions as exc
from airbyte_lib._file_writers import ParquetWriter, ParquetWriterConfig
from airbyte_lib.caches.base import SQLCacheBase, SQLCacheConfigBase
from airbyte_lib.telemetry import CacheTelemetryInfo
Expand Down Expand Up @@ -93,9 +94,11 @@ def _merge_temp_table_to_final_table(
Databases that do not support this syntax can override this method.
"""
if not self._get_primary_keys(stream_name):
raise RuntimeError(
f"Primary keys not found for stream {stream_name}. "
"Cannot run merge updates without primary keys."
raise exc.AirbyteLibInternalError(
message="Primary keys not found. Cannot run merge updates without primary keys.",
context={
"stream_name": stream_name,
},
)

_ = stream_name
Expand Down Expand Up @@ -135,10 +138,14 @@ def _ensure_compatible_table_schema(
table_pk_cols = table.primary_key.columns.keys()
if set(pk_cols) != set(table_pk_cols):
if raise_on_error:
raise RuntimeError(
f"Primary keys do not match for table {table_name}. "
f"Expected: {pk_cols}. "
f"Found: {table_pk_cols}.",
raise exc.AirbyteLibCacheTableValidationError(
violation="Primary keys do not match.",
context={
"stream_name": stream_name,
"table_name": table_name,
"expected": pk_cols,
"found": table_pk_cols,
},
)
return False

Expand Down
2 changes: 2 additions & 0 deletions airbyte-lib/airbyte_lib/datasets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from airbyte_lib.datasets._base import DatasetBase
from airbyte_lib.datasets._lazy import LazyDataset
from airbyte_lib.datasets._map import DatasetMap
Expand Down
1 change: 1 addition & 0 deletions airbyte-lib/airbyte_lib/datasets/_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterator, Mapping
Expand Down

0 comments on commit 1268c37

Please sign in to comment.