Skip to content

Commit

Permalink
CDK: add support for streams with state attribute (#9746)
Browse files Browse the repository at this point in the history
* add support for streams with state attribute
* fix pre-commit and format
* update state attribute docs and logic
* added IncrementalMixin

Co-authored-by: Eugene Kulak <kulak.eugene@gmail.com>
  • Loading branch information
keu and eugene-kulak committed Feb 16, 2022
1 parent d6747ab commit d173ce7
Show file tree
Hide file tree
Showing 11 changed files with 677 additions and 298 deletions.
6 changes: 6 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog

## 0.1.49
Add support for streams with explicit state attribute.

## 0.1.48
Fix type annotations.

## 0.1.47
Fix typing errors.

Expand Down
76 changes: 57 additions & 19 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Expand Up @@ -52,7 +52,8 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
@abstractmethod
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here.
:param config: The user-provided configuration as specified by the source's spec.
Any stream construction related operation should happen here.
:return: A list of the streams in this source connector.
"""

Expand All @@ -65,12 +66,16 @@ def name(self) -> str:
return self.__class__.__name__

def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
"""Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
"""Implements the Discover operation from the Airbyte Specification.
See https://docs.airbyte.io/architecture/airbyte-specification.
"""
streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
return AirbyteCatalog(streams=streams)

def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
"""Implements the Check Connection operation from the Airbyte Specification.
See https://docs.airbyte.io/architecture/airbyte-specification.
"""
try:
check_succeeded, error = self.check_connection(logger, config)
if not check_succeeded:
Expand All @@ -81,7 +86,11 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon
return AirbyteConnectionStatus(status=Status.SUCCEEDED)

def read(
self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None
self,
logger: logging.Logger,
config: Mapping[str, Any],
catalog: ConfiguredAirbyteCatalog,
state: MutableMapping[str, Any] = None,
) -> Iterator[AirbyteMessage]:
"""Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
connector_state = copy.deepcopy(state or {})
Expand All @@ -96,10 +105,12 @@ def read(
stream_instance = stream_instances.get(configured_stream.stream.name)
if not stream_instance:
raise KeyError(
f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}"
f"The requested stream {configured_stream.stream.name} was not found in the source."
f" Available streams: {stream_instances.keys()}"
)

try:
timer.start_event(f"Syncing stream {configured_stream.stream.name}")
yield from self._read_stream(
logger=logger,
stream_instance=stream_instance,
Expand All @@ -108,10 +119,11 @@ def read(
internal_config=internal_config,
)
except Exception as e:
logger.exception(f"Encountered an exception while reading stream {self.name}")
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
raise e
finally:
logger.info(f"Finished syncing {self.name}")
timer.finish_event()
logger.info(f"Finished syncing {configured_stream.stream.name}")
logger.info(timer.report())

logger.info(f"Finished syncing {self.name}")
Expand All @@ -131,7 +143,13 @@ def _read_stream(

use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
if use_incremental:
record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state, internal_config)
record_iterator = self._read_incremental(
logger,
stream_instance,
configured_stream,
connector_state,
internal_config,
)
else:
record_iterator = self._read_full_refresh(stream_instance, configured_stream, internal_config)

Expand Down Expand Up @@ -166,19 +184,31 @@ def _read_incremental(
connector_state: MutableMapping[str, Any],
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
"""Read stream using incremental algorithm
:param logger:
:param stream_instance:
:param configured_stream:
:param connector_state:
:param internal_config:
:return:
"""
stream_name = configured_stream.stream.name
stream_state = connector_state.get(stream_name, {})
if stream_state:
if stream_state and "state" in dir(stream_instance):
stream_instance.state = stream_state
logger.info(f"Setting state of {stream_name} stream to {stream_state}")

slices = stream_instance.stream_slices(
cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state
cursor_field=configured_stream.cursor_field,
sync_mode=SyncMode.incremental,
stream_state=stream_state,
)
total_records_counter = 0
for slice in slices:
for _slice in slices:
records = stream_instance.read_records(
sync_mode=SyncMode.incremental,
stream_slice=slice,
stream_slice=_slice,
stream_state=stream_state,
cursor_field=configured_stream.cursor_field or None,
)
Expand All @@ -187,7 +217,7 @@ def _read_incremental(
stream_state = stream_instance.get_updated_state(stream_state, record_data)
checkpoint_interval = stream_instance.state_checkpoint_interval
if checkpoint_interval and record_counter % checkpoint_interval == 0:
yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
yield self._checkpoint_state(stream_instance, stream_state, connector_state)

total_records_counter += 1
# This functionality should ideally live outside of this method
Expand All @@ -197,28 +227,36 @@ def _read_incremental(
# Break from slice loop to save state and exit from _read_incremental function.
break

yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
yield self._checkpoint_state(stream_instance, stream_state, connector_state)
if self._limit_reached(internal_config, total_records_counter):
return

def _read_full_refresh(
self, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream, internal_config: InternalConfig
self,
stream_instance: Stream,
configured_stream: ConfiguredAirbyteStream,
internal_config: InternalConfig,
) -> Iterator[AirbyteMessage]:
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field)
total_records_counter = 0
for slice in slices:
records = stream_instance.read_records(
stream_slice=slice, sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field
stream_slice=slice,
sync_mode=SyncMode.full_refresh,
cursor_field=configured_stream.cursor_field,
)
for record in records:
yield self._as_airbyte_record(configured_stream.stream.name, record)
total_records_counter += 1
if self._limit_reached(internal_config, total_records_counter):
return

def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
logger.info(f"Setting state of {stream_name} stream to {stream_state}")
connector_state[stream_name] = stream_state
def _checkpoint_state(self, stream, stream_state, connector_state):
    """Record the latest state of *stream* in the connector-wide state and emit a STATE message.

    :param stream: the stream instance whose state is being checkpointed.
    :param stream_state: fallback per-stream state accumulated by the caller
        (via the legacy get_updated_state() path).
    :param connector_state: mutable mapping of stream name -> stream state for the
        whole connector; updated in place.
    :return: an AirbyteMessage of type STATE carrying the full connector state.
    """
    try:
        # Prefer the stream's own explicit `state` attribute (streams that
        # implement a state property, e.g. via IncrementalMixin).
        connector_state[stream.name] = stream.state
    except AttributeError:
        # Legacy streams without a `state` attribute: fall back to the state
        # value computed by the caller.
        connector_state[stream.name] = stream_state

    return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=connector_state))

@lru_cache(maxsize=None)
Expand Down
39 changes: 37 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/core.py
Expand Up @@ -12,6 +12,7 @@
from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from deprecated.classic import deprecated


def package_name_from_class(cls: object) -> str:
Expand All @@ -20,6 +21,40 @@ def package_name_from_class(cls: object) -> str:
return module.__name__.split(".")[0]


class IncrementalMixin(ABC):
    """Mixin to make a stream incremental.

    A stream that mixes this in must expose a ``state`` property, which the
    framework uses to persist and restore sync progress::

        class IncrementalStream(Stream, IncrementalMixin):
            @property
            def state(self):
                return self._state

            @state.setter
            def state(self, value):
                self._state[self.cursor_field] = value[self.cursor_field]
    """

    @property
    @abstractmethod
    def state(self) -> MutableMapping[str, Any]:
        """State getter; should return the state in a form that can be serialized
        to a string and sent to the output as a STATE AirbyteMessage.

        A good example of a state is a cursor_value::

            {
                self.cursor_field: "cursor_value"
            }

        The state should be as small as possible, but at the same time descriptive
        enough to restore the syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: MutableMapping[str, Any]):
        """State setter; accepts state serialized by the state getter."""

class Stream(ABC):
"""
Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol.
Expand Down Expand Up @@ -136,9 +171,9 @@ def state_checkpoint_interval(self) -> Optional[int]:
"""
return None

@deprecated(version='0.1.49', reason="You should use explicit state property instead, see IncrementalMixin docs.")
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
"""
Override to extract state from the latest record. Needed to implement incremental sync.
"""Override to extract state from the latest record. Needed to implement incremental sync.
Inspects the latest record extracted from the data source and the current state object and return an updated state object.
Expand Down

0 comments on commit d173ce7

Please sign in to comment.