Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
Expand Down Expand Up @@ -368,7 +365,6 @@ def _group_streams(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=cursor.cursor_field.cursor_field_key
if hasattr(cursor, "cursor_field")
Expand Down Expand Up @@ -408,7 +404,6 @@ def _group_streams(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=None,
logger=self.logger,
Expand Down Expand Up @@ -464,7 +459,6 @@ def _group_streams(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
logger=self.logger,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from .abstract_file_based_availability_strategy import (
AbstractFileBasedAvailabilityStrategy,
AbstractFileBasedAvailabilityStrategyWrapper,
)
from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy
from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy

__all__ = [
"AbstractFileBasedAvailabilityStrategy",
"AbstractFileBasedAvailabilityStrategyWrapper",
"DefaultFileBasedAvailabilityStrategy",
]
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@

from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
AbstractAvailabilityStrategy,
StreamAvailability,
StreamAvailable,
StreamUnavailable,
)
from airbyte_cdk.sources.streams.core import Stream

if TYPE_CHECKING:
Expand All @@ -28,7 +22,7 @@ def check_availability( # type: ignore[override] # Signature doesn't match bas
self,
stream: Stream,
logger: logging.Logger,
_: Optional[Source],
source: Optional[Source] = None,
) -> Tuple[bool, Optional[str]]:
"""
Perform a connection check for the stream.
Expand All @@ -51,23 +45,3 @@ def check_availability_and_parsability(
Returns (True, None) if successful, otherwise (False, <error message>).
"""
...


class AbstractFileBasedAvailabilityStrategyWrapper(AbstractAvailabilityStrategy):
def __init__(self, stream: AbstractFileBasedStream) -> None:
self.stream = stream

def check_availability(self, logger: logging.Logger) -> StreamAvailability:
is_available, reason = self.stream.availability_strategy.check_availability(
self.stream, logger, None
)
if is_available:
return StreamAvailable()
return StreamUnavailable(reason or "")

def check_availability_and_parsability(
self, logger: logging.Logger
) -> Tuple[bool, Optional[str]]:
return self.stream.availability_strategy.check_availability_and_parsability(
self.stream, logger, None
)
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
)

@cached_property
@deprecated("Deprecated as of CDK version 3.7.0.")
def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
return self._availability_strategy

Expand Down
2 changes: 0 additions & 2 deletions airbyte_cdk/sources/file_based/stream/concurrent/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
AbstractFileBasedAvailabilityStrategy,
AbstractFileBasedAvailabilityStrategyWrapper,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
Expand Down Expand Up @@ -97,7 +96,6 @@ def create_from_stream(
),
name=stream.name,
json_schema=stream.get_json_schema(),
availability_strategy=AbstractFileBasedAvailabilityStrategyWrapper(stream),
primary_key=pk,
cursor_field=cursor_field,
logger=logger,
Expand Down
1 change: 1 addition & 0 deletions airbyte_cdk/sources/streams/availability_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from airbyte_cdk.sources import Source


# FIXME this
class AvailabilityStrategy(ABC):
"""
Abstract base class for checking stream availability.
Expand Down
7 changes: 0 additions & 7 deletions airbyte_cdk/sources/streams/concurrent/abstract_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from airbyte_cdk.models import AirbyteStream
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition

Expand Down Expand Up @@ -64,12 +63,6 @@ def cursor_field(self) -> Optional[str]:
:return: The name of the field used as a cursor. Nested cursor fields are not supported.
"""

@abstractmethod
def check_availability(self) -> StreamAvailability:
"""
:return: The stream's availability
"""

@abstractmethod
def get_json_schema(self) -> Mapping[str, Any]:
"""
Expand Down
43 changes: 0 additions & 43 deletions airbyte_cdk/sources/streams/concurrent/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
AbstractAvailabilityStrategy,
AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
Expand Down Expand Up @@ -101,7 +96,6 @@ def create_from_stream(
name=stream.name,
namespace=stream.namespace,
json_schema=stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=pk,
cursor_field=cursor_field,
logger=logger,
Expand Down Expand Up @@ -210,18 +204,6 @@ def get_json_schema(self) -> Mapping[str, Any]:
def supports_incremental(self) -> bool:
return self._legacy_stream.supports_incremental

def check_availability(
self, logger: logging.Logger, source: Optional["Source"] = None
) -> Tuple[bool, Optional[str]]:
"""
Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters
:param logger: (ignored)
:param source: (ignored)
:return:
"""
availability = self._abstract_stream.check_availability()
return availability.is_available(), availability.message()

def as_airbyte_stream(self) -> AirbyteStream:
return self._abstract_stream.as_airbyte_stream()

Expand Down Expand Up @@ -370,28 +352,3 @@ def generate(self) -> Iterable[Partition]:
self._cursor_field,
self._state,
)


@deprecated(
"Availability strategy has been soft deprecated. Do not use. Class is subject to removal",
category=ExperimentalClassWarning,
)
class AvailabilityStrategyFacade(AvailabilityStrategy):
def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
self._abstract_availability_strategy = abstract_availability_strategy

def check_availability(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
) -> Tuple[bool, Optional[str]]:
"""
Checks stream availability.

Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.

:param stream: (unused)
:param logger: logger object to use
:param source: (unused)
:return: A tuple of (boolean, str). If boolean is true, then the stream
"""
stream_availability = self._abstract_availability_strategy.check_availability(logger)
return stream_availability.is_available(), stream_availability.message()
94 changes: 0 additions & 94 deletions airbyte_cdk/sources/streams/concurrent/availability_strategy.py

This file was deleted.

9 changes: 0 additions & 9 deletions airbyte_cdk/sources/streams/concurrent/default_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
AbstractAvailabilityStrategy,
StreamAvailability,
)
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
Expand All @@ -23,7 +19,6 @@ def __init__(
partition_generator: PartitionGenerator,
name: str,
json_schema: Mapping[str, Any],
availability_strategy: AbstractAvailabilityStrategy,
primary_key: List[str],
cursor_field: Optional[str],
logger: Logger,
Expand All @@ -34,7 +29,6 @@ def __init__(
self._stream_partition_generator = partition_generator
self._name = name
self._json_schema = json_schema
self._availability_strategy = availability_strategy
self._primary_key = primary_key
self._cursor_field = cursor_field
self._logger = logger
Expand All @@ -53,9 +47,6 @@ def name(self) -> str:
def namespace(self) -> Optional[str]:
return self._namespace

def check_availability(self) -> StreamAvailability:
return self._availability_strategy.check_availability(self._logger)

@property
def cursor_field(self) -> Optional[str]:
return self._cursor_field
Expand Down
Loading
Loading