Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDK: add support for streams with state attribute #9746

Merged
merged 12 commits into from Feb 16, 2022
Merged

Conversation

keu
Copy link
Contributor

@keu keu commented Jan 24, 2022

What

Update AbstractSource to support streams that have the state attribute, use it to set and retrieve state.

How

Updated AbstractSource._read_incremental and AbstractSource._checkpoint_state to try first use state attribute and handle AttributeError if it doesn't exist.
Also fixed timer usage.

I didn't use hasattr because it will trigger property call.
I didn't use dir because it doesn't support dynamic attributes (if we will have any)

Recommended reading order

  1. abstract_source.py
  2. test_abstract_source.py

@keu keu linked an issue Jan 24, 2022 that may be closed by this pull request
@github-actions github-actions bot added the CDK Connector Development Kit label Jan 24, 2022
@keu keu temporarily deployed to more-secrets January 24, 2022 11:20 Inactive
@keu keu temporarily deployed to more-secrets January 24, 2022 11:23 Inactive
@keu keu self-assigned this Jan 24, 2022
@keu keu temporarily deployed to more-secrets January 24, 2022 11:35 Inactive
@keu keu temporarily deployed to more-secrets January 24, 2022 11:42 Inactive
@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@keu keu temporarily deployed to more-secrets January 24, 2022 12:07 Inactive
@keu keu temporarily deployed to more-secrets January 24, 2022 12:53 Inactive
@keu keu temporarily deployed to more-secrets January 24, 2022 13:23 Inactive
@sherifnada
Copy link
Contributor

@eugene-kulak what is the motivation for this change? How is it intended to be used? We also need to add docs for any such change to the CDK

@keu
Copy link
Contributor Author

keu commented Jan 25, 2022

@eugene-kulak what is the motivation for this change? How is it intended to be used? We also need to add docs for any such change to the CDK

@sherifnada the motivation partially explained in the ticket itself, this is rather an alpha version of this feature that doesn't require any change from Stream, but I will update docstrings. Do you think it is worth mention in the end user docs?

@keu keu temporarily deployed to more-secrets January 25, 2022 15:53 Inactive
@keu keu temporarily deployed to more-secrets January 26, 2022 02:53 Inactive
@keu keu requested a review from sherifnada January 26, 2022 18:03
@keu keu added this to the Connectors Jan 28 2022 milestone Jan 27, 2022
Copy link
Contributor

@sherifnada sherifnada left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One comment on maintaining an invariant

def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
logger.info(f"Setting state of {stream_name} stream to {stream_state}")
connector_state[stream_name] = stream_state
def _checkpoint_state(self, stream, stream_state, connector_state):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@keu could we maintain the invariant that the state output from this method is always the same as stream.state? I think the only thing I'm concerned about is that if I define both stream.state and get_updated_state then there are two potentially different states floating around which will lead to confusing behavior.

Can we always maintain the invariant that whatever is stored in stream.state contains the state object being output?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sherifnada I'm not sure how we can achieve this, _checkpoint_state will always return value from stream.state if there is any, if not it will fallback to the state obtained from get_updated_state,
so what is the problem here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the contract is IncrementalMixin implementation always takes precedence over get_updated_state? sounds fine w me.

Should we add this to the docs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will do

@@ -20,6 +20,40 @@ def package_name_from_class(cls: object) -> str:
return module.__name__.split(".")[0]


class IncrementalMixin(ABC):
"""Mixing to make stream incremental.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Mixing to make stream incremental.
"""Mixin to make stream incremental.

@@ -137,7 +171,7 @@ def state_checkpoint_interval(self) -> Optional[int]:
return None

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's put a deprecation notice?

def _checkpoint_state(self, stream_name, stream_state, connector_state, logger):
logger.info(f"Setting state of {stream_name} stream to {stream_state}")
connector_state[stream_name] = stream_state
def _checkpoint_state(self, stream, stream_state, connector_state):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the contract is IncrementalMixin implementation always takes precedence over get_updated_state? sounds fine w me.

Should we add this to the docs?

@github-actions github-actions bot added area/connectors Connector related issues area/documentation Improvements or additions to documentation labels Feb 16, 2022
@keu
Copy link
Contributor Author

keu commented Feb 16, 2022

/publish-cdk dry-run=true

🕑 https://github.com/airbytehq/airbyte/actions/runs/1854932201
https://github.com/airbytehq/airbyte/actions/runs/1854932201

@keu keu temporarily deployed to more-secrets February 16, 2022 19:58 Inactive
@keu keu temporarily deployed to more-secrets February 16, 2022 19:58 Inactive
@codecov
Copy link

codecov bot commented Feb 16, 2022

Codecov Report

❗ No coverage uploaded for pull request base (master@3da09aa). Click here to learn what that means.
The diff coverage is n/a.

Impacted file tree graph

@@            Coverage Diff            @@
##             master    #9746   +/-   ##
=========================================
  Coverage          ?   66.83%           
=========================================
  Files             ?        6           
  Lines             ?      603           
  Branches          ?        0           
=========================================
  Hits              ?      403           
  Misses            ?      200           
  Partials          ?        0           

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 3da09aa...4ecfb24. Read the comment docs.

@keu
Copy link
Contributor Author

keu commented Feb 16, 2022

/publish-cdk dry-run=false

🕑 https://github.com/airbytehq/airbyte/actions/runs/1854976255
https://github.com/airbytehq/airbyte/actions/runs/1854976255

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/connectors Connector related issues area/documentation Improvements or additions to documentation CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

CDK: Add support for streams with state attribute (state v2)
8 participants