
Un-Archive python destinations #35838

Merged 11 commits on Mar 6, 2024
5 changes: 5 additions & 0 deletions airbyte-integrations/connectors/destination-amazon-sqs/.dockerignore
@@ -0,0 +1,5 @@
*
!Dockerfile
!main.py
!destination_amazon_sqs
!setup.py
38 changes: 38 additions & 0 deletions airbyte-integrations/connectors/destination-amazon-sqs/Dockerfile
@@ -0,0 +1,38 @@
FROM python:3.9.11-alpine3.15 as base

# build and load all requirements
FROM base as builder
WORKDIR /airbyte/integration_code

# upgrade pip to the latest version
RUN apk --no-cache upgrade \
&& pip install --upgrade pip \
&& apk --no-cache add tzdata build-base


COPY setup.py ./
# install necessary packages to a temporary folder
RUN pip install --prefix=/install .

# build a clean environment
FROM base
WORKDIR /airbyte/integration_code

# copy all loaded and built libraries to a pure basic image
COPY --from=builder /install /usr/local
# add default timezone settings
COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
RUN echo "Etc/UTC" > /etc/timezone

# bash is installed for more convenient debugging.
RUN apk --no-cache add bash

# copy payload code only
COPY main.py ./
COPY destination_amazon_sqs ./destination_amazon_sqs

ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/destination-amazon-sqs
99 changes: 99 additions & 0 deletions airbyte-integrations/connectors/destination-amazon-sqs/README.md
@@ -0,0 +1,99 @@
# Amazon SQS Destination

This is the repository for the Amazon SQS destination connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/destinations/amazon-sqs).

## Local development

### Prerequisites
**To iterate on this connector, make sure to complete the prerequisites in this section.**

#### Minimum Python version required `= 3.7.0`

#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```

This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.

Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this sounds confusing, don't worry about it: just put your deps in `setup.py` but install using `pip install -r requirements.txt`, and everything
should work as you expect.

#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/destinations/amazon-sqs)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `destination_amazon_sqs/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.

**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `destination amazon-sqs test creds`
and place them into `secrets/config.json`.
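
For reference, the key names this connector reads from `config.json` (taken from the config lookups in `destination_amazon_sqs/destination.py`) have roughly the following shape. The values below are placeholders, and `message_delay`, `message_body_key`, and `message_group_id` are optional:
```json
{
  "queue_url": "https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue",
  "region": "eu-west-1",
  "access_key": "<your AWS IAM Access Key ID>",
  "secret_key": "<your AWS IAM Secret Key>",
  "message_delay": 0,
  "message_body_key": "<optional key inside the record's data>",
  "message_group_id": "example-group"
}
```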

### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py write --config secrets/config.json --catalog integration_tests/configured_catalog.json
```

### Locally running the connector docker image


#### Build
**Via [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md) (recommended):**
```bash
airbyte-ci connectors --name=destination-amazon-sqs build
```

An image will be built with the tag `airbyte/destination-amazon-sqs:dev`.

**Via `docker build`:**
```bash
docker build -t airbyte/destination-amazon-sqs:dev .
```

#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-amazon-sqs:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-amazon-sqs:dev check --config /secrets/config.json
# messages.jsonl is a file containing line-separated JSON representing AirbyteMessages
cat messages.jsonl | docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-amazon-sqs:dev write --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```

## Testing
You can run our full test suite locally using [`airbyte-ci`](https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/README.md):
```bash
airbyte-ci connectors --name=destination-amazon-sqs test
```

### Customizing Acceptance Tests
Customize `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector requires creating or destroying resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`, as in the sketch below.
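
A minimal sketch of such a fixture, assuming the standard pytest setup/teardown pattern used across Airbyte connectors:
```python
import pytest


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
    """Create resources the acceptance tests need, then clean them up afterwards."""
    # create external resources here (e.g. a dedicated test queue)
    yield
    # destroy those resources here
```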

## Dependency Management
All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development.
We split dependencies between two groups:
* dependencies required for your connector to work go in the `MAIN_REQUIREMENTS` list.
* dependencies required for testing go in the `TEST_REQUIREMENTS` list (see the sketch below).
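
As a sketch of how these two lists typically feed into `setup.py` (the package names here are illustrative, not the connector's actual pins):
```python
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk", "boto3"]  # needed at runtime
TEST_REQUIREMENTS = ["pytest"]  # needed only for tests

setup(
    name="destination_amazon_sqs",
    description="Destination implementation for Amazon SQS.",
    packages=find_packages(),
    install_requires=MAIN_REQUIREMENTS,
    extras_require={"tests": TEST_REQUIREMENTS},
)
```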

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=destination-amazon-sqs test`
2. Bump the connector version in `metadata.yaml`: increment the `dockerImageTag` value. Please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors).
3. Make sure the `metadata.yaml` content is up to date.
4. Make sure the connector documentation and its changelog are up to date (`docs/integrations/destinations/amazon-sqs.md`).
5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention).
6. Pat yourself on the back for being an awesome contributor.
7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.

@@ -0,0 +1,59 @@
# Amazon SQS Destination

## What
This is a connector for producing messages to an [Amazon SQS Queue](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html).

## How
### Sending messages
Amazon SQS allows messages to be sent individually or in batches. Currently, this Destination only supports sending messages individually. This can
have performance implications if sending high volumes of messages.
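
Batch sending is left as a TODO in `destination.py`; if it were added, it would likely build on boto3's `Queue.send_messages` batch API, roughly as in the hypothetical sketch below (`send_batch` is an illustrative helper; `queue` is assumed to be a boto3 SQS `Queue` resource and `records` a list of Airbyte records):
```
import json

def send_batch(queue, records):
    # Hypothetical batch send (not implemented by this connector).
    # SQS allows at most 10 entries per SendMessageBatch call.
    entries = [
        {"Id": str(i), "MessageBody": json.dumps(record.data)}
        for i, record in enumerate(records[:10])
    ]
    response = queue.send_messages(Entries=entries)
    for ok in response.get("Successful", []):
        print("Message sent: " + ok["MessageId"])
    for failed in response.get("Failed", []):
        print("Message failed: " + failed["Id"])
```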

#### Message Body
By default, the SQS Message body is built using the AirbyteMessageRecord's 'data' property.

If the **message_body_key** config item is set, we use its value as a key within the AirbyteMessageRecord's 'data' property. This could be
improved to handle nested keys by using JSONPath syntax to look up values.

For example, given the input Record:
```
{
  "data": {
    "parent_key": {
      "nested_key": "nested_value"
    },
    "top_key": "top_value"
  }
}
```

With no **message_body_key** set, the output SQS Message body will be
```
{
  "parent_key": {
    "nested_key": "nested_value"
  },
  "top_key": "top_value"
}
```

With **message_body_key** set to `parent_key`, the output SQS Message body will be
```
{
  "nested_key": "nested_value"
}
```

#### Message attributes
The `airbyte_emitted_at` timestamp is added to every message as an Attribute by default. This could be improved to allow the user to set Attributes through the UI, or to take keys from the Record as Attributes.
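
Concretely, the attribute block attached to each outgoing message (this mirrors `add_attributes_to_message` in `destination.py`, where `record` is the Airbyte record and `message` holds the SQS send kwargs) looks like:
```
message["MessageAttributes"] = {
    "airbyte_emitted_at": {
        "StringValue": str(record.emitted_at),  # epoch milliseconds, as a string
        "DataType": "String",
    }
}
```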

#### FIFO Queues
A Queue URL that ends with '.fifo' **must** be a valid FIFO Queue. When the queue is FIFO, the *message_group_id* property is required.

Currently, a unique uuid4 is generated as the dedupe ID for every message. This could be improved to allow the user to specify a path in the Record
to use as a dedupe ID.
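
In code terms this mirrors `set_message_fifo_properties` in `destination.py`: the group ID comes from config, and a fresh `uuid4` serves as the deduplication ID unless the queue has content-based deduplication enabled:
```
from uuid import uuid4

message["MessageGroupId"] = message_group_id  # required for FIFO queues
if not use_content_dedupe:
    message["MessageDeduplicationId"] = str(uuid4())
```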

### Credentials
Requires an AWS IAM Access Key ID and Secret Key.

This could be improved to add support for configured AWS profiles, env vars etc.
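
A hypothetical sketch of that improvement: when explicit keys are absent, boto3's default credential chain (environment variables, shared credentials file, instance profile) could be used instead. `make_session` is an illustrative helper, not part of the connector:
```
import boto3

def make_session(config):
    access_key = config.get("access_key")
    secret_key = config.get("secret_key")
    if access_key and secret_key:
        return boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=config["region"],
        )
    # fall back to boto3's default credential chain
    return boto3.Session(region_name=config["region"])
```
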
8 changes: 8 additions & 0 deletions airbyte-integrations/connectors/destination-amazon-sqs/destination_amazon_sqs/__init__.py
@@ -0,0 +1,8 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


from .destination import DestinationAmazonSqs

__all__ = ["DestinationAmazonSqs"]
176 changes: 176 additions & 0 deletions airbyte-integrations/connectors/destination-amazon-sqs/destination_amazon_sqs/destination.py
@@ -0,0 +1,176 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import json
from typing import Any, Iterable, Mapping
from uuid import uuid4

import boto3
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type
from botocore.exceptions import ClientError


class DestinationAmazonSqs(Destination):
    def queue_is_fifo(self, url: str) -> bool:
        return url.endswith(".fifo")

    def parse_queue_name(self, url: str) -> str:
        return url.rsplit("/", 1)[-1]

    def send_single_message(self, queue, message) -> dict:
        return queue.send_message(**message)

    def build_sqs_message(self, record, message_body_key=None):
        data = None
        if message_body_key:
            data = record.data.get(message_body_key)
            if data is None:
                raise Exception("Message had no attribute of the configured Message Body Key: " + message_body_key)
        else:
            data = json.dumps(record.data)

        message = {"MessageBody": data}

        return message

    def add_attributes_to_message(self, record, message):
        attributes = {"airbyte_emitted_at": {"StringValue": str(record.emitted_at), "DataType": "String"}}
        message["MessageAttributes"] = attributes
        return message

    def set_message_delay(self, message, message_delay):
        message["DelaySeconds"] = message_delay
        return message

    # MessageGroupID and MessageDeduplicationID are required properties for FIFO queues
    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
    def set_message_fifo_properties(self, message, message_group_id, use_content_dedupe=False):
        # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html
        if not message_group_id:
            raise Exception("Failed to build message - Message Group ID is required for FIFO queues")
        else:
            message["MessageGroupId"] = message_group_id
        # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html
        if not use_content_dedupe:
            message["MessageDeduplicationId"] = str(uuid4())
        # TODO: Support getting MessageDeduplicationId from a key in the record
        # if message_dedupe_id:
        #     message['MessageDeduplicationId'] = message_dedupe_id
        return message

    # TODO: Support batch send
    # def send_batch_messages(messages, queue):
    #     entry = {
    #         'Id': "1",
    #         'MessageBody': str(record.data),
    #     }
    #     response = queue.send_messages(Entries=messages)
    #     if 'Successful' in response:
    #         for status in response['Successful']:
    #             print("Message sent: " + status['MessageId'])
    #     if 'Failed' in response:
    #         for status in response['Failed']:
    #             print("Message failed: " + status['Id'])

    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
    def write(
        self, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, input_messages: Iterable[AirbyteMessage]
    ) -> Iterable[AirbyteMessage]:

        # Required properties
        queue_url = config["queue_url"]
        queue_region = config["region"]

        # TODO: Implement optional params for batch
        # Optional Properties
        # max_batch_size = config.get("max_batch_size", 10)
        # send_as_batch = config.get("send_as_batch", False)
        message_delay = config.get("message_delay")
        message_body_key = config.get("message_body_key")

        # FIFO Properties
        message_group_id = config.get("message_group_id")

        # Sensitive Properties
        access_key = config["access_key"]
        secret_key = config["secret_key"]

        session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=queue_region)
        sqs = session.resource("sqs")
        queue = sqs.Queue(url=queue_url)

        # TODO: Make access/secret key optional, support public access & profiles
        # TODO: Support adding/setting attributes in the UI
        # TODO: Support extracting a specific path as message attributes

        for message in input_messages:
            if message.type == Type.RECORD:
                sqs_message = self.build_sqs_message(message.record, message_body_key)

                if message_delay:
                    sqs_message = self.set_message_delay(sqs_message, message_delay)

                sqs_message = self.add_attributes_to_message(message.record, sqs_message)

                if self.queue_is_fifo(queue_url):
                    # Queue attribute values are strings; treat anything other than "false" as dedupe enabled
                    use_content_dedupe = queue.attributes.get("ContentBasedDeduplication") != "false"
                    self.set_message_fifo_properties(sqs_message, message_group_id, use_content_dedupe)

                self.send_single_message(queue, sqs_message)
            if message.type == Type.STATE:
                yield message

    def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        try:
            # Required properties
            queue_url = config["queue_url"]
            logger.debug("Amazon SQS Destination Config Check - queue_url: " + queue_url)
            queue_region = config["region"]
            logger.debug("Amazon SQS Destination Config Check - region: " + queue_region)

            # Sensitive Properties
            access_key = config["access_key"]
            logger.debug("Amazon SQS Destination Config Check - access_key (ends with): " + access_key[-1])
            secret_key = config["secret_key"]
            logger.debug("Amazon SQS Destination Config Check - secret_key (ends with): " + secret_key[-1])

            logger.debug("Amazon SQS Destination Config Check - Starting connection test ---")
            session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=queue_region)
            sqs = session.resource("sqs")
            queue = sqs.Queue(url=queue_url)
            if hasattr(queue, "attributes"):
                logger.debug("Amazon SQS Destination Config Check - Connection test successful ---")

                if self.queue_is_fifo(queue_url):
                    # Queue attribute values are strings, so compare against "true" rather than relying on truthiness
                    fifo = queue.attributes.get("FifoQueue", "false") == "true"
                    if not fifo:
                        raise Exception("FIFO Queue URL set but Queue is not FIFO")

                    message_group_id = config.get("message_group_id")
                    if message_group_id is None:
                        raise Exception("Message Group ID is not set, but is required for FIFO Queues.")

                # TODO: Support referencing an ID inside the Record to use as de-dupe ID
                # message_dedupe_key = config.get("message_dedupe_key")
                # content_dedupe = queue.attributes.get('ContentBasedDeduplication')
                # if content_dedupe == "false":
                #     if message_dedupe_id is None:
                #         raise Exception("You must provide a Message Deduplication ID when ContentBasedDeduplication is not used.")

                return AirbyteConnectionStatus(status=Status.SUCCEEDED)
            else:
                return AirbyteConnectionStatus(
                    status=Status.FAILED, message="Amazon SQS Destination Config Check - Could not connect to queue"
                )
        except ClientError as e:
            return AirbyteConnectionStatus(
                status=Status.FAILED, message=f"Amazon SQS Destination Config Check - Error in AWS Client: {str(e)}"
            )
        except Exception as e:
            return AirbyteConnectionStatus(
                status=Status.FAILED, message=f"Amazon SQS Destination Config Check - An exception occurred: {str(e)}"
            )