Skip to content

Commit

Permalink
CDK: updated error message for missing streams (#36833)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab committed Apr 4, 2024
1 parent 9cd72c3 commit e74d936
Show file tree
Hide file tree
Showing 5 changed files with 439 additions and 337 deletions.
13 changes: 10 additions & 3 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,16 @@ def read(
if not stream_instance:
if not self.raise_exception_on_missing_stream:
continue
raise KeyError(
f"The stream {configured_stream.stream.name} no longer exists in the configuration. "
f"Refresh the schema in replication settings and remove this stream from future sync attempts."

error_message = (
f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
)

raise AirbyteTracedException(
message="A stream listed in your configuration was not found in the source. Please check the logs for more details.",
internal_message=error_message,
failure_type=FailureType.config_error,
)

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
from abc import ABC
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.models import AirbyteMessage, AirbyteStateMessage, ConfiguredAirbyteCatalog, FailureType
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


class ConcurrentSourceAdapter(AbstractSource, ABC):
Expand Down Expand Up @@ -54,10 +55,18 @@ def _select_abstract_streams(self, config: Mapping[str, Any], configured_catalog
if not stream_instance:
if not self.raise_exception_on_missing_stream:
continue
raise KeyError(
f"The stream {configured_stream.stream.name} no longer exists in the configuration. "
f"Refresh the schema in replication settings and remove this stream from future sync attempts."

error_message = (
f"The stream '{configured_stream.stream.name}' in your connection configuration was not found in the source. "
f"Refresh the schema in your replication settings and remove this stream from future sync attempts."
)

raise AirbyteTracedException(
message="A stream listed in your configuration was not found in the source. Please check the logs for more details.",
internal_message=error_message,
failure_type=FailureType.config_error,
)

if isinstance(stream_instance, AbstractStreamFacade):
abstract_streams.append(stream_instance.get_underlying_stream())
return abstract_streams
Loading

0 comments on commit e74d936

Please sign in to comment.