From d173ce741fd92da84f4d78cacb4124a6af0b79de Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Wed, 16 Feb 2022 22:20:33 +0200 Subject: [PATCH] CDK: add support for streams with state attribute (#9746) * add support for streams with state attribute * fix pre-commit and format * update state attribute docs and logic * added IncrementalMixin Co-authored-by: Eugene Kulak --- airbyte-cdk/python/CHANGELOG.md | 6 + .../airbyte_cdk/sources/abstract_source.py | 76 ++- .../airbyte_cdk/sources/streams/core.py | 39 +- .../docs/concepts/incremental-stream.md | 104 +++- .../cdk-tutorial-python-http/6-read-data.md | 65 ++- .../python/docs/tutorials/http_api_source.md | 50 +- airbyte-cdk/python/setup.py | 13 +- .../sources/test_abstract_source.py | 498 ++++++++++++------ .../integration_tests/configured_catalog.json | 32 +- .../cdk-python/incremental-stream.md | 40 +- .../cdk-tutorial-python-http/6-read-data.md | 52 +- 11 files changed, 677 insertions(+), 298 deletions(-) diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 4c4f0cf604aae..eaed852c47fe8 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.1.49 +Add support for streams with explicit state attribute. + +## 0.1.48 +Fix type annotations. + ## 0.1.47 Fix typing errors. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index d2e81e99b3505..ecfdcc56f6c40 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -52,7 +52,8 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> @abstractmethod def streams(self, config: Mapping[str, Any]) -> List[Stream]: """ - :param config: The user-provided configuration as specified by the source's spec. Any stream construction related operation should happen here. + :param config: The user-provided configuration as specified by the source's spec. + Any stream construction related operation should happen here. :return: A list of the streams in this source connector. """ @@ -65,12 +66,16 @@ def name(self) -> str: return self.__class__.__name__ def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: - """Implements the Discover operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.""" + """Implements the Discover operation from the Airbyte Specification. + See https://docs.airbyte.io/architecture/airbyte-specification. + """ streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)] return AirbyteCatalog(streams=streams) def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: - """Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.""" + """Implements the Check Connection operation from the Airbyte Specification. + See https://docs.airbyte.io/architecture/airbyte-specification. 
+ """ try: check_succeeded, error = self.check_connection(logger, config) if not check_succeeded: @@ -81,7 +86,11 @@ def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCon return AirbyteConnectionStatus(status=Status.SUCCEEDED) def read( - self, logger: logging.Logger, config: Mapping[str, Any], catalog: ConfiguredAirbyteCatalog, state: MutableMapping[str, Any] = None + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: MutableMapping[str, Any] = None, ) -> Iterator[AirbyteMessage]: """Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.""" connector_state = copy.deepcopy(state or {}) @@ -96,10 +105,12 @@ def read( stream_instance = stream_instances.get(configured_stream.stream.name) if not stream_instance: raise KeyError( - f"The requested stream {configured_stream.stream.name} was not found in the source. Available streams: {stream_instances.keys()}" + f"The requested stream {configured_stream.stream.name} was not found in the source." + f" Available streams: {stream_instances.keys()}" ) try: + timer.start_event(f"Syncing stream {configured_stream.stream.name}") yield from self._read_stream( logger=logger, stream_instance=stream_instance, @@ -108,10 +119,11 @@ def read( internal_config=internal_config, ) except Exception as e: - logger.exception(f"Encountered an exception while reading stream {self.name}") + logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}") raise e finally: - logger.info(f"Finished syncing {self.name}") + timer.finish_event() + logger.info(f"Finished syncing {configured_stream.stream.name}") logger.info(timer.report()) logger.info(f"Finished syncing {self.name}") @@ -131,7 +143,13 @@ def _read_stream( use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental if use_incremental: - record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state, internal_config) + record_iterator = self._read_incremental( + logger, + stream_instance, + configured_stream, + connector_state, + internal_config, + ) else: record_iterator = self._read_full_refresh(stream_instance, configured_stream, internal_config) @@ -166,19 +184,31 @@ def _read_incremental( connector_state: MutableMapping[str, Any], internal_config: InternalConfig, ) -> Iterator[AirbyteMessage]: + """Read stream using incremental algorithm + + :param logger: + :param stream_instance: + :param configured_stream: + :param connector_state: + :param internal_config: + :return: + """ stream_name = configured_stream.stream.name stream_state = connector_state.get(stream_name, {}) - if stream_state: + if stream_state and "state" in dir(stream_instance): + stream_instance.state = stream_state logger.info(f"Setting state of {stream_name} stream to {stream_state}") slices = stream_instance.stream_slices( - cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state + cursor_field=configured_stream.cursor_field, + sync_mode=SyncMode.incremental, + stream_state=stream_state, ) total_records_counter = 0 - for slice in slices: + for _slice in slices: records = stream_instance.read_records( sync_mode=SyncMode.incremental, - stream_slice=slice, + stream_slice=_slice, stream_state=stream_state, cursor_field=configured_stream.cursor_field or None, ) @@ -187,7 +217,7 @@ def _read_incremental( stream_state = 
stream_instance.get_updated_state(stream_state, record_data) checkpoint_interval = stream_instance.state_checkpoint_interval if checkpoint_interval and record_counter % checkpoint_interval == 0: - yield self._checkpoint_state(stream_name, stream_state, connector_state, logger) + yield self._checkpoint_state(stream_instance, stream_state, connector_state) total_records_counter += 1 # This functionality should ideally live outside of this method @@ -197,18 +227,23 @@ def _read_incremental( # Break from slice loop to save state and exit from _read_incremental function. break - yield self._checkpoint_state(stream_name, stream_state, connector_state, logger) + yield self._checkpoint_state(stream_instance, stream_state, connector_state) if self._limit_reached(internal_config, total_records_counter): return def _read_full_refresh( - self, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream, internal_config: InternalConfig + self, + stream_instance: Stream, + configured_stream: ConfiguredAirbyteStream, + internal_config: InternalConfig, ) -> Iterator[AirbyteMessage]: slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field) total_records_counter = 0 for slice in slices: records = stream_instance.read_records( - stream_slice=slice, sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field + stream_slice=slice, + sync_mode=SyncMode.full_refresh, + cursor_field=configured_stream.cursor_field, ) for record in records: yield self._as_airbyte_record(configured_stream.stream.name, record) @@ -216,9 +251,12 @@ def _read_full_refresh( if self._limit_reached(internal_config, total_records_counter): return - def _checkpoint_state(self, stream_name, stream_state, connector_state, logger): - logger.info(f"Setting state of {stream_name} stream to {stream_state}") - connector_state[stream_name] = stream_state + def _checkpoint_state(self, stream, stream_state, connector_state): + try: + connector_state[stream.name] = stream.state + except AttributeError: + connector_state[stream.name] = stream_state + return AirbyteMessage(type=MessageType.STATE, state=AirbyteStateMessage(data=connector_state)) @lru_cache(maxsize=None) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py index 9aea6d7d15084..fd6b323241283 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/core.py @@ -12,6 +12,7 @@ from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +from deprecated.classic import deprecated def package_name_from_class(cls: object) -> str: @@ -20,6 +21,40 @@ def package_name_from_class(cls: object) -> str: return module.__name__.split(".")[0] +class IncrementalMixin(ABC): + """Mixin to make stream incremental. + + class IncrementalStream(Stream, IncrementalMixin): + @property + def state(self): + return self._state + + @state.setter + def state(self, value): + self._state[self.cursor_field] = value[self.cursor_field] + """ + + @property + @abstractmethod + def state(self) -> MutableMapping[str, Any]: + """State getter, should return state in form that can serialized to a string and send to the output + as a STATE AirbyteMessage. 
+ + A good example of a state is a cursor_value: + { + self.cursor_field: "cursor_value" + } + + State should try to be as small as possible but at the same time descriptive enough to restore + syncing process from the point where it stopped. + """ + + @state.setter + @abstractmethod + def state(self, value: MutableMapping[str, Any]): + """State setter, accept state serialized by state getter.""" + + class Stream(ABC): """ Base abstract class for an Airbyte Stream. Makes no assumption of the Stream's underlying transport protocol. @@ -136,9 +171,9 @@ def state_checkpoint_interval(self) -> Optional[int]: """ return None + @deprecated(version='0.1.49', reason="You should use explicit state property instead, see IncrementalMixin docs.") def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]): - """ - Override to extract state from the latest record. Needed to implement incremental sync. + """Override to extract state from the latest record. Needed to implement incremental sync. Inspects the latest record extracted from the data source and the current state object and return an updated state object. diff --git a/airbyte-cdk/python/docs/concepts/incremental-stream.md b/airbyte-cdk/python/docs/concepts/incremental-stream.md index 37009e3f8fd8a..b9db5260e0c9c 100644 --- a/airbyte-cdk/python/docs/concepts/incremental-stream.md +++ b/airbyte-cdk/python/docs/concepts/incremental-stream.md @@ -1,47 +1,103 @@ -# The Incremental Stream +# Incremental Streams -An incremental Stream is a stream which reads data incrementally. That is, it only reads data that was generated or updated since the last time it ran, and is thus far more efficient than a stream which reads all the source data every time it runs. If possible, developers are encouraged to implement incremental streams to reduce sync times and resource usage. - -Several new pieces are essential to understand how incrementality works with the CDK: +An incremental Stream is a stream which reads data incrementally. That is, it only reads data that was generated or updated since the last time it ran, and is thus far more efficient than a stream which reads all the source data every time it runs. If possible, developers are encouraged to implement incremental streams to reduce sync times and resource usage. + +Several new pieces are essential to understand how incrementality works with the CDK: * `AirbyteStateMessage` * cursor fields -* `Stream.get_updated_state` -as well as a few other optional concepts. +* `IncrementalMixin` +* `Stream.get_updated_state` (deprecated) + + as well as a few other optional concepts. ### `AirbyteStateMessage` -The `AirbyteStateMessage` -persists state between syncs, and allows a new sync to pick up from where the previous sync last finished. See the [incremental sync guide](https://docs.airbyte.io/understanding-airbyte/connections/incremental-append) for more information. +The `AirbyteStateMessage` persists state between syncs, and allows a new sync to pick up from where the previous sync last finished. See the [incremental sync guide](https://docs.airbyte.io/understanding-airbyte/connections/incremental-append) for more information. ### Cursor fields + The `cursor_field` refers to the field in the stream's output records used to determine the "recency" or ordering of records. An example is a `created_at` or `updated_at` field in an API or DB table. 
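For illustration, declaring a cursor is usually a one-line class attribute. A minimal sketch (the `Orders` class and the `updated_at`/`id` field names below are assumed examples, not part of this patch):

```python
from typing import Any, Iterable, List, Mapping

from airbyte_cdk.sources.streams import Stream


class Orders(Stream):
    # Any truthy cursor_field tells the CDK this stream supports incremental sync.
    cursor_field = "updated_at"
    primary_key = "id"

    def read_records(
        self, sync_mode, cursor_field: List[str] = None, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Mapping[str, Any]]:
        # Fetch records ordered by `updated_at` from the underlying source (omitted in this sketch).
        yield from []
```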
-Cursor fields can be input by the user (e.g: a user can choose to use an auto-incrementing `id` column in a DB table) or they can be defined by the source e.g: where an API defines that `updated_at` is what determines the ordering of records.
+Cursor fields can be input by the user \(e.g: a user can choose to use an auto-incrementing `id` column in a DB table\) or they can be defined by the source e.g: where an API defines that `updated_at` is what determines the ordering of records.
+
+In the context of the CDK, setting the `Stream.cursor_field` property to any truthy value informs the framework that this stream is incremental.
+
+### `IncrementalMixin`
+
+This mixin adds a `state` property with an abstract setter and getter.
+The `state` attribute lets the CDK determine the current state of the sync at any moment (in contrast to the deprecated `Stream.get_updated_state` method).
+The setter typically deserializes state saved by the CDK and initializes the internal state of the stream.
+The getter should serialize the internal state of the stream.
-In the context of the CDK, setting the `Stream.cursor_field` property to any value informs the framework that this stream is incremental.
+```python
+@property
+def state(self) -> Mapping[str, Any]:
+    return {self.cursor_field: str(self._cursor_value)}
+
+@state.setter
+def state(self, value: Mapping[str, Any]):
+    self._cursor_value = value[self.cursor_field]
+```
+
+The actual logic that updates state during reading lives elsewhere, usually inside the `read_records` method, right after yielding the latest record covered by the new state.
+The state therefore represents the latest checkpoint successfully achieved, and every following record belongs to the next checkpoint.
+```python
+def read_records(self, ...):
+    ...
+    yield record
+    yield record
+    yield record
+    self._cursor_value = max(record[self.cursor_field], self._cursor_value)
+    yield record
+    yield record
+    yield record
+    self._cursor_value = max(record[self.cursor_field], self._cursor_value)
+```

 ### `Stream.get_updated_state`
-This function helps the CDK figure out the latest state for every record output by the connector
-(as returned by the `Stream.read_records` method). This allows sync to resume from where the previous sync last stopped,
-regardless of success or failure. This function typically compares the state object's and the latest record's cursor field, picking the latest one.
+(deprecated since 0.1.49, see `IncrementalMixin`)
+
+This function helps the stream keep track of the latest state by inspecting every record output by the stream \(as returned by the `Stream.read_records` method\) and comparing it against the most recent state object. This allows sync to resume from where the previous sync last stopped, regardless of success or failure. This function typically compares the state object's and the latest record's cursor field, picking the latest one.
+
+## Checkpointing state
+
+There are two ways of checkpointing state \(i.e: controlling the timing of when state is saved\) while reading data from a connector:
+
+1. Interval-based checkpointing
+2. Stream Slices
+
+### Interval based checkpointing
+
+This is the simplest method for checkpointing. When the interval is set to a truthy value e.g: 100, then state is persisted after every 100 records output by the connector e.g: state is saved after reading 100 records, then 200, 300, etc..
+
+While this is very simple, **it requires that records are output in ascending order with regards to the cursor field**.
For example, if your stream outputs records in ascending order of the `updated_at` field, then this is a good fit for your usecase. But if the stream outputs records in a random order, then you cannot use this method because we can only be certain that we read records after a particular `updated_at` timestamp once all records have been fully read. + +Interval based checkpointing can be implemented by setting the `Stream.state_checkpoint_interval` property e.g: + +```text +class MyAmazingStream(Stream): + # Save the state every 100 records + state_checkpoint_interval = 100 +``` ### `Stream.stream_slices` -The above methods can optionally be paired with the `stream_slices` function to granularly control exactly when state is saved. Conceptually, a Stream Slice is a subset of the records in a stream which represent the smallest unit of data which can be re-synced. Once a full slice is read, an `AirbyteStateMessage` will be output, causing state to be saved. If a connector fails while reading the Nth slice of a stream, then the next time it retries, it will begin reading at the beginning of the Nth slice again, rather than re-read slices `1...N-1`. -A Slice object is not typed, and the developer is free to include any information necessary to make the request. This function is called when the `Stream` is about to be read. Typically, the `stream_slices` function, via inspecting the state object, -generates a Slice for every request to be made. +Stream slices can be used to achieve finer grain control of when state is checkpointed. -As an example, suppose an API is able to dispense data hourly. If the last sync was exactly 24 hours ago, -we can either make an API call retrieving all data at once, or make 24 calls each retrieving an hour's -worth of data. In the latter case, the `stream_slices` function, sees that the previous state contains -yesterday's timestamp, and returns a list of 24 Slices, each with a different hourly timestamp to be -used when creating request. If the stream fails halfway through (at the 12th slice), then the next time it starts reading, it will read from the beginning of the 12th slice. +Conceptually, a Stream Slice is a subset of the records in a stream which represent the smallest unit of data which can be re-synced. Once a full slice is read, an `AirbyteStateMessage` will be output, causing state to be saved. If a connector fails while reading the Nth slice of a stream, then the next time it retries, it will begin reading at the beginning of the Nth slice again, rather than re-read slices `1...N-1`. -For a more in-depth description of stream slicing, see the [Stream Slices guide](./stream_slices.md). +A Slice object is not typed, and the developer is free to include any information necessary to make the request. This function is called when the `Stream` is about to be read. Typically, the `stream_slices` function, via inspecting the state object, generates a Slice for every request to be made. + +As an example, suppose an API is able to dispense data hourly. If the last sync was exactly 24 hours ago, we can either make an API call retrieving all data at once, or make 24 calls each retrieving an hour's worth of data. In the latter case, the `stream_slices` function, sees that the previous state contains yesterday's timestamp, and returns a list of 24 Slices, each with a different hourly timestamp to be used when creating request. 
If the stream fails halfway through \(at the 12th slice\), then the next time it starts reading, it will read from the beginning of the 12th slice. + +For a more in-depth description of stream slicing, see the [Stream Slices guide](https://github.com/airbytehq/airbyte/tree/8500fef4133d3d06e16e8b600d65ebf2c58afefd/docs/connector-development/cdk-python/stream-slices.md). + +## Conclusion -## Conclusion In summary, an incremental stream requires: + * the `cursor_field` property -* the `get_updated_state` function +* to be inherited from `IncrementalMixin` and state methods implemented * Optionally, the `stream_slices` function + diff --git a/airbyte-cdk/python/docs/tutorials/cdk-tutorial-python-http/6-read-data.md b/airbyte-cdk/python/docs/tutorials/cdk-tutorial-python-http/6-read-data.md index 9bf038268a3ed..5a085d9b8db51 100644 --- a/airbyte-cdk/python/docs/tutorials/cdk-tutorial-python-http/6-read-data.md +++ b/airbyte-cdk/python/docs/tutorials/cdk-tutorial-python-http/6-read-data.md @@ -24,9 +24,9 @@ Optionally, we can provide additional inputs to customize requests: Backoff policy options: -- `retry_factor` Specifies factor for exponential backoff policy (by default is 5) -- `max_retries` Specifies maximum amount of retries for backoff policy (by default is 5) -- `raise_on_http_errors` If set to False, allows opting-out of raising HTTP code exception (by default is True) +* `retry_factor` Specifies factor for exponential backoff policy \(by default is 5\) +* `max_retries` Specifies maximum amount of retries for backoff policy \(by default is 5\) +* `raise_on_http_errors` If set to False, allows opting-out of raising HTTP code exception \(by default is True\) There are many other customizable options - you can find them in the [`airbyte_cdk.sources.streams.http.HttpStream`](https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py) class. @@ -37,9 +37,9 @@ Let's begin by pulling data for the last day's rates by using the `/latest` endp ```python class ExchangeRates(HttpStream): url_base = "https://api.exchangeratesapi.io/" - + primary_key = None - + def __init__(self, base: str, **kwargs): super().__init__() self.base = base @@ -85,7 +85,7 @@ This may look big, but that's just because there are lots of \(unused, for now\) Let's also pass the `base` parameter input by the user to the stream class: ```python -def streams(self, config: Mapping[str, Any]) -> List[Stream]: + def streams(self, config: Mapping[str, Any]) -> List[Stream]: auth = NoAuth() return [ExchangeRates(authenticator=auth, base=config['base'])] ``` @@ -110,7 +110,14 @@ We theoretically _could_ stop here and call it a connector. But let's give addin ## Adding incremental sync -To add incremental sync, we'll do a few things: 1. Pass the `start_date` param input by the user into the stream. 2. Declare the stream's `cursor_field`. 3. Implement the `get_updated_state` method. 4. Implement the `stream_slices` method. 5. Update the `path` method to specify the date to pull exchange rates for. 6. Update the configured catalog to use `incremental` sync when we're testing the stream. +To add incremental sync, we'll do a few things: +1. Pass the `start_date` param input by the user into the stream. +2. Declare the stream's `cursor_field`. +3. Declare the stream's property `_cursor_value` to hold the state value +4. Add `IncrementalMixin` to the list of the ancestors of the stream and implement setter and getter of the `state`. +5. Implement the `stream_slices` method. +6. 
Update the `path` method to specify the date to pull exchange rates for.
+7. Update the configured catalog to use `incremental` sync when we're testing the stream.

We'll describe what each of these methods do below. Before we begin, it may help to familiarize yourself with how incremental sync works in Airbyte by reading the [docs on incremental](https://docs.airbyte.io/architecture/connections/incremental-append).

@@ -132,7 +139,7 @@ Let's also add this parameter to the constructor and declare the `cursor_field`:

 from datetime import datetime, timedelta


-class ExchangeRates(HttpStream):
+class ExchangeRates(HttpStream, IncrementalMixin):
     url_base = "https://api.exchangeratesapi.io/"
     cursor_field = "date"
     primary_key = "date"
@@ -141,24 +148,38 @@ class ExchangeRates(HttpStream):
         super().__init__()
         self.base = base
         self.start_date = start_date
+        self._cursor_value = None
 ```

Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config sample_files/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`.

But we're not quite done with supporting incremental, we have to actually emit state! We'll structure our state object very simply: it will be a `dict` whose single key is `'date'` and value is the date of the last day we synced data from. For example, `{'date': '2021-04-26'}` indicates the connector previously read data up until April 26th and therefore shouldn't re-read anything before April 26th.

-Let's do this by implementing the `get_updated_state` method inside the `ExchangeRates` class.
+Let's do this by implementing the getter and setter for the `state` inside the `ExchangeRates` class.

```python
-    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]:
-        # This method is called once for each record returned from the API to compare the cursor field value in that record with the current state
-        # we then return an updated state object. If this is the first time we run a sync or no state was passed, current_stream_state will be None.
-        if current_stream_state is not None and 'date' in current_stream_state:
-            current_parsed_date = datetime.strptime(current_stream_state['date'], '%Y-%m-%d')
-            latest_record_date = datetime.strptime(latest_record['date'], '%Y-%m-%d')
-            return {'date': max(current_parsed_date, latest_record_date).strftime('%Y-%m-%d')}
+    @property
+    def state(self) -> Mapping[str, Any]:
+        if self._cursor_value:
+            return {self.cursor_field: self._cursor_value.strftime('%Y-%m-%d')}
         else:
-            return {'date': self.start_date.strftime('%Y-%m-%d')}
+            return {self.cursor_field: self.start_date.strftime('%Y-%m-%d')}
+
+    @state.setter
+    def state(self, value: Mapping[str, Any]):
+        self._cursor_value = datetime.strptime(value[self.cursor_field], '%Y-%m-%d')
+```
+
+Update the internal state `_cursor_value` inside the `read_records` method:
+
+```python
+    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
+        for record in super().read_records(*args, **kwargs):
+            latest_record_date = datetime.strptime(record[self.cursor_field], '%Y-%m-%d')
+            if self._cursor_value:
+                self._cursor_value = max(self._cursor_value, latest_record_date)
+            else:
+                self._cursor_value = latest_record_date
+            yield record
+```

This implementation compares the date from the latest record with the date in the current state and takes the maximum as the "new" state object.
@@ -166,20 +187,19 @@ This implementation compares the date from the latest record with the date in th We'll implement the `stream_slices` method to return a list of the dates for which we should pull data based on the stream state if it exists: ```python - def _chunk_date_range(self, start_date: datetime) -> List[Mapping[str, any]]: + def _chunk_date_range(self, start_date: datetime) -> List[Mapping[str, Any]]: """ Returns a list of each day between the start date and now. The return value is a list of dicts {'date': date_string}. """ dates = [] while start_date < datetime.now(): - dates.append({'date': start_date.strftime('%Y-%m-%d')}) + dates.append({self.cursor_field: start_date.strftime('%Y-%m-%d')}) start_date += timedelta(days=1) return dates - def stream_slices(self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[ - Optional[Mapping[str, any]]]: - start_date = datetime.strptime(stream_state['date'], '%Y-%m-%d') if stream_state and 'date' in stream_state else self.start_date + def stream_slices(self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]: + start_date = datetime.strptime(stream_state[self.cursor_field], '%Y-%m-%d') if stream_state and self.cursor_field in stream_state else self.start_date return self._chunk_date_range(start_date) ``` @@ -222,3 +242,4 @@ You should see that only the record from the last date is being synced! This is With that, we've implemented incremental sync for our connector! +```` diff --git a/airbyte-cdk/python/docs/tutorials/http_api_source.md b/airbyte-cdk/python/docs/tutorials/http_api_source.md index 400de7fad0e32..6d305b1048764 100644 --- a/airbyte-cdk/python/docs/tutorials/http_api_source.md +++ b/airbyte-cdk/python/docs/tutorials/http_api_source.md @@ -396,13 +396,15 @@ There we have it - a stream which reads data in just a few lines of code! We theoretically _could_ stop here and call it a connector. But let's give adding incremental sync a shot. #### Adding incremental sync + To add incremental sync, we'll do a few things: -1. Pass the `start_date` param input by the user into the stream. -2. Declare the stream's `cursor_field`. -3. Implement the `get_updated_state` method. -4. Implement the `stream_slices` method. -5. Update the `path` method to specify the date to pull exchange rates for. -6. Update the configured catalog to use `incremental` sync when we're testing the stream. +1. Pass the `start_date` param input by the user into the stream. +2. Declare the stream's `cursor_field`. +3. Declare the stream's property `_cursor_value` to hold the state value +4. Add `IncrementalMixin` to the list of the ancestors of the stream and implement setter and getter of the `state`. +5. Implement the `stream_slices` method. +6. Update the `path` method to specify the date to pull exchange rates for. +7. Update the configured catalog to use `incremental` sync when we're testing the stream. We'll describe what each of these methods do below. Before we begin, it may help to familiarize yourself with how incremental sync works in Airbyte by reading the [docs on incremental](https://docs.airbyte.io/architecture/connections/incremental-append). 
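Before diving into each step, here is a rough sketch of where the pieces land. This is illustrative only: the final method bodies are shown in the snippets that follow, and the import path for `IncrementalMixin` is an assumption (in this patch it is defined in `airbyte_cdk.sources.streams.core`).

```python
from datetime import datetime

from airbyte_cdk.sources.streams import IncrementalMixin
from airbyte_cdk.sources.streams.http import HttpStream


class ExchangeRates(HttpStream, IncrementalMixin):  # step 4: add the mixin
    url_base = "https://api.exchangeratesapi.io/"
    cursor_field = "date"                           # step 2: declare the cursor

    def __init__(self, base: str, start_date: datetime, **kwargs):
        super().__init__()
        self.base = base
        self.start_date = start_date                # step 1: user-supplied start date
        self._cursor_value = None                   # step 3: backing field for the state

    # Step 4 continued (the state getter and setter) and step 5 (stream_slices)
    # are implemented in the snippets below; path and the catalog updates follow.
```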
@@ -424,7 +426,7 @@ Let's also add this parameter to the constructor and declare the `cursor_field`: from datetime import datetime, timedelta -class ExchangeRates(HttpStream): +class ExchangeRates(HttpStream, IncrementalMixin): url_base = "https://api.exchangeratesapi.io/" cursor_field = "date" @@ -432,25 +434,39 @@ class ExchangeRates(HttpStream): super().__init__() self.base = base self.start_date = start_date + self._cursor_value = None ``` Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config sample_files/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`. But we're not quite done with supporting incremental, we have to actually emit state! We'll structure our state object very simply: it will be a `dict` whose single key is `'date'` and value is the date of the last day we synced data from. For example, `{'date': '2021-04-26'}` indicates the connector previously read data up until April 26th and therefore shouldn't re-read anything before April 26th. -Let's do this by implementing the `get_updated_state` method inside the `ExchangeRates` class. +Let's do this by implementing the getter and setter for the `state` inside the `ExchangeRates` class. ```python - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]: - # This method is called once for each record returned from the API to compare the cursor field value in that record with the current state - # we then return an updated state object. If this is the first time we run a sync or no state was passed, current_stream_state will be None. - if current_stream_state is not None and 'date' in current_stream_state: - current_parsed_date = datetime.strptime(current_stream_state['date'], '%Y-%m-%d') - latest_record_date = datetime.strptime(latest_record['date'], '%Y-%m-%d') - return {'date': max(current_parsed_date, latest_record_date).strftime('%Y-%m-%d')} + @property + def state(self) -> Mapping[str, Any]: + if self._cursor_value: + return {self.cursor_field: self._cursor_value.strftime('%Y-%m-%d')} else: - return {'date': self.start_date.strftime('%Y-%m-%d')} -``` + return {self.cursor_field: self.start_date.strftime('%Y-%m-%d')} + + @state.setter + def state(self, value: Mapping[str, Any]): + self._cursor_value = datetime.strptime(value[self.cursor_field], '%Y-%m-%d') +``` + +Update internal state `cursor_value` inside `read_records` method + +```python + def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: + for record in super().read_records(*args, **kwargs): + if self._cursor_value: + latest_record_date = datetime.strptime(latest_record[self.cursor_field], '%Y-%m-%d') + self._cursor_value = max(self._cursor_value, latest_record_date) + yield record + +``` This implementation compares the date from the latest record with the date in the current state and takes the maximum as the "new" state object. 
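As context for how the framework consumes this property: the `_checkpoint_state` change earlier in this patch prefers the stream's own `state` attribute and falls back to the value accumulated via `get_updated_state` only when the attribute is absent. A simplified sketch of that behaviour (the helper name below is illustrative; the real logic lives in `AbstractSource._checkpoint_state`):

```python
from typing import Any, MutableMapping


def checkpoint_stream_state(stream, stream_state: MutableMapping[str, Any], connector_state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    # Streams that implement IncrementalMixin expose a `state` property; legacy
    # streams raise AttributeError, so the state built via get_updated_state is kept.
    try:
        connector_state[stream.name] = stream.state
    except AttributeError:
        connector_state[stream.name] = stream_state
    return connector_state
```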
diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index e67332eca7694..03cabbd73d9c1 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.48", + version="0.1.49", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", @@ -46,7 +46,7 @@ packages=find_packages(exclude=("unit_tests",)), install_requires=[ "backoff", - "dpath==2.0.1", + "dpath~=2.0.1", "jsonschema~=3.2.0", "jsonref~=0.2", "pendulum", @@ -59,7 +59,14 @@ ], python_requires=">=3.7.0", extras_require={ - "dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock", "pytest-httpserver"], + "dev": [ + "MyPy~=0.812", + "pytest", + "pytest-cov", + "pytest-mock", + "requests-mock", + "pytest-httpserver", + ], "sphinx-docs": [ "Sphinx~=4.2", "sphinx-rtd-theme~=1.0", diff --git a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py index 703504598c61b..d6b2e6c106002 100644 --- a/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py +++ b/airbyte-cdk/python/unit_tests/sources/test_abstract_source.py @@ -5,6 +5,7 @@ import logging from collections import defaultdict from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union +from unittest.mock import call import pytest from airbyte_cdk.models import ( @@ -28,7 +29,11 @@ class MockSource(AbstractSource): - def __init__(self, check_lambda: Callable[[], Tuple[bool, Optional[Any]]] = None, streams: List[Stream] = None): + def __init__( + self, + check_lambda: Callable[[], Tuple[bool, Optional[Any]]] = None, + streams: List[Stream] = None, + ): self._streams = streams self.check_lambda = check_lambda @@ -56,13 +61,17 @@ def test_failed_check(): def test_raising_check(): - """Tests that if a source raises an unexpected exception the connection check the appropriate connectionStatus failure message is returned""" + """Tests that if a source raises an unexpected exception the appropriate connectionStatus failure message is returned.""" expected = AirbyteConnectionStatus(status=Status.FAILED, message="Exception('this should fail')") assert expected == MockSource(check_lambda=lambda: exec('raise Exception("this should fail")')).check(logger, {}) class MockStream(Stream): - def __init__(self, inputs_and_mocked_outputs: List[Tuple[Mapping[str, Any], Iterable[Mapping[str, Any]]]] = None, name: str = None): + def __init__( + self, + inputs_and_mocked_outputs: List[Tuple[Mapping[str, Any], Iterable[Mapping[str, Any]]]] = None, + name: str = None, + ): self._inputs_and_mocked_outputs = inputs_and_mocked_outputs self._name = name @@ -85,6 +94,18 @@ def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: return "pk" +class MockStreamWithState(MockStream): + cursor_field = "cursor" + + @property + def state(self): + return {} + + @state.setter + def state(self, value): + pass + + def test_discover(mocker): """Tests that the appropriate AirbyteCatalog is returned from the discover method""" airbyte_stream1 = AirbyteStream( @@ -125,7 +146,10 @@ def test_read_nonexistent_stream_raises_exception(mocker): def _as_record(stream: str, data: Dict[str, Any]) -> AirbyteMessage: - return AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream=stream, data=data, emitted_at=GLOBAL_EMITTED_AT)) + return AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage(stream=stream, 
data=data, emitted_at=GLOBAL_EMITTED_AT), + ) def _as_records(stream: str, data: List[Dict[str, Any]]) -> List[AirbyteMessage]: @@ -134,7 +158,9 @@ def _as_records(stream: str, data: List[Dict[str, Any]]) -> List[AirbyteMessage] def _configured_stream(stream: Stream, sync_mode: SyncMode): return ConfiguredAirbyteStream( - stream=stream.as_airbyte_stream(), sync_mode=sync_mode, destination_sync_mode=DestinationSyncMode.overwrite + stream=stream.as_airbyte_stream(), + sync_mode=sync_mode, + destination_sync_mode=DestinationSyncMode.overwrite, ) @@ -155,7 +181,10 @@ def test_valid_full_refresh_read_no_slices(mocker): src = MockSource(streams=[s1, s2]) catalog = ConfiguredAirbyteCatalog( - streams=[_configured_stream(s1, SyncMode.full_refresh), _configured_stream(s2, SyncMode.full_refresh)] + streams=[ + _configured_stream(s1, SyncMode.full_refresh), + _configured_stream(s2, SyncMode.full_refresh), + ] ) expected = _as_records("s1", stream_output) + _as_records("s2", stream_output) @@ -168,15 +197,24 @@ def test_valid_full_refresh_read_with_slices(mocker): """Tests that running a full refresh sync on streams which use slices produces the expected AirbyteMessages""" slices = [{"1": "1"}, {"2": "2"}] # When attempting to sync a slice, just output that slice as a record - s1 = MockStream([({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices], name="s1") - s2 = MockStream([({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices], name="s2") + s1 = MockStream( + [({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices], + name="s1", + ) + s2 = MockStream( + [({"sync_mode": SyncMode.full_refresh, "stream_slice": s}, [s]) for s in slices], + name="s2", + ) mocker.patch.object(MockStream, "get_json_schema", return_value={}) mocker.patch.object(MockStream, "stream_slices", return_value=slices) src = MockSource(streams=[s1, s2]) catalog = ConfiguredAirbyteCatalog( - streams=[_configured_stream(s1, SyncMode.full_refresh), _configured_stream(s2, SyncMode.full_refresh)] + streams=[ + _configured_stream(s1, SyncMode.full_refresh), + _configured_stream(s2, SyncMode.full_refresh), + ] ) expected = [*_as_records("s1", slices), *_as_records("s2", slices)] @@ -190,155 +228,293 @@ def _state(state_data: Dict[str, Any]): return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state_data)) -def test_valid_incremental_read_with_checkpoint_interval(mocker): - """Tests that an incremental read which doesn't specify a checkpoint interval outputs a STATE message after reading N records within a stream""" - stream_output = [{"k1": "v1"}, {"k2": "v2"}] - s1 = MockStream([({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], name="s1") - s2 = MockStream([({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], name="s2") - state = {"cursor": "value"} - mocker.patch.object(MockStream, "get_updated_state", return_value=state) - mocker.patch.object(MockStream, "supports_incremental", return_value=True) - mocker.patch.object(MockStream, "get_json_schema", return_value={}) - # Tell the source to output one state message per record - mocker.patch.object(MockStream, "state_checkpoint_interval", new_callable=mocker.PropertyMock, return_value=1) - - src = MockSource(streams=[s1, s2]) - catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(s1, SyncMode.incremental), _configured_stream(s2, SyncMode.incremental)]) - - expected = [ - _as_record("s1", stream_output[0]), - _state({"s1": state}), - 
_as_record("s1", stream_output[1]), - _state({"s1": state}), - _state({"s1": state}), - _as_record("s2", stream_output[0]), - _state({"s1": state, "s2": state}), - _as_record("s2", stream_output[1]), - _state({"s1": state, "s2": state}), - _state({"s1": state, "s2": state}), - ] - messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict)))) - - assert expected == messages - - -def test_valid_incremental_read_with_no_interval(mocker): - """Tests that an incremental read which doesn't specify a checkpoint interval outputs a STATE message only after fully reading the stream and does - not output any STATE messages during syncing the stream.""" - stream_output = [{"k1": "v1"}, {"k2": "v2"}] - s1 = MockStream([({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], name="s1") - s2 = MockStream([({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], name="s2") - state = {"cursor": "value"} - mocker.patch.object(MockStream, "get_updated_state", return_value=state) - mocker.patch.object(MockStream, "supports_incremental", return_value=True) - mocker.patch.object(MockStream, "get_json_schema", return_value={}) - - src = MockSource(streams=[s1, s2]) - catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(s1, SyncMode.incremental), _configured_stream(s2, SyncMode.incremental)]) - - expected = [ - *_as_records("s1", stream_output), - _state({"s1": state}), - *_as_records("s2", stream_output), - _state({"s1": state, "s2": state}), - ] - - messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict)))) - - assert expected == messages - - -def test_valid_incremental_read_with_slices(mocker): - """Tests that an incremental read which uses slices outputs each record in the slice followed by a STATE message, for each slice""" - slices = [{"1": "1"}, {"2": "2"}] - stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}] - s1 = MockStream( - [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": mocker.ANY}, stream_output) for s in slices], name="s1" - ) - s2 = MockStream( - [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": mocker.ANY}, stream_output) for s in slices], name="s2" - ) - state = {"cursor": "value"} - mocker.patch.object(MockStream, "get_updated_state", return_value=state) - mocker.patch.object(MockStream, "supports_incremental", return_value=True) - mocker.patch.object(MockStream, "get_json_schema", return_value={}) - mocker.patch.object(MockStream, "stream_slices", return_value=slices) - - src = MockSource(streams=[s1, s2]) - catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(s1, SyncMode.incremental), _configured_stream(s2, SyncMode.incremental)]) - - expected = [ - # stream 1 slice 1 - *_as_records("s1", stream_output), - _state({"s1": state}), - # stream 1 slice 2 - *_as_records("s1", stream_output), - _state({"s1": state}), - # stream 2 slice 1 - *_as_records("s2", stream_output), - _state({"s1": state, "s2": state}), - # stream 2 slice 2 - *_as_records("s2", stream_output), - _state({"s1": state, "s2": state}), - ] - - messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict)))) - - assert expected == messages - - -def test_valid_incremental_read_with_slices_and_interval(mocker): - """ - Tests that an incremental read which uses slices and a checkpoint interval: - 1. outputs all records - 2. outputs a state message every N records (N=checkpoint_interval) - 3. 
outputs a state message after reading the entire slice - """ - slices = [{"1": "1"}, {"2": "2"}] - stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}] - s1 = MockStream( - [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": mocker.ANY}, stream_output) for s in slices], name="s1" - ) - s2 = MockStream( - [({"sync_mode": SyncMode.incremental, "stream_slice": s, "stream_state": mocker.ANY}, stream_output) for s in slices], name="s2" - ) - state = {"cursor": "value"} - mocker.patch.object(MockStream, "get_updated_state", return_value=state) - mocker.patch.object(MockStream, "supports_incremental", return_value=True) - mocker.patch.object(MockStream, "get_json_schema", return_value={}) - mocker.patch.object(MockStream, "stream_slices", return_value=slices) - mocker.patch.object(MockStream, "state_checkpoint_interval", new_callable=mocker.PropertyMock, return_value=2) - - src = MockSource(streams=[s1, s2]) - catalog = ConfiguredAirbyteCatalog(streams=[_configured_stream(s1, SyncMode.incremental), _configured_stream(s2, SyncMode.incremental)]) - - expected = [ - # stream 1 slice 1 - _as_record("s1", stream_output[0]), - _as_record("s1", stream_output[1]), - _state({"s1": state}), - _as_record("s1", stream_output[2]), - _state({"s1": state}), - # stream 1 slice 2 - _as_record("s1", stream_output[0]), - _as_record("s1", stream_output[1]), - _state({"s1": state}), - _as_record("s1", stream_output[2]), - _state({"s1": state}), - # stream 2 slice 1 - _as_record("s2", stream_output[0]), - _as_record("s2", stream_output[1]), - _state({"s1": state, "s2": state}), - _as_record("s2", stream_output[2]), - _state({"s1": state, "s2": state}), - # stream 2 slice 2 - _as_record("s2", stream_output[0]), - _as_record("s2", stream_output[1]), - _state({"s1": state, "s2": state}), - _as_record("s2", stream_output[2]), - _state({"s1": state, "s2": state}), - ] - - messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict)))) - - assert expected == messages +class TestIncrementalRead: + def test_with_state_attribute(self, mocker): + """Test correct state passing for the streams that have a state attribute""" + stream_output = [{"k1": "v1"}, {"k2": "v2"}] + old_state = {"cursor": "old_value"} + new_state = {"cursor": "new_value"} + s1 = MockStreamWithState( + [ + ( + {"sync_mode": SyncMode.incremental, "stream_state": old_state}, + stream_output, + ) + ], + name="s1", + ) + s2 = MockStreamWithState( + [({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], + name="s2", + ) + mocker.patch.object(MockStreamWithState, "get_updated_state", return_value={}) + state_property = mocker.patch.object( + MockStreamWithState, + "state", + new_callable=mocker.PropertyMock, + return_value=new_state, + ) + mocker.patch.object(MockStreamWithState, "get_json_schema", return_value={}) + src = MockSource(streams=[s1, s2]) + catalog = ConfiguredAirbyteCatalog( + streams=[ + _configured_stream(s1, SyncMode.incremental), + _configured_stream(s2, SyncMode.incremental), + ] + ) + + expected = [ + _as_record("s1", stream_output[0]), + _as_record("s1", stream_output[1]), + _state({"s1": new_state}), + _as_record("s2", stream_output[0]), + _as_record("s2", stream_output[1]), + _state({"s1": new_state, "s2": new_state}), + ] + messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state={"s1": old_state}))) + + assert expected == messages + assert state_property.mock_calls == [ + call(old_state), # set state for s1 + call(), # get state in the end of slice for s1 + 
call(), # get state in the end of slice for s2 + ] + + def test_with_checkpoint_interval(self, mocker): + """Tests that an incremental read which doesn't specify a checkpoint interval outputs a STATE message + after reading N records within a stream. + """ + stream_output = [{"k1": "v1"}, {"k2": "v2"}] + s1 = MockStream( + [({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], + name="s1", + ) + s2 = MockStream( + [({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], + name="s2", + ) + state = {"cursor": "value"} + mocker.patch.object(MockStream, "get_updated_state", return_value=state) + mocker.patch.object(MockStream, "supports_incremental", return_value=True) + mocker.patch.object(MockStream, "get_json_schema", return_value={}) + # Tell the source to output one state message per record + mocker.patch.object( + MockStream, + "state_checkpoint_interval", + new_callable=mocker.PropertyMock, + return_value=1, + ) + + src = MockSource(streams=[s1, s2]) + catalog = ConfiguredAirbyteCatalog( + streams=[ + _configured_stream(s1, SyncMode.incremental), + _configured_stream(s2, SyncMode.incremental), + ] + ) + + expected = [ + _as_record("s1", stream_output[0]), + _state({"s1": state}), + _as_record("s1", stream_output[1]), + _state({"s1": state}), + _state({"s1": state}), + _as_record("s2", stream_output[0]), + _state({"s1": state, "s2": state}), + _as_record("s2", stream_output[1]), + _state({"s1": state, "s2": state}), + _state({"s1": state, "s2": state}), + ] + messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict)))) + + assert expected == messages + + def test_with_no_interval(self, mocker): + """Tests that an incremental read which doesn't specify a checkpoint interval outputs + a STATE message only after fully reading the stream and does not output any STATE messages during syncing the stream. 
+ """ + stream_output = [{"k1": "v1"}, {"k2": "v2"}] + s1 = MockStream( + [({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], + name="s1", + ) + s2 = MockStream( + [({"sync_mode": SyncMode.incremental, "stream_state": {}}, stream_output)], + name="s2", + ) + state = {"cursor": "value"} + mocker.patch.object(MockStream, "get_updated_state", return_value=state) + mocker.patch.object(MockStream, "supports_incremental", return_value=True) + mocker.patch.object(MockStream, "get_json_schema", return_value={}) + + src = MockSource(streams=[s1, s2]) + catalog = ConfiguredAirbyteCatalog( + streams=[ + _configured_stream(s1, SyncMode.incremental), + _configured_stream(s2, SyncMode.incremental), + ] + ) + + expected = [ + *_as_records("s1", stream_output), + _state({"s1": state}), + *_as_records("s2", stream_output), + _state({"s1": state, "s2": state}), + ] + + messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict)))) + + assert expected == messages + + def test_with_slices(self, mocker): + """Tests that an incremental read which uses slices outputs each record in the slice followed by a STATE message, for each slice""" + slices = [{"1": "1"}, {"2": "2"}] + stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}] + s1 = MockStream( + [ + ( + { + "sync_mode": SyncMode.incremental, + "stream_slice": s, + "stream_state": mocker.ANY, + }, + stream_output, + ) + for s in slices + ], + name="s1", + ) + s2 = MockStream( + [ + ( + { + "sync_mode": SyncMode.incremental, + "stream_slice": s, + "stream_state": mocker.ANY, + }, + stream_output, + ) + for s in slices + ], + name="s2", + ) + state = {"cursor": "value"} + mocker.patch.object(MockStream, "get_updated_state", return_value=state) + mocker.patch.object(MockStream, "supports_incremental", return_value=True) + mocker.patch.object(MockStream, "get_json_schema", return_value={}) + mocker.patch.object(MockStream, "stream_slices", return_value=slices) + + src = MockSource(streams=[s1, s2]) + catalog = ConfiguredAirbyteCatalog( + streams=[ + _configured_stream(s1, SyncMode.incremental), + _configured_stream(s2, SyncMode.incremental), + ] + ) + + expected = [ + # stream 1 slice 1 + *_as_records("s1", stream_output), + _state({"s1": state}), + # stream 1 slice 2 + *_as_records("s1", stream_output), + _state({"s1": state}), + # stream 2 slice 1 + *_as_records("s2", stream_output), + _state({"s1": state, "s2": state}), + # stream 2 slice 2 + *_as_records("s2", stream_output), + _state({"s1": state, "s2": state}), + ] + + messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict)))) + + assert expected == messages + + def test_with_slices_and_interval(self, mocker): + """ + Tests that an incremental read which uses slices and a checkpoint interval: + 1. outputs all records + 2. outputs a state message every N records (N=checkpoint_interval) + 3. 
outputs a state message after reading the entire slice + """ + slices = [{"1": "1"}, {"2": "2"}] + stream_output = [{"k1": "v1"}, {"k2": "v2"}, {"k3": "v3"}] + s1 = MockStream( + [ + ( + { + "sync_mode": SyncMode.incremental, + "stream_slice": s, + "stream_state": mocker.ANY, + }, + stream_output, + ) + for s in slices + ], + name="s1", + ) + s2 = MockStream( + [ + ( + { + "sync_mode": SyncMode.incremental, + "stream_slice": s, + "stream_state": mocker.ANY, + }, + stream_output, + ) + for s in slices + ], + name="s2", + ) + state = {"cursor": "value"} + mocker.patch.object(MockStream, "get_updated_state", return_value=state) + mocker.patch.object(MockStream, "supports_incremental", return_value=True) + mocker.patch.object(MockStream, "get_json_schema", return_value={}) + mocker.patch.object(MockStream, "stream_slices", return_value=slices) + mocker.patch.object( + MockStream, + "state_checkpoint_interval", + new_callable=mocker.PropertyMock, + return_value=2, + ) + + src = MockSource(streams=[s1, s2]) + catalog = ConfiguredAirbyteCatalog( + streams=[ + _configured_stream(s1, SyncMode.incremental), + _configured_stream(s2, SyncMode.incremental), + ] + ) + + expected = [ + # stream 1 slice 1 + _as_record("s1", stream_output[0]), + _as_record("s1", stream_output[1]), + _state({"s1": state}), + _as_record("s1", stream_output[2]), + _state({"s1": state}), + # stream 1 slice 2 + _as_record("s1", stream_output[0]), + _as_record("s1", stream_output[1]), + _state({"s1": state}), + _as_record("s1", stream_output[2]), + _state({"s1": state}), + # stream 2 slice 1 + _as_record("s2", stream_output[0]), + _as_record("s2", stream_output[1]), + _state({"s1": state, "s2": state}), + _as_record("s2", stream_output[2]), + _state({"s1": state, "s2": state}), + # stream 2 slice 2 + _as_record("s2", stream_output[0]), + _as_record("s2", stream_output[1]), + _state({"s1": state, "s2": state}), + _as_record("s2", stream_output[2]), + _state({"s1": state, "s2": state}), + ] + + messages = _fix_emitted_at(list(src.read(logger, {}, catalog, state=defaultdict(dict)))) + + assert expected == messages diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json index 96926712db7ae..d0f98b17f8577 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json @@ -2,37 +2,7 @@ "streams": [ { "stream": { - "name": "campaigns", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updated_time"], - "source_defined_primary_key": [["id"]], - "namespace": null - }, - "sync_mode": "incremental", - "cursor_field": null, - "destination_sync_mode": "append", - "primary_key": null - }, - { - "stream": { - "name": "ad_sets", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updated_time"], - "source_defined_primary_key": [["id"]], - "namespace": null - }, - "sync_mode": "incremental", - "cursor_field": null, - "destination_sync_mode": "append", - "primary_key": null - }, - { - "stream": { - "name": "ads", + "name": "images", "json_schema": {}, "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_cursor": true, diff --git 
a/docs/connector-development/cdk-python/incremental-stream.md b/docs/connector-development/cdk-python/incremental-stream.md
index 9acce80890ec0..b9db5260e0c9c 100644
--- a/docs/connector-development/cdk-python/incremental-stream.md
+++ b/docs/connector-development/cdk-python/incremental-stream.md
@@ -6,7 +6,8 @@ Several new pieces are essential to understand how incrementality works with the

 * `AirbyteStateMessage`
 * cursor fields
-* `Stream.get_updated_state`
+* `IncrementalMixin`
+* `Stream.get_updated_state` (deprecated)

   as well as a few other optional concepts.

@@ -22,13 +23,46 @@ Cursor fields can be input by the user \(e.g: a user can choose to use an auto-i

 In the context of the CDK, setting the `Stream.cursor_field` property to any truthy value informs the framework that this stream is incremental.

+### `IncrementalMixin`
+
+This mixin adds a `state` property with an abstract setter and getter.
+The `state` attribute lets the CDK determine the current state of the sync at any moment (in contrast to the deprecated `Stream.get_updated_state` method).
+The setter typically deserializes state saved by the CDK and initializes the internal state of the stream.
+The getter should serialize the internal state of the stream.
+
+```python
+@property
+def state(self) -> Mapping[str, Any]:
+    return {self.cursor_field: str(self._cursor_value)}
+
+@state.setter
+def state(self, value: Mapping[str, Any]):
+    self._cursor_value = value[self.cursor_field]
+```
+
+The actual logic that updates state during reading lives elsewhere, usually inside the `read_records` method, right after yielding the latest record covered by the new state.
+The state therefore represents the latest checkpoint successfully achieved, and every following record belongs to the next checkpoint.
+```python
+def read_records(self, ...):
+    ...
+    yield record
+    yield record
+    yield record
+    self._cursor_value = max(record[self.cursor_field], self._cursor_value)
+    yield record
+    yield record
+    yield record
+    self._cursor_value = max(record[self.cursor_field], self._cursor_value)
+```
+
 ### `Stream.get_updated_state`
+(deprecated since 0.1.49, see `IncrementalMixin`)

 This function helps the stream keep track of the latest state by inspecting every record output by the stream \(as returned by the `Stream.read_records` method\) and comparing it against the most recent state object. This allows sync to resume from where the previous sync last stopped, regardless of success or failure. This function typically compares the state object's and the latest record's cursor field, picking the latest one.

 ## Checkpointing state

-There are two ways to checkpointing state \(i.e: controling the timing of when state is saved\) while reading data from a connector:
+There are two ways of checkpointing state \(i.e: controlling the timing of when state is saved\) while reading data from a connector:

 1. Interval-based checkpointing
 2.
@@ -64,6 +98,6 @@ For a more in-depth description of stream slicing, see the [Stream Slices guide]
 
 In summary, an incremental stream requires:
 
 * the `cursor_field` property
-* the `get_updated_state` function
+* inheriting from `IncrementalMixin` and implementing its `state` getter and setter
 * Optionally, the `stream_slices` function
diff --git a/docs/connector-development/tutorials/cdk-tutorial-python-http/6-read-data.md b/docs/connector-development/tutorials/cdk-tutorial-python-http/6-read-data.md
index 5db52bf54f33a..fea5a938527ef 100644
--- a/docs/connector-development/tutorials/cdk-tutorial-python-http/6-read-data.md
+++ b/docs/connector-development/tutorials/cdk-tutorial-python-http/6-read-data.md
@@ -110,7 +110,14 @@ We theoretically _could_ stop here and call it a connector. But let's give addin
 
 ## Adding incremental sync
 
-To add incremental sync, we'll do a few things: 1. Pass the `start_date` param input by the user into the stream. 2. Declare the stream's `cursor_field`. 3. Implement the `get_updated_state` method. 4. Implement the `stream_slices` method. 5. Update the `path` method to specify the date to pull exchange rates for. 6. Update the configured catalog to use `incremental` sync when we're testing the stream.
+To add incremental sync, we'll do a few things:
+1. Pass the `start_date` param input by the user into the stream.
+2. Declare the stream's `cursor_field`.
+3. Declare the stream's `_cursor_value` attribute to hold the cursor value backing the state.
+4. Add `IncrementalMixin` to the stream's base classes and implement the `state` getter and setter.
+5. Implement the `stream_slices` method.
+6. Update the `path` method to specify the date to pull exchange rates for.
+7. Update the configured catalog to use `incremental` sync when we're testing the stream.
 
 We'll describe what each of these methods do below. Before we begin, it may help to familiarize yourself with how incremental sync works in Airbyte by reading the [docs on incremental](../../../understanding-airbyte/connections/incremental-append.md).
 
@@ -132,7 +139,7 @@ Let's also add this parameter to the constructor and declare the `cursor_field`:
 
 from datetime import datetime, timedelta
 
-class ExchangeRates(HttpStream):
+class ExchangeRates(HttpStream, IncrementalMixin):
     url_base = "https://api.exchangeratesapi.io/"
     cursor_field = "date"
     primary_key = "date"
@@ -141,24 +148,38 @@ class ExchangeRates(HttpStream):
         super().__init__()
         self.base = base
         self.start_date = start_date
+        self._cursor_value = None
 ```
 
 Declaring the `cursor_field` informs the framework that this stream now supports incremental sync. The next time you run `python main_dev.py discover --config sample_files/config.json` you'll find that the `supported_sync_modes` field now also contains `incremental`. But we're not quite done with supporting incremental, we have to actually emit state!
 
 We'll structure our state object very simply: it will be a `dict` whose single key is `'date'` and value is the date of the last day we synced data from. For example, `{'date': '2021-04-26'}` indicates the connector previously read data up until April 26th and therefore shouldn't re-read anything before April 26th.
 
-Let's do this by implementing the `get_updated_state` method inside the `ExchangeRates` class.
+Let's do this by implementing the getter and setter for the `state` inside the `ExchangeRates` class.
 ```python
-    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]:
-        # This method is called once for each record returned from the API to compare the cursor field value in that record with the current state
-        # we then return an updated state object. If this is the first time we run a sync or no state was passed, current_stream_state will be None.
-        if current_stream_state is not None and 'date' in current_stream_state:
-            current_parsed_date = datetime.strptime(current_stream_state['date'], '%Y-%m-%d')
-            latest_record_date = datetime.strptime(latest_record['date'], '%Y-%m-%d')
-            return {'date': max(current_parsed_date, latest_record_date).strftime('%Y-%m-%d')}
+    @property
+    def state(self) -> Mapping[str, Any]:
+        if self._cursor_value:
+            return {self.cursor_field: self._cursor_value.strftime('%Y-%m-%d')}
         else:
-            return {'date': self.start_date.strftime('%Y-%m-%d')}
+            return {self.cursor_field: self.start_date.strftime('%Y-%m-%d')}
+
+    @state.setter
+    def state(self, value: Mapping[str, Any]):
+        self._cursor_value = datetime.strptime(value[self.cursor_field], '%Y-%m-%d')
+```
+
+Update the internal `_cursor_value` state inside the `read_records` method:
+
+```python
+    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
+        for record in super().read_records(*args, **kwargs):
+            if self._cursor_value:
+                latest_record_date = datetime.strptime(record[self.cursor_field], '%Y-%m-%d')
+                self._cursor_value = max(self._cursor_value, latest_record_date)
+            yield record
+
 ```
 
 This implementation compares the date from the latest record with the date in the current state and takes the maximum as the "new" state object.
@@ -166,20 +187,19 @@ This implementation compares the date from the latest record with the date in th
 We'll implement the `stream_slices` method to return a list of the dates for which we should pull data based on the stream state if it exists:
 
 ```python
-    def _chunk_date_range(self, start_date: datetime) -> List[Mapping[str, any]]:
+    def _chunk_date_range(self, start_date: datetime) -> List[Mapping[str, Any]]:
         """
         Returns a list of each day between the start date and now.
         The return value is a list of dicts {'date': date_string}.
         """
         dates = []
         while start_date < datetime.now():
-            dates.append({'date': start_date.strftime('%Y-%m-%d')})
+            dates.append({self.cursor_field: start_date.strftime('%Y-%m-%d')})
             start_date += timedelta(days=1)
         return dates
 
-    def stream_slices(self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[
-        Optional[Mapping[str, any]]]:
-        start_date = datetime.strptime(stream_state['date'], '%Y-%m-%d') if stream_state and 'date' in stream_state else self.start_date
+    def stream_slices(self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
+        start_date = datetime.strptime(stream_state[self.cursor_field], '%Y-%m-%d') if stream_state and self.cursor_field in stream_state else self.start_date
         return self._chunk_date_range(start_date)
 ```
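For orientation, here is roughly how the tutorial's pieces fit together once assembled into a single class. This is an editorial sketch rather than the patch's own code: the HTTP plumbing (`path`, `request_params`, `parse_response`, `next_page_token`) is reduced to minimal placeholders covered by other tutorial steps, the `read_records` override here advances the cursor even on the very first sync (a small deviation from the snippet above), and the `IncrementalMixin` import path may differ between CDK versions.

```python
from datetime import datetime, timedelta
from typing import Any, Iterable, List, Mapping, Optional

import requests
from airbyte_cdk.sources.streams import IncrementalMixin  # in some versions: airbyte_cdk.sources.streams.core
from airbyte_cdk.sources.streams.http import HttpStream


class ExchangeRates(HttpStream, IncrementalMixin):
    url_base = "https://api.exchangeratesapi.io/"
    cursor_field = "date"
    primary_key = "date"

    def __init__(self, base: str, start_date: datetime, **kwargs):
        super().__init__()
        self.base = base
        self.start_date = start_date
        self._cursor_value = None  # internal cursor backing the state property

    @property
    def state(self) -> Mapping[str, Any]:
        # Serialize the cursor; fall back to the configured start date before any record is read.
        date = self._cursor_value or self.start_date
        return {self.cursor_field: date.strftime('%Y-%m-%d')}

    @state.setter
    def state(self, value: Mapping[str, Any]):
        # Restore the cursor from the state the CDK hands back at the start of a sync.
        self._cursor_value = datetime.strptime(value[self.cursor_field], '%Y-%m-%d')

    def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
        for record in super().read_records(*args, **kwargs):
            record_date = datetime.strptime(record[self.cursor_field], '%Y-%m-%d')
            self._cursor_value = max(self._cursor_value or record_date, record_date)
            yield record

    def stream_slices(self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
        # Resume from the saved state when present, otherwise from the configured start date.
        if stream_state and self.cursor_field in stream_state:
            start_date = datetime.strptime(stream_state[self.cursor_field], '%Y-%m-%d')
        else:
            start_date = self.start_date
        while start_date < datetime.now():
            yield {self.cursor_field: start_date.strftime('%Y-%m-%d')}
            start_date += timedelta(days=1)

    # Minimal HTTP plumbing; the tutorial's other steps flesh these out.
    def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> str:
        return stream_slice[self.cursor_field]  # request rates for the sliced date

    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None) -> Mapping[str, Any]:
        return {"base": self.base}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        return [response.json()]

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None  # the exchange rates endpoint is not paginated
```

A connector built this way emits records as usual, while the CDK persists whatever the `state` getter returns at checkpoint boundaries and feeds it back through the setter on the next run.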