From 1268c3740b565aa773cf1643fbc98abc4d63c4a4 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 24 Jan 2024 13:44:35 -0800 Subject: [PATCH] AirbyteLib: Formalized Exception Handling (#34488) --- airbyte-lib/airbyte_lib/__init__.py | 1 + airbyte-lib/airbyte_lib/_executor.py | 64 +++-- .../airbyte_lib/_factories/cache_factories.py | 12 +- .../_factories/connector_factories.py | 9 +- .../airbyte_lib/_file_writers/__init__.py | 2 + airbyte-lib/airbyte_lib/_processors.py | 8 +- .../airbyte_lib/_util/protocol_util.py | 8 +- airbyte-lib/airbyte_lib/caches/__init__.py | 1 + airbyte-lib/airbyte_lib/caches/base.py | 44 +++- airbyte-lib/airbyte_lib/caches/duckdb.py | 21 +- airbyte-lib/airbyte_lib/datasets/__init__.py | 2 + airbyte-lib/airbyte_lib/datasets/_base.py | 1 + airbyte-lib/airbyte_lib/datasets/_map.py | 6 +- airbyte-lib/airbyte_lib/exceptions.py | 227 ++++++++++++++++++ airbyte-lib/airbyte_lib/registry.py | 9 +- airbyte-lib/airbyte_lib/results.py | 16 +- airbyte-lib/airbyte_lib/source.py | 46 ++-- airbyte-lib/airbyte_lib/validate.py | 15 +- airbyte-lib/airbyte_lib/version.py | 1 + airbyte-lib/docs.py | 1 + airbyte-lib/examples/run_snowflake_faker.py | 2 +- airbyte-lib/examples/run_spacex.py | 1 + airbyte-lib/examples/run_test_source.py | 1 + .../examples/run_test_source_single_stream.py | 1 + airbyte-lib/poetry.lock | 20 +- airbyte-lib/pyproject.toml | 9 +- .../integration_tests/test_integration.py | 5 +- .../tests/unit_tests/test_exceptions.py | 28 +++ 28 files changed, 468 insertions(+), 93 deletions(-) create mode 100644 airbyte-lib/airbyte_lib/exceptions.py create mode 100644 airbyte-lib/tests/unit_tests/test_exceptions.py diff --git a/airbyte-lib/airbyte_lib/__init__.py b/airbyte-lib/airbyte_lib/__init__.py index 0e98616b0443b..86ffb5e0fa401 100644 --- a/airbyte-lib/airbyte_lib/__init__.py +++ b/airbyte-lib/airbyte_lib/__init__.py @@ -1,4 +1,5 @@ """AirbyteLib brings Airbyte ELT to every Python developer.""" +from __future__ import annotations from airbyte_lib._factories.cache_factories import get_default_cache, new_local_cache from airbyte_lib._factories.connector_factories import get_connector diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index 1a816cc46848f..20899f892006c 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -8,6 +8,7 @@ from pathlib import Path from typing import IO, TYPE_CHECKING, Any, NoReturn +from airbyte_lib import exceptions as exc from airbyte_lib.telemetry import SourceTelemetryInfo, SourceType @@ -71,7 +72,13 @@ def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]: yield line if process.stdout is None: - raise Exception("Failed to start subprocess") + raise exc.AirbyteSubprocessError( + message="Subprocess did not return a stdout stream.", + context={ + "args": args, + "returncode": process.returncode, + }, + ) try: yield _stream_from_file(process.stdout) finally: @@ -94,7 +101,7 @@ def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]: # If the exit code is not 0 or -15 (SIGTERM), raise an exception if exit_code not in (0, -15): - raise Exception(f"Process exited with code {exit_code}") + raise exc.AirbyteSubprocessFailedError(exit_code=exit_code) class VenvExecutor(Executor): @@ -123,7 +130,9 @@ def _get_connector_path(self) -> Path: def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None: result = subprocess.run(args, check=False) if result.returncode != 0: - raise Exception(f"Install process exited with code 
{result.returncode}") + raise exc.AirbyteConnectorInstallationError from exc.AirbyteSubprocessFailedError( + exit_code=result.returncode + ) def uninstall(self) -> None: venv_name = self._get_venv_name() @@ -171,18 +180,27 @@ def ensure_installation( venv_path = Path(venv_name) if not venv_path.exists(): if not self.install_if_missing: - raise Exception( - f"Connector {self.metadata.name} is not available - " - f"venv {venv_name} does not exist" + raise exc.AirbyteConnectorNotFoundError( + message="Connector not available and venv does not exist.", + guidance=( + "Please ensure the connector is pre-installed or consider enabling " + "`install_if_missing=True`." + ), + context={ + "connector_name": self.metadata.name, + "venv_name": venv_name, + }, ) self.install() connector_path = self._get_connector_path() if not connector_path.exists(): - raise FileNotFoundError( - f"Could not find connector '{self.metadata.name}' in venv '{venv_name}' with " - f"connector path '{connector_path}'.", - ) + raise exc.AirbyteConnectorNotFoundError( + connector_name=self.metadata.name, + context={ + "venv_name": venv_name, + }, + ) from FileNotFoundError(connector_path) if self.enforce_version: installed_version = self._get_installed_version() @@ -193,9 +211,14 @@ def ensure_installation( # Check the version again version_after_install = self._get_installed_version() if version_after_install != self.target_version: - raise Exception( - f"Failed to install connector {self.metadata.name} version " - f"{self.target_version}. Installed version is {version_after_install}", + raise exc.AirbyteConnectorInstallationError( + connector_name=self.metadata.name, + context={ + "venv_name": venv_name, + "target_version": self.target_version, + "installed_version": installed_version, + "version_after_install": version_after_install, + }, ) def execute(self, args: list[str]) -> Iterator[str]: @@ -213,17 +236,20 @@ def ensure_installation(self) -> None: try: self.execute(["spec"]) except Exception as e: - raise Exception( - f"Connector {self.metadata.name} is not available - executing it failed" + raise exc.AirbyteConnectorNotFoundError( + connector_name=self.metadata.name, ) from e def install(self) -> NoReturn: - raise Exception(f"Connector {self.metadata.name} is not available - cannot install it") + raise exc.AirbyteConnectorInstallationError( + message="Connector cannot be installed because it is not managed by airbyte-lib.", + connector_name=self.metadata.name, + ) def uninstall(self) -> NoReturn: - raise Exception( - f"Connector {self.metadata.name} is installed manually and not managed by airbyte-lib -" - " please remove it manually" + raise exc.AirbyteConnectorInstallationError( + message="Connector cannot be uninstalled because it is not managed by airbyte-lib.", + connector_name=self.metadata.name, ) def execute(self, args: list[str]) -> Iterator[str]: diff --git a/airbyte-lib/airbyte_lib/_factories/cache_factories.py b/airbyte-lib/airbyte_lib/_factories/cache_factories.py index 5a95dce2db7b4..82ad3241920cc 100644 --- a/airbyte-lib/airbyte_lib/_factories/cache_factories.py +++ b/airbyte-lib/airbyte_lib/_factories/cache_factories.py @@ -5,6 +5,7 @@ import ulid +from airbyte_lib import exceptions as exc from airbyte_lib.caches.duckdb import DuckDBCache, DuckDBCacheConfig @@ -38,12 +39,15 @@ def new_local_cache( """ if cache_name: if " " in cache_name: - raise ValueError(f"Cache name '{cache_name}' cannot contain spaces") + raise exc.AirbyteLibInputError( + message="Cache name cannot contain spaces.", + 
input_value=cache_name, + ) if not cache_name.replace("_", "").isalnum(): - raise ValueError( - f"Cache name '{cache_name}' can only contain alphanumeric " - "characters and underscores." + raise exc.AirbyteLibInputError( + message="Cache name can only contain alphanumeric characters and underscores.", + input_value=cache_name, ) cache_name = cache_name or str(ulid.ULID()) diff --git a/airbyte-lib/airbyte_lib/_factories/connector_factories.py b/airbyte-lib/airbyte_lib/_factories/connector_factories.py index 347710f20824a..4dbe8c6f41f06 100644 --- a/airbyte-lib/airbyte_lib/_factories/connector_factories.py +++ b/airbyte-lib/airbyte_lib/_factories/connector_factories.py @@ -4,6 +4,7 @@ from typing import Any from airbyte_lib._executor import Executor, PathExecutor, VenvExecutor +from airbyte_lib.exceptions import AirbyteLibInputError from airbyte_lib.registry import get_connector_metadata from airbyte_lib.source import Source @@ -37,9 +38,13 @@ def get_connector( metadata = get_connector_metadata(name) if use_local_install: if pip_url: - raise ValueError("Param 'pip_url' is not supported when 'use_local_install' is True") + raise AirbyteLibInputError( + message="Param 'pip_url' is not supported when 'use_local_install' is True." + ) if version: - raise ValueError("Param 'version' is not supported when 'use_local_install' is True") + raise AirbyteLibInputError( + message="Param 'version' is not supported when 'use_local_install' is True." + ) executor: Executor = PathExecutor( metadata=metadata, target_version=version, diff --git a/airbyte-lib/airbyte_lib/_file_writers/__init__.py b/airbyte-lib/airbyte_lib/_file_writers/__init__.py index 007dde8324345..aae8c474ca97f 100644 --- a/airbyte-lib/airbyte_lib/_file_writers/__init__.py +++ b/airbyte-lib/airbyte_lib/_file_writers/__init__.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from .base import FileWriterBase, FileWriterBatchHandle, FileWriterConfigBase from .parquet import ParquetWriter, ParquetWriterConfig diff --git a/airbyte-lib/airbyte_lib/_processors.py b/airbyte-lib/airbyte_lib/_processors.py index 2fc92a392bfc5..ee94181660be6 100644 --- a/airbyte-lib/airbyte_lib/_processors.py +++ b/airbyte-lib/airbyte_lib/_processors.py @@ -30,6 +30,7 @@ Type, ) +from airbyte_lib import exceptions as exc from airbyte_lib._util import protocol_util # Internal utility functions @@ -171,7 +172,12 @@ def process_airbyte_messages( pass else: - raise ValueError(f"Unexpected message type: {message.type}") + raise exc.AirbyteConnectorError( + message="Unexpected message type.", + context={ + "message_type": message.type, + }, + ) # We are at the end of the stream. Process whatever else is queued. 
for stream_name, batch in stream_batches.items(): diff --git a/airbyte-lib/airbyte_lib/_util/protocol_util.py b/airbyte-lib/airbyte_lib/_util/protocol_util.py index f4e5ab3fb293b..638d25d04d64b 100644 --- a/airbyte-lib/airbyte_lib/_util/protocol_util.py +++ b/airbyte-lib/airbyte_lib/_util/protocol_util.py @@ -12,6 +12,8 @@ Type, ) +from airbyte_lib import exceptions as exc + if TYPE_CHECKING: from collections.abc import Iterable, Iterator @@ -66,6 +68,10 @@ def get_primary_keys_from_stream( None, ) if stream is None: - raise KeyError(f"Stream {stream_name} not found in catalog.") + raise exc.AirbyteStreamNotFoundError( + stream_name=stream_name, + connector_name=configured_catalog.connection.configuration["name"], + available_streams=[stream.stream.name for stream in configured_catalog.streams], + ) return set(stream.stream.source_defined_primary_key or []) diff --git a/airbyte-lib/airbyte_lib/caches/__init__.py b/airbyte-lib/airbyte_lib/caches/__init__.py index d14980332458a..3cb5c31cf1192 100644 --- a/airbyte-lib/airbyte_lib/caches/__init__.py +++ b/airbyte-lib/airbyte_lib/caches/__init__.py @@ -1,4 +1,5 @@ """Base module for all caches.""" +from __future__ import annotations from airbyte_lib.caches.base import SQLCacheBase from airbyte_lib.caches.duckdb import DuckDBCache, DuckDBCacheConfig diff --git a/airbyte-lib/airbyte_lib/caches/base.py b/airbyte-lib/airbyte_lib/caches/base.py index 53415da762487..23df721400b85 100644 --- a/airbyte-lib/airbyte_lib/caches/base.py +++ b/airbyte-lib/airbyte_lib/caches/base.py @@ -18,6 +18,7 @@ from sqlalchemy.pool import StaticPool from sqlalchemy.sql.elements import TextClause +from airbyte_lib import exceptions as exc from airbyte_lib._file_writers.base import FileWriterBase, FileWriterBatchHandle from airbyte_lib._processors import BatchHandle, RecordProcessor from airbyte_lib.config import CacheConfigBase @@ -374,8 +375,11 @@ def _ensure_compatible_table_schema( missing_columns: set[str] = set(stream_column_names) - set(table_column_names) if missing_columns: if raise_on_error: - raise RuntimeError( - f"Table {table_name} is missing columns: {missing_columns}", + raise exc.AirbyteLibCacheTableValidationError( + violation="Cache table is missing expected columns.", + context={ + "missing_columns": missing_columns, + }, ) return False # Some columns are missing. 
@@ -440,16 +444,25 @@ def _get_stream_config( ) -> ConfiguredAirbyteStream: """Return the column definitions for the given stream.""" if not self.source_catalog: - raise RuntimeError("Cannot get stream JSON schema without a catalog.") + raise exc.AirbyteLibInternalError( + message="Cannot get stream JSON schema without a catalog.", + ) matching_streams: list[ConfiguredAirbyteStream] = [ stream for stream in self.source_catalog.streams if stream.stream.name == stream_name ] if not matching_streams: - raise RuntimeError(f"Stream '{stream_name}' not found in catalog.") + raise exc.AirbyteStreamNotFoundError( + stream_name=stream_name, + ) if len(matching_streams) > 1: - raise RuntimeError(f"Multiple streams found with name '{stream_name}'.") + raise exc.AirbyteLibInternalError( + message="Multiple streams found with same name.", + context={ + "stream_name": stream_name, + }, + ) return matching_streams[0] @@ -525,12 +538,12 @@ def _finalize_batches(self, stream_name: str) -> dict[str, BatchHandle]: raise_on_error=True, ) + temp_table_name = self._write_files_to_new_table( + files, + stream_name, + max_batch_id, + ) try: - temp_table_name = self._write_files_to_new_table( - files, - stream_name, - max_batch_id, - ) self._write_temp_table_to_final_table( stream_name, temp_table_name, @@ -592,7 +605,12 @@ def _write_files_to_new_table( # Pandas will auto-create the table if it doesn't exist, which we don't want. if not self._table_exists(temp_table_name): - raise RuntimeError(f"Table {temp_table_name} does not exist after creation.") + raise exc.AirbyteLibInternalError( + message="Table does not exist after creation.", + context={ + "temp_table_name": temp_table_name, + }, + ) dataframe.to_sql( temp_table_name, @@ -685,9 +703,9 @@ def _swap_temp_table_with_final_table( Databases that do not support this syntax can override this method. """ if final_table_name is None: - raise ValueError("Arg 'final_table_name' cannot be None.") + raise exc.AirbyteLibInternalError(message="Arg 'final_table_name' cannot be None.") if temp_table_name is None: - raise ValueError("Arg 'temp_table_name' cannot be None.") + raise exc.AirbyteLibInternalError(message="Arg 'temp_table_name' cannot be None.") _ = stream_name deletion_name = f"{final_table_name}_deleteme" diff --git a/airbyte-lib/airbyte_lib/caches/duckdb.py b/airbyte-lib/airbyte_lib/caches/duckdb.py index 0d6ba6efe38a9..ac70bcf8648ad 100644 --- a/airbyte-lib/airbyte_lib/caches/duckdb.py +++ b/airbyte-lib/airbyte_lib/caches/duckdb.py @@ -9,6 +9,7 @@ from overrides import overrides +from airbyte_lib import exceptions as exc from airbyte_lib._file_writers import ParquetWriter, ParquetWriterConfig from airbyte_lib.caches.base import SQLCacheBase, SQLCacheConfigBase from airbyte_lib.telemetry import CacheTelemetryInfo @@ -93,9 +94,11 @@ def _merge_temp_table_to_final_table( Databases that do not support this syntax can override this method. """ if not self._get_primary_keys(stream_name): - raise RuntimeError( - f"Primary keys not found for stream {stream_name}. " - "Cannot run merge updates without primary keys." + raise exc.AirbyteLibInternalError( + message="Primary keys not found. Cannot run merge updates without primary keys.", + context={ + "stream_name": stream_name, + }, ) _ = stream_name @@ -135,10 +138,14 @@ def _ensure_compatible_table_schema( table_pk_cols = table.primary_key.columns.keys() if set(pk_cols) != set(table_pk_cols): if raise_on_error: - raise RuntimeError( - f"Primary keys do not match for table {table_name}. " - f"Expected: {pk_cols}. 
" - f"Found: {table_pk_cols}.", + raise exc.AirbyteLibCacheTableValidationError( + violation="Primary keys do not match.", + context={ + "stream_name": stream_name, + "table_name": table_name, + "expected": pk_cols, + "found": table_pk_cols, + }, ) return False diff --git a/airbyte-lib/airbyte_lib/datasets/__init__.py b/airbyte-lib/airbyte_lib/datasets/__init__.py index d1c8d3a6d0eac..bfd4f02ce319a 100644 --- a/airbyte-lib/airbyte_lib/datasets/__init__.py +++ b/airbyte-lib/airbyte_lib/datasets/__init__.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from airbyte_lib.datasets._base import DatasetBase from airbyte_lib.datasets._lazy import LazyDataset from airbyte_lib.datasets._map import DatasetMap diff --git a/airbyte-lib/airbyte_lib/datasets/_base.py b/airbyte-lib/airbyte_lib/datasets/_base.py index b9f888c2fe811..f0fdfab52b912 100644 --- a/airbyte-lib/airbyte_lib/datasets/_base.py +++ b/airbyte-lib/airbyte_lib/datasets/_base.py @@ -1,4 +1,5 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. +from __future__ import annotations from abc import ABC, abstractmethod from collections.abc import Iterator, Mapping diff --git a/airbyte-lib/airbyte_lib/datasets/_map.py b/airbyte-lib/airbyte_lib/datasets/_map.py index 3881e1d33da84..42eaed88f0e3e 100644 --- a/airbyte-lib/airbyte_lib/datasets/_map.py +++ b/airbyte-lib/airbyte_lib/datasets/_map.py @@ -5,10 +5,14 @@ TODO: This is a work in progress. It is not yet used by any other code. TODO: Implement before release, or delete. """ +from __future__ import annotations from collections.abc import Iterator, Mapping +from typing import TYPE_CHECKING -from airbyte_lib.datasets._base import DatasetBase + +if TYPE_CHECKING: + from airbyte_lib.datasets._base import DatasetBase class DatasetMap(Mapping): diff --git a/airbyte-lib/airbyte_lib/exceptions.py b/airbyte-lib/airbyte_lib/exceptions.py new file mode 100644 index 0000000000000..3c6336d031033 --- /dev/null +++ b/airbyte-lib/airbyte_lib/exceptions.py @@ -0,0 +1,227 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +"""All exceptions used in the Airbyte Lib. + +This design is modeled after structlog's exceptions, in that we bias towards auto-generated +property prints rather than sentence-like string concatenation. + +E.g. Instead of this: +> Subprocess failed with exit code '1' + +We do this: +> Subprocess failed. (exit_code=1) + +The benefit of this approach is that we can easily support structured logging, and we can +easily add new properties to exceptions without having to update all the places where they +are raised. We can also support any arbitrary number of properties in exceptions, without spending +time on building sentence-like string constructions with optional inputs. + + +In addition, the following principles are applied for exception class design: + +- All exceptions inherit from a common base class. +- All exceptions have a message attribute. +- The first line of the docstring is used as the default message. +- The default message can be overridden by explicitly setting the message attribute. +- Exceptions may optionally have a guidance attribute. +- Exceptions may optionally have a help_url attribute. +- Rendering is automatically handled by the base class. +- Any helpful context not defined by the exception class can be passed in the `context` dict arg. +- Within reason, avoid sending PII to the exception constructor. +- Exceptions are dataclasses, so they can be instantiated with keyword arguments. 
+- Use the 'from' syntax to chain exceptions when it is helpful to do so. + E.g. `raise AirbyteConnectorNotFoundError(...) from FileNotFoundError(connector_path)` +- Any exception that adds a new property should also be decorated as `@dataclass`. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + + +NEW_ISSUE_URL = "https://github.com/airbytehq/airbyte/issues/new/choose" +DOCS_URL = "https://docs.airbyte.io/" + + +# Base error class + + +@dataclass +class AirbyteError(Exception): + """Base class for exceptions in Airbyte.""" + + guidance: str | None = None + help_url: str | None = None + log_text: str | list[str] | None = None + context: dict[str, Any] | None = None + message: str | None = None + + def get_message(self) -> str: + """Return the best description for the exception. + + We resolve the following in order: + 1. The message sent to the exception constructor (if provided). + 2. The first line of the class's docstring. + """ + if self.message: + return self.message + + return self.__doc__.split("\n")[0] if self.__doc__ else "" + + def __str__(self) -> str: + special_properties = ["message", "guidance", "help_url", "log_text"] + properties_str = ", ".join( + f"{k}={v!r}" + for k, v in self.__dict__.items() + if k not in special_properties and not k.startswith("_") and v is not None + ) + exception_str = f"{self.__class__.__name__}: {self.get_message()}." + if properties_str: + exception_str += f" ({properties_str})" + + if self.log_text: + if isinstance(self.log_text, list): + self.log_text = "\n".join(self.log_text) + + exception_str += f"\n\n Log output: {self.log_text}" + + if self.guidance: + exception_str += f"\n\n Suggestion: {self.guidance}" + + if self.help_url: + exception_str += f"\n\n More info: {self.help_url}" + + return exception_str + + def __repr__(self) -> str: + class_name = self.__class__.__name__ + properties_str = ", ".join( + f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_") + ) + return f"{class_name}({properties_str})" + + +# AirbyteLib Internal Errors (these are probably bugs) + + +@dataclass +class AirbyteLibInternalError(AirbyteError): + """An internal error occurred in Airbyte Lib.""" + + guidance = "Please consider reporting this error to the Airbyte team." + help_url = NEW_ISSUE_URL + + +# AirbyteLib Input Errors (replaces ValueError for user input) + + +@dataclass +class AirbyteLibInputError(AirbyteError, ValueError): + """The input provided to AirbyteLib did not match expected validation rules. + + This inherits from ValueError so that it can be used as a drop-in replacement for + ValueError in the Airbyte Lib API. + """ + + # TODO: Consider adding a help_url that links to the auto-generated API reference. + + guidance = "Please check the provided value and try again." 
+ input_value: str | None = None + + +# AirbyteLib Cache Errors + + +class AirbyteLibCacheError(AirbyteError): + """Error occurred while accessing the cache.""" + + +@dataclass +class AirbyteLibCacheTableValidationError(AirbyteLibCacheError): + """Cache table validation failed.""" + + violation: str | None = None + + +@dataclass +class AirbyteConnectorConfigurationMissingError(AirbyteLibCacheError): + """Connector is missing configuration.""" + + connector_name: str | None = None + + +# Subprocess Errors + + +@dataclass +class AirbyteSubprocessError(AirbyteError): + """Error when running subprocess.""" + + run_args: list[str] | None = None + + +@dataclass +class AirbyteSubprocessFailedError(AirbyteSubprocessError): + """Subprocess failed.""" + + exit_code: int | None = None + + +# Connector Registry Errors + + +class AirbyteConnectorRegistryError(AirbyteError): + """Error when accessing the connector registry.""" + + +# Connector Errors + + +@dataclass +class AirbyteConnectorError(AirbyteError): + """Error when running the connector.""" + + connector_name: str | None = None + + +class AirbyteConnectorNotFoundError(AirbyteConnectorError): + """Connector not found.""" + + +class AirbyteConnectorInstallationError(AirbyteConnectorError): + """Error when installing the connector.""" + + +class AirbyteConnectorReadError(AirbyteConnectorError): + """Error when reading from the connector.""" + + +class AirbyteNoDataFromConnectorError(AirbyteConnectorError): + """No data was provided from the connector.""" + + +class AirbyteConnectorMissingCatalogError(AirbyteConnectorError): + """Connector did not return a catalog.""" + + +class AirbyteConnectorMissingSpecError(AirbyteConnectorError): + """Connector did not return a spec.""" + + +class AirbyteConnectorCheckFailedError(AirbyteConnectorError): + """Connector did not return a spec.""" + + +@dataclass +class AirbyteConnectorFailedError(AirbyteConnectorError): + """Connector failed.""" + + exit_code: int | None = None + + +@dataclass +class AirbyteStreamNotFoundError(AirbyteConnectorError): + """Connector stream not found.""" + + stream_name: str | None = None + available_streams: list[str] | None = None diff --git a/airbyte-lib/airbyte_lib/registry.py b/airbyte-lib/airbyte_lib/registry.py index e0afdbaf2c3ac..bd030a867ff00 100644 --- a/airbyte-lib/airbyte_lib/registry.py +++ b/airbyte-lib/airbyte_lib/registry.py @@ -8,6 +8,7 @@ import requests +from airbyte_lib import exceptions as exc from airbyte_lib.version import get_version @@ -47,5 +48,11 @@ def get_connector_metadata(name: str) -> ConnectorMetadata: if not _cache: _update_cache() if not _cache or name not in _cache: - raise Exception(f"Connector {name} not found") + raise exc.AirbyteLibInputError( + message="Connector name not found in registry.", + guidance="Please double check the connector name.", + context={ + "connector_name": name, + }, + ) return _cache[name] diff --git a/airbyte-lib/airbyte_lib/results.py b/airbyte-lib/airbyte_lib/results.py index 25f59a05acf6a..81e67fad289d6 100644 --- a/airbyte-lib/airbyte_lib/results.py +++ b/airbyte-lib/airbyte_lib/results.py @@ -1,13 +1,19 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+from __future__ import annotations -from collections.abc import Iterator, Mapping +from typing import TYPE_CHECKING -from sqlalchemy.engine import Engine - -from airbyte_lib.caches import SQLCacheBase from airbyte_lib.datasets import CachedDataset +if TYPE_CHECKING: + from collections.abc import Iterator, Mapping + + from sqlalchemy.engine import Engine + + from airbyte_lib.caches import SQLCacheBase + + class ReadResult: def __init__(self, processed_records: int, cache: SQLCacheBase) -> None: self.processed_records = processed_records @@ -15,7 +21,7 @@ def __init__(self, processed_records: int, cache: SQLCacheBase) -> None: def __getitem__(self, stream: str) -> CachedDataset: if stream not in self._cache: - raise KeyError(f"Stream {stream} does not exist") + raise KeyError(stream) return CachedDataset(self._cache, stream) diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index f4430698ef1fe..91ed050faad10 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -21,6 +21,7 @@ Type, ) +from airbyte_lib import exceptions as exc from airbyte_lib._factories.cache_factories import get_default_cache from airbyte_lib._util import protocol_util # Internal utility functions from airbyte_lib.datasets._lazy import LazyDataset @@ -94,9 +95,10 @@ def set_streams(self, streams: list[str]) -> None: available_streams = self.get_available_streams() for stream in streams: if stream not in available_streams: - raise Exception( - f"Stream {stream} is not available for connector {self.name}. " - f"Choose from: {available_streams}", + raise exc.AirbyteStreamNotFoundError( + stream_name=stream, + connector_name=self.name, + available_streams=available_streams, ) self._selected_stream_names = streams @@ -107,8 +109,8 @@ def set_config(self, config: dict[str, Any]) -> None: @property def _config(self) -> dict[str, Any]: if self._config_dict is None: - raise Exception( - "Config is not set, either set in get_connector or via source.set_config", + raise exc.AirbyteConnectorConfigurationMissingError( + guidance="Provide via get_connector() or set_config()" ) return self._config_dict @@ -125,8 +127,8 @@ def _discover(self) -> AirbyteCatalog: for msg in self._execute(["discover", "--config", config_file]): if msg.type == Type.CATALOG and msg.catalog: return msg.catalog - raise Exception( - f"Connector did not return a catalog. Last logs: {self._last_log_messages}", + raise exc.AirbyteConnectorMissingCatalogError( + log_text=self._last_log_messages, ) def _validate_config(self, config: dict[str, Any]) -> None: @@ -155,8 +157,8 @@ def _get_spec(self, *, force_refresh: bool = False) -> ConnectorSpecification: if self._spec: return self._spec - raise Exception( - f"Connector did not return a spec. 
Last logs: {self._last_log_messages}", + raise exc.AirbyteConnectorMissingSpecError( + log_text=self._last_log_messages, ) @property @@ -222,10 +224,14 @@ def get_records(self, stream: str) -> LazyDataset: ], ) if len(configured_catalog.streams) == 0: - raise KeyError( - f"Stream {stream} is not available for connector {self.name}, " - f"choose from {self.get_available_streams()}", - ) + raise exc.AirbyteLibInputError( + message="Requested stream does not exist.", + context={ + "stream": stream, + "available_streams": self.get_available_streams(), + "connector_name": self.name, + }, + ) from KeyError(stream) iterator: Iterator[dict[str, Any]] = protocol_util.airbyte_messages_to_record_dicts( self._read_with_catalog(streaming_cache_info, configured_catalog), @@ -247,12 +253,12 @@ def check(self) -> None: if msg.connectionStatus.status != Status.FAILED: return # Success! - raise Exception( - f"Connector returned failed status: {msg.connectionStatus.message}", + raise exc.AirbyteConnectorCheckFailedError( + context={ + "message": msg.connectionStatus.message, + } ) - raise Exception( - f"Connector did not return check status. Last logs: {self._last_log_messages}", - ) + raise exc.AirbyteConnectorCheckFailedError(log_text=self._last_log_messages) def install(self) -> None: """Install the connector if it is not yet installed.""" @@ -345,7 +351,9 @@ def _execute(self, args: list[str]) -> Iterator[AirbyteMessage]: except Exception: self._add_to_logs(line) except Exception as e: - raise Exception(f"Execution failed. Last logs: {self._last_log_messages}") from e + raise exc.AirbyteConnectorReadError( + log_text=self._last_log_messages, + ) from e def _tally_records( self, diff --git a/airbyte-lib/airbyte_lib/validate.py b/airbyte-lib/airbyte_lib/validate.py index 8eac20e1692b4..c9ce3944a35ab 100644 --- a/airbyte-lib/airbyte_lib/validate.py +++ b/airbyte-lib/airbyte_lib/validate.py @@ -3,6 +3,7 @@ This tool checks if connectors are compatible with airbyte-lib. """ +from __future__ import annotations import argparse import json @@ -15,6 +16,7 @@ import yaml import airbyte_lib as ab +from airbyte_lib import exceptions as exc def _parse_args() -> argparse.Namespace: @@ -37,7 +39,10 @@ def _parse_args() -> argparse.Namespace: def _run_subprocess_and_raise_on_failure(args: list[str]) -> None: result = subprocess.run(args, check=False) if result.returncode != 0: - raise Exception(f"{args} exited with code {result.returncode}") + raise exc.AirbyteSubprocessFailedError( + run_args=args, + exit_code=result.returncode, + ) def tests(connector_name: str, sample_config: str) -> None: @@ -61,10 +66,14 @@ def tests(connector_name: str, sample_config: str) -> None: record = next(source.get_records(stream)) assert record, "No record returned" break - except Exception as e: + except exc.AirbyteError as e: print(f"Could not read from stream {stream}: {e}") + except Exception as e: + print(f"Unhandled error occurred when trying to read from {stream}: {e}") else: - raise Exception(f"Could not read from any stream from {streams}") + raise exc.AirbyteNoDataFromConnectorError( + context={"selected_streams": streams}, + ) def run() -> None: diff --git a/airbyte-lib/airbyte_lib/version.py b/airbyte-lib/airbyte_lib/version.py index 9ed83a5ef4569..114a730a5e7c1 100644 --- a/airbyte-lib/airbyte_lib/version.py +++ b/airbyte-lib/airbyte_lib/version.py @@ -1,4 +1,5 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+from __future__ import annotations import importlib.metadata diff --git a/airbyte-lib/docs.py b/airbyte-lib/docs.py index bfd30c05e554f..a609aace7ac32 100644 --- a/airbyte-lib/docs.py +++ b/airbyte-lib/docs.py @@ -1,4 +1,5 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. +from __future__ import annotations import os import pathlib diff --git a/airbyte-lib/examples/run_snowflake_faker.py b/airbyte-lib/examples/run_snowflake_faker.py index 5a6084ce16dcf..2102ab658b179 100644 --- a/airbyte-lib/examples/run_snowflake_faker.py +++ b/airbyte-lib/examples/run_snowflake_faker.py @@ -1,5 +1,5 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. - +from __future__ import annotations import json import os diff --git a/airbyte-lib/examples/run_spacex.py b/airbyte-lib/examples/run_spacex.py index 46c8dee411d87..e195215773819 100644 --- a/airbyte-lib/examples/run_spacex.py +++ b/airbyte-lib/examples/run_spacex.py @@ -1,4 +1,5 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. +from __future__ import annotations from itertools import islice diff --git a/airbyte-lib/examples/run_test_source.py b/airbyte-lib/examples/run_test_source.py index de57ca8420ff6..a984bd6d652f7 100644 --- a/airbyte-lib/examples/run_test_source.py +++ b/airbyte-lib/examples/run_test_source.py @@ -1,4 +1,5 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. +from __future__ import annotations import os diff --git a/airbyte-lib/examples/run_test_source_single_stream.py b/airbyte-lib/examples/run_test_source_single_stream.py index b1cc55cd1f5c7..88faec90cb5fe 100644 --- a/airbyte-lib/examples/run_test_source_single_stream.py +++ b/airbyte-lib/examples/run_test_source_single_stream.py @@ -1,4 +1,5 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. +from __future__ import annotations import os diff --git a/airbyte-lib/poetry.lock b/airbyte-lib/poetry.lock index bbd7435528b26..5dafb19e798a1 100644 --- a/airbyte-lib/poetry.lock +++ b/airbyte-lib/poetry.lock @@ -567,13 +567,13 @@ grpcio-gcp = ["grpcio-gcp (>=0.2.2,<1.0.dev0)"] [[package]] name = "google-auth" -version = "2.26.2" +version = "2.27.0" description = "Google Authentication Library" optional = false python-versions = ">=3.7" files = [ - {file = "google-auth-2.26.2.tar.gz", hash = "sha256:97327dbbf58cccb58fc5a1712bba403ae76668e64814eb30f7316f7e27126b81"}, - {file = "google_auth-2.26.2-py2.py3-none-any.whl", hash = "sha256:3f445c8ce9b61ed6459aad86d8ccdba4a9afed841b2d1451a11ef4db08957424"}, + {file = "google-auth-2.27.0.tar.gz", hash = "sha256:e863a56ccc2d8efa83df7a80272601e43487fa9a728a376205c86c26aaefa821"}, + {file = "google_auth-2.27.0-py2.py3-none-any.whl", hash = "sha256:8e4bad367015430ff253fe49d500fdc3396c1a434db5740828c728e45bcce245"}, ] [package.dependencies] @@ -1110,13 +1110,13 @@ files = [ [[package]] name = "overrides" -version = "7.4.0" +version = "7.6.0" description = "A decorator to automatically detect mismatch when overriding a method." 
optional = false python-versions = ">=3.6" files = [ - {file = "overrides-7.4.0-py3-none-any.whl", hash = "sha256:3ad24583f86d6d7a49049695efe9933e67ba62f0c7625d53c59fa832ce4b8b7d"}, - {file = "overrides-7.4.0.tar.gz", hash = "sha256:9502a3cca51f4fac40b5feca985b6703a5c1f6ad815588a7ca9e285b9dca6757"}, + {file = "overrides-7.6.0-py3-none-any.whl", hash = "sha256:c36e6635519ea9c5b043b65c36d4b886aee8bd45b7d4681d2a6df0898df4b654"}, + {file = "overrides-7.6.0.tar.gz", hash = "sha256:01e15bbbf15b766f0675c275baa1878bd1c7dc9bc7b9ee13e677cdba93dc1bd9"}, ] [[package]] @@ -1283,13 +1283,13 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4)", "pytest-co [[package]] name = "pluggy" -version = "1.3.0" +version = "1.4.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" files = [ - {file = "pluggy-1.3.0-py3-none-any.whl", hash = "sha256:d89c696a773f8bd377d18e5ecda92b7a3793cbe66c87060a6fb58c7b6e1061f7"}, - {file = "pluggy-1.3.0.tar.gz", hash = "sha256:cf61ae8f126ac6f7c451172cf30e3e43d3ca77615509771b3a984a0730651e12"}, + {file = "pluggy-1.4.0-py3-none-any.whl", hash = "sha256:7db9f7b503d67d1c5b95f59773ebb58a8c1c288129a88665838012cfb07b8981"}, + {file = "pluggy-1.4.0.tar.gz", hash = "sha256:8c85c2876142a764e5b7548e7d9a0e0ddb46f5185161049a79b7e974454223be"}, ] [package.extras] @@ -2494,4 +2494,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "eb7f87eb174b6eabbd28d336a4e4059cb77f886c9e8c40fec0db577803036585" +content-hash = "5eba75179b62f56be141db82121ca9e1c623944306172d7bdacaf5388e6f3384" diff --git a/airbyte-lib/pyproject.toml b/airbyte-lib/pyproject.toml index 35c99ab59965e..c7ffe8f0edf98 100644 --- a/airbyte-lib/pyproject.toml +++ b/airbyte-lib/pyproject.toml @@ -14,7 +14,7 @@ airbyte-cdk = "^0.58.3" jsonschema = "3.2.0" orjson = "^3.9.10" overrides = "^7.4.0" -pandas = "^2.1.4" +pandas = "2.1.4" # 2.2.0 breaks sqlalchemy interop - TODO: optionally retest higher versions psycopg2-binary = "^2.9.9" python-ulid = "^2.2.0" types-pyyaml = "^6.0.12.12" @@ -109,9 +109,11 @@ select = [ "TD", # flake8-todos "TID", # flake8-tidy-imports "TRY", # tryceratops + "TRY002", # Disallow raising vanilla Exception. Create or use a custom exception instead. + "TRY003", # Disallow vanilla string passing. Prefer kwargs to the exception constructur. "UP", # pyupgrade "W", # pycodestyle (warnings) - "YTT" # flake8-2020 + "YTT", # flake8-2020 ] ignore = [ # For rules reference, see https://docs.astral.sh/ruff/rules/ @@ -130,7 +132,6 @@ ignore = [ "S", # flake8-bandit (noisy, security related) "TD002", # Require author for TODOs "TRIO", # flake8-trio (opinionated, noisy) - "TRY003", # Exceptions with too-long string descriptions # TODO: re-evaluate once we have our own exception classes "INP001", # Dir 'examples' is part of an implicit namespace package. Add an __init__.py. 
# TODO: Consider re-enabling these before release: @@ -140,7 +141,6 @@ ignore = [ "FIX002", # Allow "TODO:" until release (then switch to requiring links via TDO003) "PLW0603", # Using the global statement to update _cache is discouraged "TD003", # Require links for TODOs # TODO: Re-enable when we disable FIX002 - "TRY002", # TODO: When we have time to tackle exception management ("Create your own exception") ] fixable = ["ALL"] unfixable = [ @@ -156,6 +156,7 @@ force-sort-within-sections = false lines-after-imports = 2 known-first-party = ["airbyte_cdk", "airbyte_protocol"] known-local-folder = ["airbyte_lib"] +required-imports = ["from __future__ import annotations"] known-third-party = [] section-order = [ "future", diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_integration.py index 503539ae8380b..1ea3e13cd7cec 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_integration.py @@ -23,6 +23,7 @@ import airbyte_lib as ab from airbyte_lib.results import ReadResult +from airbyte_lib import exceptions as exc @pytest.fixture(scope="module", autouse=True) @@ -318,7 +319,7 @@ def test_lazy_dataset_from_source( assert list_from_iter_a == list_from_iter_b # Make sure that we get a key error if we try to access a stream that doesn't exist - with pytest.raises(KeyError): + with pytest.raises(exc.AirbyteLibInputError): source.get_records(not_a_stream_name) # Make sure we can iterate on all available streams @@ -343,7 +344,7 @@ def test_lazy_dataset_from_source( def test_check_fail_on_missing_config(method_call): source = ab.get_connector("source-test") - with pytest.raises(Exception, match="Config is not set, either set in get_connector or via source.set_config"): + with pytest.raises(exc.AirbyteConnectorConfigurationMissingError): method_call(source) def test_sync_with_merge_to_postgres(new_pg_cache_config: PostgresCacheConfig, expected_test_stream_data: dict[str, list[dict[str, str | int]]]): diff --git a/airbyte-lib/tests/unit_tests/test_exceptions.py b/airbyte-lib/tests/unit_tests/test_exceptions.py new file mode 100644 index 0000000000000..ef5a391e47df0 --- /dev/null +++ b/airbyte-lib/tests/unit_tests/test_exceptions.py @@ -0,0 +1,28 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +import inspect +import pytest +import inspect +import airbyte_lib.exceptions as exceptions_module + +def test_exceptions(): + exception_classes = [ + (name, obj) + for name, obj in inspect.getmembers(exceptions_module) + if inspect.isclass(obj) and name.endswith("Error") + ] + assert "AirbyteError" in [name for name, _ in exception_classes] + assert "NotAnError" not in [name for name, _ in exception_classes] + for name, obj in exception_classes: + instance = obj() + message = instance.get_message() + assert isinstance(message, str), "No message for class: " + name + assert message.count("\n") == 0 + assert message != "" + assert message.strip() == message + assert name.startswith("Airbyte") + assert name.endswith("Error") + + +if __name__ == "__main__": + pytest.main()
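
For reference, a minimal usage sketch of the exception API introduced in airbyte_lib/exceptions.py above. This note is not part of the patch; the connector name, install args, and exit code are illustrative values, and the rendered output shown in the comments is approximate:

    from __future__ import annotations

    from airbyte_lib import exceptions as exc


    def install_connector(args: list[str]) -> None:
        # Illustrative stand-in for an install subprocess that exits non-zero.
        raise exc.AirbyteSubprocessFailedError(run_args=args, exit_code=1)


    try:
        install_connector(["pip", "install", "airbyte-source-example"])
    except exc.AirbyteSubprocessFailedError as ex:
        # AirbyteError.__str__ renders the class name, the docstring-derived
        # message, and any non-None extra properties, roughly:
        #   AirbyteSubprocessFailedError: Subprocess failed.
        #   (run_args=['pip', 'install', 'airbyte-source-example'], exit_code=1)
        print(ex)

        # The module docstring recommends chaining with 'from' where helpful:
        raise exc.AirbyteConnectorInstallationError(
            connector_name="source-example",  # illustrative value
        ) from ex

Because properties are passed as keyword arguments rather than interpolated into the message, the same exception can be emitted as structured log data without string parsing, which is the design rationale stated in the module docstring.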