diff --git a/docs/docs/examples/integrations/sqs.md b/docs/docs/examples/integrations/sqs.md new file mode 100644 index 00000000..c6aea07b --- /dev/null +++ b/docs/docs/examples/integrations/sqs.md @@ -0,0 +1,144 @@ +--- +title: Real-time data transformation pipeline with Amazon S3 bucket, SQS and CocoIndex +description: Build real-time data transformation pipeline with S3 and CocoIndex. +sidebar_class_name: hidden +slug: /examples/s3_sqs_pipeline +canonicalUrl: '/examples/s3_sqs_pipeline' +sidebar_custom_props: + image: /img/integrations/sqs/cover.png + tags: [vector-index, s3, sqs, realtime, etl] +image: /img/integrations/sqs/cover.png +tags: [vector-index, s3, sqs, realtime, etl] +--- +import { DocumentationButton } from '../../../src/components/GitHubButton'; + +![cover](/img/integrations/sqs/cover.png) + +[CocoIndex](https://github.com/cocoindex-io/cocoindex) natively supports Amazon S3 as a source and integrates with AWS SQS for real-time, incremental S3 data processing. + +## AWS SQS + +[Amazon SQS](https://aws.amazon.com/sqs/) (Simple Queue Service) is a message queuing service that provides a reliable, highly-scalable hosted queue for storing messages as they travel between applications or microservices. When S3 files change, SQS queues event messages containing details like the event type, bucket, object key, and timestamp. Messages stay in the queue until processed, so no events are lost. + +## Live update out of the box with SQS +CocoIndex provides two modes to run your pipeline, one time update and live update, both leverage the incremental processing. Particularly with AWS SQS, you could leverage the live update mode - +where CocoIndex continuously monitors and reacts to the events in SQS, updating the target data in real-time. This is ideal for use cases where data freshness is critical. + + + + +## How does it work? +Let's take a look at simple example of how to build a real-time data transformation pipeline with S3 and CocoIndex. It builds a vector database of text embeddings from markdown files in S3. + +### S3 bucket and SQS setup +Please follow the [documentation](https://cocoindex.io/docs/sources/amazons3) to setup S3 bucket and SQS queue. + + + + + +#### S3 bucket +- Creating an AWS account. +- Configuring IAM permissions. +- Configure policies. You'll need at least the `AmazonS3ReadOnlyAccess` policy, and if you want to enable change notifications, you'll also need the `AmazonSQSFullAccess` policy. + ![Permission Config](/img/integrations/sqs/permission.png) + +#### SQS queue +For real-time change detection, you'll need to create an SQS queue and configure it to receive notifications from your S3 bucket. +Please follow the [documentation](https://cocoindex.io/docs/sources/amazons3#optional-setup-sqs-queue-for-event-notifications) to configure the S3 bucket to send event notifications to the SQS queue. +![SQS Queue](/img/integrations/sqs/sqs.png) + +Particularly, the SQS queue needs a specific access policy that allows S3 to send messages to it. + +```json +{ + ... + "Statement": [ + ... + { + "Sid": "__publish_statement", + "Effect": "Allow", + "Principal": { + "Service": "s3.amazonaws.com" + }, + "Resource": "${SQS_QUEUE_ARN}", + "Action": "SQS:SendMessage", + "Condition": { + "ArnLike": { + "aws:SourceArn": "${S3_BUCKET_ARN}" + } + } + } + ] +} +``` + +Then you can upload your files to the S3 bucket. +![S3 Bucket](/img/integrations/sqs/s3.png) + + +## Define Indexing Flow + +### Flow Design +![CocoIndex Flow for Text Embedding](/img/integrations/sqs/flow.png) + +The flow diagram illustrates how we'll process our codebase: +1. Read text files from the Amazon S3 bucket +2. Chunk each document +3. For each chunk, embed it with a text embedding model +4. Store the embeddings in a vector database for retrieval + + +### AWS File Ingestion + +Define the AWS endpoint and the SQS queue name in `.env` file: + +```bash +# Database Configuration +DATABASE_URL=postgresql://localhost:5432/cocoindex + +# Amazon S3 Configuration +AMAZON_S3_BUCKET_NAME=your-bucket-name +AMAZON_S3-SQS_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/123456789/S3ChangeNotifications +``` + +Define indexing flow and ingest from Amazon S3 SQS queue: + +```python +@cocoindex.flow_def(name="AmazonS3TextEmbedding") +def amazon_s3_text_embedding_flow( + flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope +): + bucket_name = os.environ["AMAZON_S3_BUCKET_NAME"] + prefix = os.environ.get("AMAZON_S3_PREFIX", None) + sqs_queue_url = os.environ.get("AMAZON_S3_SQS_QUEUE_URL", None) + + data_scope["documents"] = flow_builder.add_source( + cocoindex.sources.AmazonS3( + bucket_name=bucket_name, + prefix=prefix, + included_patterns=["*.md", "*.mdx", "*.txt", "*.docx"], + binary=False, + sqs_queue_url=sqs_queue_url, + ) + ) + +``` + +This defines a flow that reads text files from the Amazon S3 bucket. + +![AWS File Ingestion](/img/integrations/sqs/ingest.png) + +### Rest of the flow +For the rest of the flow, we can follow the tutorial +[Simple Vector Index](https://cocoindex.io/docs/examples/simple_vector_index). +The entire project is available [here](https://github.com/cocoindex-io/cocoindex/tree/main/examples/amazon_s3_embedding). + +## Run the flow with live update +```bash +cocoindex update main.py -L +``` + +`-L` option means live update, see the [documentation](https://cocoindex.io/docs/core/flow_methods#live-update) for more details. +And you will have a continuous long running process that will update the vector database with any updates in the S3 bucket. + diff --git a/docs/sidebars.ts b/docs/sidebars.ts index 54cbbca0..6b896f32 100644 --- a/docs/sidebars.ts +++ b/docs/sidebars.ts @@ -116,6 +116,17 @@ const sidebars: SidebarsConfig = { }, ], }, + { + type: 'category', + label: 'Integrations', + collapsed: false, + items: [ + { + type: 'autogenerated', + dirName: 'examples/integrations', + }, + ], + }, ], tutorials: [ { diff --git a/docs/static/img/integrations/sqs/aws-incremental-etl.gif b/docs/static/img/integrations/sqs/aws-incremental-etl.gif new file mode 100644 index 00000000..44c286f8 Binary files /dev/null and b/docs/static/img/integrations/sqs/aws-incremental-etl.gif differ diff --git a/docs/static/img/integrations/sqs/cover.png b/docs/static/img/integrations/sqs/cover.png new file mode 100644 index 00000000..cb9b7b29 Binary files /dev/null and b/docs/static/img/integrations/sqs/cover.png differ diff --git a/docs/static/img/integrations/sqs/flow.png b/docs/static/img/integrations/sqs/flow.png new file mode 100644 index 00000000..77e91e49 Binary files /dev/null and b/docs/static/img/integrations/sqs/flow.png differ diff --git a/docs/static/img/integrations/sqs/ingest.png b/docs/static/img/integrations/sqs/ingest.png new file mode 100644 index 00000000..e71b58d3 Binary files /dev/null and b/docs/static/img/integrations/sqs/ingest.png differ diff --git a/docs/static/img/integrations/sqs/permission.png b/docs/static/img/integrations/sqs/permission.png new file mode 100644 index 00000000..7e15c8cb Binary files /dev/null and b/docs/static/img/integrations/sqs/permission.png differ diff --git a/docs/static/img/integrations/sqs/s3.png b/docs/static/img/integrations/sqs/s3.png new file mode 100644 index 00000000..e8e4bf06 Binary files /dev/null and b/docs/static/img/integrations/sqs/s3.png differ diff --git a/docs/static/img/integrations/sqs/sqs-landing.png b/docs/static/img/integrations/sqs/sqs-landing.png new file mode 100644 index 00000000..55339401 Binary files /dev/null and b/docs/static/img/integrations/sqs/sqs-landing.png differ diff --git a/docs/static/img/integrations/sqs/sqs.png b/docs/static/img/integrations/sqs/sqs.png new file mode 100644 index 00000000..cad79e6f Binary files /dev/null and b/docs/static/img/integrations/sqs/sqs.png differ