
🎉 Source Amazon SQS: New connector #6937

Merged 34 commits into airbytehq:master on Oct 26, 2021

Conversation

@sdairs (Contributor) commented Oct 10, 2021

What

Adds an Amazon SQS Source connector.
Has unit tests for read/check/discover with mock AWS resources to simulate behaviour without needing to create a real SQS queue or IAM roles.

airbytehq/connector-contest#68

How

Connector built with the Python CDK
Uses Python boto3 for AWS functionality
Uses Python moto for mock AWS resources (see the testing sketch below)
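As a rough illustration of that approach, a unit test can use moto to stand in for SQS so no real queue or IAM role is needed. A minimal sketch, assuming moto's pre-5.x `mock_sqs` decorator; the queue name and message body are illustrative, not taken from this PR:

```python
import boto3
from moto import mock_sqs  # moto pre-5.x decorator name


@mock_sqs
def test_read_single_message():
    # moto intercepts the boto3 calls, so this never touches real AWS.
    client = boto3.client("sqs", region_name="eu-west-1")
    queue_url = client.create_queue(QueueName="mock-queue")["QueueUrl"]
    client.send_message(QueueUrl=queue_url, MessageBody='{"id": 1}')

    response = client.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=1, WaitTimeSeconds=0
    )
    messages = response.get("Messages", [])
    assert len(messages) == 1
    assert messages[0]["Body"] == '{"id": 1}'
```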

Recommended reading order

  1. x.java
  2. y.python

Pre-merge Checklist


New Connector

Community member or Airbyter

  • Community member? Grant edit access to maintainers (instructions)
  • Secrets in the connector's spec are annotated with airbyte_secret
  • Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run ./gradlew :airbyte-integrations:connectors:<name>:integrationTest.
  • Code reviews completed
  • Documentation updated
    • Connector's README.md
    • Connector's bootstrap.md. See description and examples
    • docs/SUMMARY.md
    • docs/integrations/<source or destination>/<name>.md including changelog. See changelog example
    • docs/integrations/README.md
    • airbyte-integrations/builds.md
  • PR name follows PR naming conventions
  • Connector added to the connector index as described here

Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

  • Create a non-forked branch based on this PR and test the below items on it
  • Build is successful
  • Credentials added to Github CI. Instructions.
  • /test connector=connectors/<name> command is passing.
  • New Connector version released on Dockerhub by running the /publish command described here


@CLAassistant commented Oct 10, 2021

CLA assistant check
All committers have signed the CLA.

@github-actions bot added the area/connectors and area/documentation labels on Oct 10, 2021
@sdairs (Contributor, Author) commented Oct 10, 2021

Right now the test_read integration test fails, as there are no records to verify in the stream.

In the unit tests I am using moto to create mock AWS services, but I'm not sure how this works with the integration/acceptance testing method. All other standard tests pass.

collecting ...
 test_core.py ✓✓✓✓✓✓✓✓✓✓                                                                                                                                                                                                                        83% ████████▍

―――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――― TestBasicRead.test_read[inputs0] ――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――――

self = <source_acceptance_test.tests.test_core.TestBasicRead object at 0x7f39cc406450>, connector_config = SecretDict(******)
configured_catalog = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='ab-airbyte-testing', json_schema=...: 'full_refresh'>, cursor_field=None, destination_sync_mode=<DestinationSyncMode.append: 'append'>, primary_key=None)])
inputs = BasicReadTestConfig(config_path='secrets/config.json', configured_catalog_path='integration_tests/configured_catalog.json', empty_streams=set(), expect_records=None, validate_schema=True, timeout_seconds=None), expected_records = []
docker_runner = <source_acceptance_test.utils.connector_runner.ConnectorRunner object at 0x7f39cc406e50>, detailed_logger = <Logger detailed_logger /test_input/acceptance_tests_logs/test_core.py__TestBasicRead__test_read[inputs0].txt (DEBUG)>

    def test_read(
        self,
        connector_config,
        configured_catalog,
        inputs: BasicReadTestConfig,
        expected_records: List[AirbyteMessage],
        docker_runner: ConnectorRunner,
        detailed_logger,
    ):
        output = docker_runner.call_read(connector_config, configured_catalog)
        records = [message.record for message in filter_output(output, Type.RECORD)]

>       assert records, "At least one record should be read using provided catalog"
E       AssertionError: At least one record should be read using provided catalog
E       assert []

source_acceptance_test/tests/test_core.py:259: AssertionError

 test_core.py ⨯                                                                                                                                                                                                                                 92% █████████▎



 test_full_refresh.py ✓                                                                                                                                                                                                                        100% ██████████
================================================================================================================== short test summary info ===================================================================================================================
FAILED ../../test_input/test_core.py::TestBasicRead::test_read[inputs0] - AssertionError: At least one record should be read using provided catalog

Results (71.40s):
      11 passed
       1 failed
         - ../airbyte/source_acceptance_test/source_acceptance_test/tests/test_core.py:247 TestBasicRead.test_read[inputs0]

@sdairs (Contributor, Author) commented Oct 14, 2021

Have managed to get a successful run of the acceptance & unit tests - output below:

> Task :airbyte-integrations:connectors:source-amazon-sqs:unitTest
         unit_tests/unit_test.py::test_read {"type": "LOG", "log": {"level": "DEBUG", "message": "Amazon SQS Source Read - stream is: amazon-sqs-mock-queue"}}
         {"type": "LOG", "log": {"level": "DEBUG", "message": "Amazon SQS Source Read - Creating SQS connection ---"}}
         {"type": "LOG", "log": {"level": "DEBUG", "message": "Amazon SQS Source Read - Connected to SQS Queue ---"}}
         {"type": "LOG", "log": {"level": "DEBUG", "message": "Amazon SQS Source Read - Beginning message poll ---"}}
         {"type": "LOG", "log": {"level": "DEBUG", "message": "Amazon SQS Source Read - Message recieved: c9e561e9-9907-84d6-17ad-9557daf34efd"}}
         {"type": "LOG", "log": {"level": "DEBUG", "message": "Amazon SQS Source Read - Setting message visibility timeout: c9e561e9-9907-84d6-17ad-9557daf34efd"}}
         {"type": "LOG", "log": {"level": "DEBUG", "message": "Amazon SQS Source Read - Message visibility timeout set: c9e561e9-9907-84d6-17ad-9557daf34efd"}}
         {"type": "LOG", "log": {"level": "DEBUG", "message": "Amazon SQS Source Read - Beginning message poll ---"}}
         {"type": "LOG", "log": {"level": "DEBUG", "message": "Amazon SQS Source Read - No messages recieved during poll, time out reached ---"}}
         PASSED

         ----------- coverage: platform linux, python 3.9.7-final-0 -----------
         Name                            Stmts   Miss  Cover
         ---------------------------------------------------
         source_amazon_sqs/__init__.py       2      0   100%
         source_amazon_sqs/source.py        99     20    80%
         ---------------------------------------------------
         TOTAL                             101     20    80%
         Coverage HTML written to dir htmlcov


         ============================== 3 passed in 6.24s ===============================
Successfully tagged airbyte/source-amazon-sqs:dev

> Task :airbyte-integrations:connectors:source-amazon-sqs:sourceAcceptanceTest
============================= test session starts ==============================
platform linux -- Python 3.7.11, pytest-6.2.5, py-1.10.0, pluggy-1.0.0
rootdir: /test_input
plugins: sugar-0.9.4, timeout-1.4.2
collected 12 items / 1 skipped / 11 selected

test_core.py ...........                                                 [ 91%]
test_full_refresh.py .                                                   [100%]

=================== 12 passed, 1 skipped in 71.25s (0:01:11) ===================

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.7.1/userguide/command_line_interface.html#sec:command_line_warnings

BUILD SUCCESSFUL in 2m 11s
80 actionable tasks: 46 executed, 34 up-to-date

@marcosmarxm (Member) commented

Awesome @sdairs, I'm going to review and test your connector later today.

Comment on lines +15 to +18
### Deletes
Optionally, it can delete messages after reading - the delete_message() call is made __after__ yielding the message to the generator.
This means that messages aren't deleted unless read by a Destination - however, there is still potential that this could result in
missed messages if the Destination fails __after__ taking the message, but before committing to its own downstream.
@marcosmarxm (Member) commented

@sherifnada can you review this feature of the Amazon SQS connector? Does it make sense to enable this for an Airbyte connector?

@sherifnada (Contributor) replied

I'm hesitant to do this exactly for the reason mentioned. I would suggest we skip deleting messages. But my question then is: how do we do incremental sync? Is it possible to only read new messages?

@sdairs (Contributor, Author) replied

It would not be possible to do incremental sync without delete, as there is no concept of offsets in SQS - I think it's an important feature to have, but we should make the caveats clear with warnings & further details in documentation.

@marcosmarxm (Member) left a comment

Unit and integration tests are running with the CI Sandbox account. I'll wait for @sherifnada's feedback about the delete feature.

Can you move the bootstrap.md content to the connector doc page? It makes more sense for people setting up the connector to read this info there.

@sdairs (Contributor, Author) commented Oct 20, 2021

Awesome thanks for reviewing.

Message deletion is a standard SQS feature that I think should be supported - it is false by default to avoid accidental deletes, but I would expect most use cases for SQS to want deletes enabled.

@sdairs (Contributor, Author) commented Oct 20, 2021

There is of course potential that messages could be lost with Delete enabled - in that, if the Destination consumes the message and then fails before committing it to whatever is downstream, the Source will have deleted it after yielding it. In my limited understanding of Airbyte so far, there is no persistence of messages that are 'in flight', i.e. being processed by a Destination or Source - so there would be no way to recover this message and have the Destination resume from where it left off.

A more robust method would be some sort of callback handle that a Destination could call after committing the message downstream, to notify the Source that the message is persisted and can be deleted. But I don't know if this is something that is possible - and it would have performance considerations.
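For illustration, here is a minimal sketch of the yield-then-delete pattern under discussion (function and parameter names are assumed, not the connector's actual code):

```python
import boto3


def read_messages(queue_url: str, region: str, delete_messages: bool):
    # Sketch: yield each SQS message body, then optionally delete it.
    queue = boto3.resource("sqs", region_name=region).Queue(queue_url)
    while True:
        batch = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
        if not batch:
            break  # poll timed out with no messages
        for message in batch:
            # Hand the record to the Destination first...
            yield message.body
            # ...then delete it. If the Destination crashes after consuming
            # the yielded record but before committing downstream, the message
            # is already gone from the queue - this is the data-loss window
            # described above.
            if delete_messages:
                message.delete()
```

A post-commit callback from the Destination, as suggested above, would shrink that window, at the cost of extra coordination between Source and Destination.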


@sdairs (Contributor, Author) commented Oct 25, 2021

Have added explicit warnings to the spec and further details to the readme to cover the delete option & potential data loss scenarios.
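For context, Airbyte specs mark sensitive fields with `airbyte_secret`, and warnings can be surfaced in property descriptions. A hypothetical sketch of such a warning, with property names and wording assumed rather than copied from the connector's actual spec.json (shown as a Python dict for readability):

```python
# Hypothetical spec fragment - names and text are illustrative only.
spec_properties = {
    "access_key": {
        "type": "string",
        "description": "IAM access key",
        "airbyte_secret": True,  # masks the value in the Airbyte UI and logs
    },
    "delete_messages": {
        "type": "boolean",
        "default": False,  # off by default to avoid accidental deletes
        "description": (
            "If enabled, messages are deleted from the queue after being read. "
            "WARNING: this can cause data loss if the Destination fails after "
            "reading a message but before committing it downstream."
        ),
    },
}
```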

@marcosmarxm (Member) left a comment

LGTM, thanks for your contribution @sdairs!

@marcosmarxm marcosmarxm merged commit 58b569d into airbytehq:master Oct 26, 2021
schlattk pushed a commit to schlattk/airbyte that referenced this pull request Jan 4, 2022
* Initial commit, working source with static Creds

* Typo in example queue url

* Adds auto delete of messages after read

* Adds visibility timeout

* remove insecure comments from AWS IAM Key spec

* explicitly set supported sync modes

* explicit sync mode should be lower case

* Adds unit tests for check, discover, read

* remove incremental acceptance test block

* remove incremental from conf catalog sample

* remove test requirement moto from main req

* align int catalog sample with sample_files

* fixing catalog configs

* acceptance testing config

* adds expected records txt

* automated formatting changes

* remove expected records block from acpt test

* Adds Docs page

* Amends formatting on readme

* Adds doc link to summary

* Improve error handling & debug logging

* Adds bootstrap.md

* Add a todo suggestion for batch output

* Adds SQS to integrations readme list

* lower case properties

* removed unused line

* uses enum for aws region

* updates sample configs to use lowercase

* required props to lower case

* add missed property to lowercase

* gradle formatting

* Fixing issues from acceptance tests

* annotate secrets in spec.json with airbyte_secret

* Adds explicit warnings about data loss when using Delete Message option