Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions airbyte_cdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@
from .sources.declarative.extractors.record_filter import RecordFilter
from .sources.declarative.incremental import DatetimeBasedCursor
from .sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString
from .sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from .sources.declarative.migrations.legacy_to_per_partition_state_migration import (
LegacyToPerPartitionStateMigration,
)
Expand Down Expand Up @@ -253,7 +252,6 @@
"JsonDecoder",
"JsonFileSchemaLoader",
"LegacyToPerPartitionStateMigration",
"ManifestDeclarativeSource",
"MinMaxDatetime",
"NoAuth",
"OffsetIncrement",
Expand Down
12 changes: 7 additions & 5 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
ConcurrentDeclarativeSource,
TestLimits,
)
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand Down Expand Up @@ -90,7 +88,7 @@ def create_source(


def read_stream(
source: DeclarativeSource,
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
Expand Down Expand Up @@ -128,7 +126,9 @@ def read_stream(
return error.as_airbyte_message()


def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
def resolve_manifest(
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
) -> AirbyteMessage:
try:
return AirbyteMessage(
type=Type.RECORD,
Expand All @@ -145,7 +145,9 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
return error.as_airbyte_message()


def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
def full_resolve_manifest(
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], limits: TestLimits
) -> AirbyteMessage:
try:
manifest = {**source.resolved_manifest}
streams = manifest.get("streams", [])
Expand Down
6 changes: 4 additions & 2 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
ConfiguredAirbyteCatalog,
ConfiguredAirbyteCatalogSerializer,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
)
from airbyte_cdk.sources.source import Source
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

Expand Down Expand Up @@ -68,7 +70,7 @@ def get_config_and_catalog_from_args(


def handle_connector_builder_request(
source: ManifestDeclarativeSource,
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
command: str,
config: Mapping[str, Any],
catalog: Optional[ConfiguredAirbyteCatalog],
Expand Down
18 changes: 10 additions & 8 deletions airbyte_cdk/connector_builder/test_reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
ConfiguredAirbyteCatalog,
TraceType,
)
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
)
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.datetime_format_inferrer import DatetimeFormatInferrer
from airbyte_cdk.utils.schema_inferrer import (
Expand Down Expand Up @@ -55,7 +57,7 @@ class TestReader:
that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.

Parameters:
source (DeclarativeSource): The data source to read from.
source (ConcurrentDeclarativeSource): The data source to read from.
config (Mapping[str, Any]): Configuration parameters for the source.
configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration.
state (List[AirbyteStateMessage]): Current state information for the read.
Expand Down Expand Up @@ -83,7 +85,7 @@ def __init__(

def run_test_read(
self,
source: DeclarativeSource,
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
stream_name: str,
Expand All @@ -94,7 +96,7 @@ def run_test_read(
Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.

Parameters:
source (DeclarativeSource): The source instance providing the streams.
source (ConcurrentDeclarativeSource): The source instance providing the streams.
config (Mapping[str, Any]): The configuration settings to use for reading.
configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
state (List[AirbyteStateMessage]): A list of state messages to resume the read.
Expand Down Expand Up @@ -126,7 +128,7 @@ def run_test_read(
if stream
else None,
self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
if stream
if stream and stream.cursor_field
else None,
)
datetime_format_inferrer = DatetimeFormatInferrer()
Expand Down Expand Up @@ -381,13 +383,13 @@ def _get_latest_config_update(

def _read_stream(
self,
source: DeclarativeSource,
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
) -> Iterator[AirbyteMessage]:
"""
Reads messages from the given DeclarativeSource using an AirbyteEntrypoint.
Reads messages from the given ConcurrentDeclarativeSource using an AirbyteEntrypoint.

This method attempts to yield messages from the source's read generator. If the generator
raises an AirbyteTracedException, it checks whether the exception message indicates a non-actionable
Expand All @@ -396,7 +398,7 @@ def _read_stream(
wrapped into an AirbyteTracedException, and yielded as an AirbyteMessage.

Parameters:
source (DeclarativeSource): The source object that provides data reading logic.
source (ConcurrentDeclarativeSource): The source object that provides data reading logic.
config (Mapping[str, Any]): The configuration dictionary for the source.
configured_catalog (ConfiguredAirbyteCatalog): The catalog defining the streams and their configurations.
state (List[AirbyteStateMessage]): A list representing the current state for incremental sync.
Expand Down
1 change: 1 addition & 0 deletions airbyte_cdk/legacy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
1 change: 1 addition & 0 deletions airbyte_cdk/legacy/sources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
1 change: 1 addition & 0 deletions airbyte_cdk/legacy/sources/declarative/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from airbyte_cdk.connector_builder.models import (
LogMessage as ConnectorBuilderLogMessage,
)
from airbyte_cdk.legacy.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.manifest_migrations.migration_handler import (
ManifestMigrationHandler,
)
Expand All @@ -34,7 +35,6 @@
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConditionalStreams as ConditionalStreamsModel,
Expand Down
Loading
Loading