From 742c9e397250edd8d5a5c9ddc8ce7df29896bf7b Mon Sep 17 00:00:00 2001 From: Ben Eggers <64657842+beggers@users.noreply.github.com> Date: Tue, 16 Jan 2024 15:22:11 -0800 Subject: [PATCH 1/6] [BUG] Fix hosted chroma release trigger --- .github/workflows/chroma-release.yml | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/.github/workflows/chroma-release.yml b/.github/workflows/chroma-release.yml index 44a31160903..6c2250a0fb3 100644 --- a/.github/workflows/chroma-release.yml +++ b/.github/workflows/chroma-release.yml @@ -141,7 +141,7 @@ jobs: artifacts: "dist/chroma-${{steps.version.outputs.version}}.tar.gz" allowUpdates: true prerelease: true - - name: Trigger Hosted Chroma Release + - name: Trigger Hosted Chroma FE Release uses: actions/github-script@v6 with: github-token: ${{ secrets.HOSTED_CHROMA_WORKFLOW_DISPATCH_TOKEN }} @@ -149,7 +149,31 @@ jobs: const result = await github.rest.actions.createWorkflowDispatch({ owner: 'chroma-core', repo: 'hosted-chroma', - workflow_id: 'build-and-publish-image.yaml', + workflow_id: 'build-and-publish-frontend.yaml', + ref: 'main' + }) + console.log(result) + - name: Trigger Hosted Chroma Coordinator Release + uses: actions/github-script@v6 + with: + github-token: ${{ secrets.HOSTED_CHROMA_WORKFLOW_DISPATCH_TOKEN }} + script: | + const result = await github.rest.actions.createWorkflowDispatch({ + owner: 'chroma-core', + repo: 'hosted-chroma', + workflow_id: 'build-and-deploy-coordinator.yaml', + ref: 'main' + }) + console.log(result) + - name: Trigger Hosted Worker Release + uses: actions/github-script@v6 + with: + github-token: ${{ secrets.HOSTED_CHROMA_WORKFLOW_DISPATCH_TOKEN }} + script: | + const result = await github.rest.actions.createWorkflowDispatch({ + owner: 'chroma-core', + repo: 'hosted-chroma', + workflow_id: 'build-and-deploy-worker.yaml', ref: 'main' }) console.log(result) From d49ec7796914678dd69177a226f50e6256e1a773 Mon Sep 17 00:00:00 2001 From: nicolasgere Date: 
Tue, 16 Jan 2024 16:14:55 -0800 Subject: [PATCH 2/6] [BUG] update openai api in example (#1641) ## Description of changes - Update openai call to use the new api (#1640 ) --- examples/chat_with_your_documents/main.py | 14 +++++++------- examples/chat_with_your_documents/requirements.txt | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/chat_with_your_documents/main.py b/examples/chat_with_your_documents/main.py index 58b85499d5d..dcc631beb78 100644 --- a/examples/chat_with_your_documents/main.py +++ b/examples/chat_with_your_documents/main.py @@ -1,12 +1,12 @@ import argparse import os from typing import List, Dict - +from openai.types.chat import ChatCompletionMessageParam import openai import chromadb -def build_prompt(query: str, context: List[str]) -> List[Dict[str, str]]: +def build_prompt(query: str, context: List[str]) -> List[ChatCompletionMessageParam]: """ Builds a prompt for the LLM. # @@ -21,10 +21,10 @@ def build_prompt(query: str, context: List[str]) -> List[Dict[str, str]]: context (List[str]): The context of the query, returned by embedding search. Returns: - A prompt for the LLM (List[Dict[str, str]]). + A prompt for the LLM (List[ChatCompletionMessageParam]). """ - system = { + system: ChatCompletionMessageParam = { "role": "system", "content": "I am going to ask you a question, which I would like you to answer" "based only on the provided context, and not any other information." @@ -32,11 +32,11 @@ def build_prompt(query: str, context: List[str]) -> List[Dict[str, str]]: 'say "I am not sure", then try to make a guess.' "Break your answer up into nicely readable paragraphs.", } - user = { + user: ChatCompletionMessageParam = { "role": "user", "content": f"The question is {query}. Here is all the context you have:" f'{(" ").join(context)}', - } + } return [system, user] @@ -52,7 +52,7 @@ def get_chatGPT_response(query: str, context: List[str], model_name: str) -> str Returns: A response to the question. 
""" - response = openai.ChatCompletion.create( + response = openai.chat.completions.create( model=model_name, messages=build_prompt(query, context), ) diff --git a/examples/chat_with_your_documents/requirements.txt b/examples/chat_with_your_documents/requirements.txt index a7b995025e1..61a378d9ea4 100644 --- a/examples/chat_with_your_documents/requirements.txt +++ b/examples/chat_with_your_documents/requirements.txt @@ -1,3 +1,3 @@ chromadb>=0.4.4 -openai +openai>=1.7.2 tqdm From 78ad9174fcb0c7dc95d3746943f32207c838aaef Mon Sep 17 00:00:00 2001 From: Hammad Bashir Date: Tue, 16 Jan 2024 16:32:59 -0800 Subject: [PATCH 3/6] [BUG] Fix coordinator pulsar tenant and namespace (#1650) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - This aligns the coordinator, rust worker, and python client on default/default as the pulsar tenant/namespace --- chromadb/test/db/test_system.py | 9 ++++++--- go/coordinator/cmd/grpccoordinator/cmd.go | 2 +- k8s/deployment/kubernetes.yaml | 3 --- k8s/deployment/segment-server.yaml | 4 ++++ 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/chromadb/test/db/test_system.py b/chromadb/test/db/test_system.py index 80c78b05745..944c0f81069 100644 --- a/chromadb/test/db/test_system.py +++ b/chromadb/test/db/test_system.py @@ -20,13 +20,16 @@ from pytest import FixtureRequest import uuid +PULSAR_TENANT = "default" +PULSAR_NAMESPACE = "default" + # These are the sample collections that are used in the tests below. Tests can override # the fields as needed. 
sample_collections = [ Collection( id=uuid.UUID(int=1), name="test_collection_1", - topic="persistent://test-tenant/test-topic/00000000-0000-0000-0000-000000000001", + topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/00000000-0000-0000-0000-000000000001", metadata={"test_str": "str1", "test_int": 1, "test_float": 1.3}, dimension=128, database=DEFAULT_DATABASE, @@ -35,7 +38,7 @@ Collection( id=uuid.UUID(int=2), name="test_collection_2", - topic="persistent://test-tenant/test-topic/00000000-0000-0000-0000-000000000002", + topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/00000000-0000-0000-0000-000000000002", metadata={"test_str": "str2", "test_int": 2, "test_float": 2.3}, dimension=None, database=DEFAULT_DATABASE, @@ -44,7 +47,7 @@ Collection( id=uuid.UUID(int=3), name="test_collection_3", - topic="persistent://test-tenant/test-topic/00000000-0000-0000-0000-000000000003", + topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/00000000-0000-0000-0000-000000000003", metadata={"test_str": "str3", "test_int": 3, "test_float": 3.3}, dimension=None, database=DEFAULT_DATABASE, diff --git a/go/coordinator/cmd/grpccoordinator/cmd.go b/go/coordinator/cmd/grpccoordinator/cmd.go index d14520bf8d0..8859790b56c 100644 --- a/go/coordinator/cmd/grpccoordinator/cmd.go +++ b/go/coordinator/cmd/grpccoordinator/cmd.go @@ -42,7 +42,7 @@ func init() { // Pulsar Cmd.Flags().StringVar(&conf.PulsarAdminURL, "pulsar-admin-url", "http://localhost:8080", "Pulsar admin url") Cmd.Flags().StringVar(&conf.PulsarURL, "pulsar-url", "pulsar://localhost:6650", "Pulsar url") - Cmd.Flags().StringVar(&conf.PulsarTenant, "pulsar-tenant", "public", "Pulsar tenant") + Cmd.Flags().StringVar(&conf.PulsarTenant, "pulsar-tenant", "default", "Pulsar tenant") Cmd.Flags().StringVar(&conf.PulsarNamespace, "pulsar-namespace", "default", "Pulsar namespace") // Notification diff --git a/k8s/deployment/kubernetes.yaml b/k8s/deployment/kubernetes.yaml index 1df12e9130e..b1f9baabdd0 100644 --- 
a/k8s/deployment/kubernetes.yaml +++ b/k8s/deployment/kubernetes.yaml @@ -193,9 +193,6 @@ spec: - "--pulsar-admin-url=http://pulsar.chroma:8080" - "--pulsar-url=pulsar://pulsar.chroma:6650" - "--notifier-provider=pulsar" - - "--pulsar-tenant=test-tenant" - - "--pulsar-namespace=test-topic" - - "--assignment-policy=simple" image: chroma-coordinator imagePullPolicy: IfNotPresent name: coordinator diff --git a/k8s/deployment/segment-server.yaml b/k8s/deployment/segment-server.yaml index 0f2c6e02858..33af91d1314 100644 --- a/k8s/deployment/segment-server.yaml +++ b/k8s/deployment/segment-server.yaml @@ -43,6 +43,10 @@ spec: env: - name: CHROMA_WORKER__PULSAR_URL value: pulsar://pulsar.chroma:6650 + - name: CHROMA_WORKER__PULSAR_NAMESPACE + value: default + - name: CHROMA_WORKER__PULSAR_TENANT + value: default - name: CHROMA_WORKER__MY_IP valueFrom: fieldRef: From 8e5240997fc150bdd4136fa83b4c576be500ba48 Mon Sep 17 00:00:00 2001 From: nicolasgere Date: Tue, 16 Jan 2024 17:15:50 -0800 Subject: [PATCH 4/6] [BUG] fix docker don't shutdown gracefully (#1648) ## Description of changes - Docker gracefully shutdown (#1646 ) --- bin/docker_entrypoint.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/docker_entrypoint.sh b/bin/docker_entrypoint.sh index b1336b8d455..eab12e3795b 100755 --- a/bin/docker_entrypoint.sh +++ b/bin/docker_entrypoint.sh @@ -4,4 +4,4 @@ echo "Rebuilding hnsw to ensure architecture compatibility" pip install --force-reinstall --no-cache-dir chroma-hnswlib export IS_PERSISTENT=1 export CHROMA_SERVER_NOFILE=65535 -uvicorn chromadb.app:app --workers 1 --host 0.0.0.0 --port 8000 --proxy-headers --log-config chromadb/log_config.yml --timeout-keep-alive 30 +exec uvicorn chromadb.app:app --workers 1 --host 0.0.0.0 --port 8000 --proxy-headers --log-config chromadb/log_config.yml --timeout-keep-alive 30 From d5b4a642a6eef05aa6eaf4723fbf703d78929251 Mon Sep 17 00:00:00 2001 From: Hammad Bashir Date: Tue, 16 Jan 2024 21:06:03 -0800 Subject: 
[PATCH 5/6] [TST] Move cluster test to bigger machine, Hardcode tests to use rendezvous hash of topics (#1651) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Move cluster tests to a bigger machine. I suspect the default machines are too slow resulting in a time out. - Fix the coordinator sysdb tests to use rendezvous hashing since we cannot parametrize the service during integration tests easily. --- .github/workflows/chroma-cluster-test.yml | 2 +- bin/cluster-test.sh | 5 +++-- chromadb/test/db/test_system.py | 14 ++++++++++---- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/.github/workflows/chroma-cluster-test.yml b/.github/workflows/chroma-cluster-test.yml index 52535b8f719..e474f43ca7d 100644 --- a/.github/workflows/chroma-cluster-test.yml +++ b/.github/workflows/chroma-cluster-test.yml @@ -15,7 +15,7 @@ jobs: strategy: matrix: python: ['3.8'] - platform: [ubuntu-latest] + platform: ['16core-64gb-ubuntu-latest'] testfile: ["chromadb/test/ingest/test_producer_consumer.py", "chromadb/test/db/test_system.py", "chromadb/test/segment/distributed/test_memberlist_provider.py",] diff --git a/bin/cluster-test.sh b/bin/cluster-test.sh index b269670abb7..10c48781c07 100755 --- a/bin/cluster-test.sh +++ b/bin/cluster-test.sh @@ -26,6 +26,7 @@ minikube addons enable ingress-dns -p chroma-test eval $(minikube -p chroma-test docker-env) docker build -t server:latest -f Dockerfile . docker build -t chroma-coordinator:latest -f go/coordinator/Dockerfile . +docker build -t worker -f rust/worker/Dockerfile . 
--build-arg CHROMA_KUBERNETES_INTEGRATION=1 # Apply the kubernetes manifests kubectl apply -f k8s/deployment @@ -45,8 +46,8 @@ sleep 10 export CHROMA_CLUSTER_TEST_ONLY=1 export CHROMA_SERVER_HOST=$(kubectl get svc server -n chroma -o=jsonpath='{.status.loadBalancer.ingress[0].ip}') -export PULSAR_BROKER_URL=$(kubectl get svc pulsar -n chroma -o=jsonpath='{.status.loadBalancer.ingress[0].ip}') -export CHROMA_COORDINATOR_HOST=$(kubectl get svc coordinator -n chroma -o=jsonpath='{.status.loadBalancer.ingress[0].ip}') +export PULSAR_BROKER_URL=$(kubectl get svc pulsar-lb -n chroma -o=jsonpath='{.status.loadBalancer.ingress[0].ip}') +export CHROMA_COORDINATOR_HOST=$(kubectl get svc coordinator-lb -n chroma -o=jsonpath='{.status.loadBalancer.ingress[0].ip}') export CHROMA_SERVER_GRPC_PORT="50051" echo "Chroma Server is running at port $CHROMA_SERVER_HOST" diff --git a/chromadb/test/db/test_system.py b/chromadb/test/db/test_system.py index 944c0f81069..9971d81af93 100644 --- a/chromadb/test/db/test_system.py +++ b/chromadb/test/db/test_system.py @@ -25,11 +25,17 @@ # These are the sample collections that are used in the tests below. Tests can override # the fields as needed. + +# HACK: In order to get the real grpc tests passing, we need the topic to use rendezvous +# hashing. This is because the grpc tests use the real grpc sysdb server and the +# rendezvous hashing is done in the segment server. We don't have an easy way to parameterize +# the assignment policy in the grpc tests, so we just use rendezvous hashing for all tests +# by hardcoding the topic to what we expect rendezvous hashing to return with 16 topics.
sample_collections = [ Collection( id=uuid.UUID(int=1), name="test_collection_1", - topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/00000000-0000-0000-0000-000000000001", + topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/chroma_log_1", metadata={"test_str": "str1", "test_int": 1, "test_float": 1.3}, dimension=128, database=DEFAULT_DATABASE, @@ -38,7 +44,7 @@ Collection( id=uuid.UUID(int=2), name="test_collection_2", - topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/00000000-0000-0000-0000-000000000002", + topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/chroma_log_14", metadata={"test_str": "str2", "test_int": 2, "test_float": 2.3}, dimension=None, database=DEFAULT_DATABASE, @@ -47,7 +53,7 @@ Collection( id=uuid.UUID(int=3), name="test_collection_3", - topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/00000000-0000-0000-0000-000000000003", + topic=f"persistent://{PULSAR_TENANT}/{PULSAR_NAMESPACE}/chroma_log_14", metadata={"test_str": "str3", "test_int": 3, "test_float": 3.3}, dimension=None, database=DEFAULT_DATABASE, @@ -174,7 +180,7 @@ def test_create_get_delete_collections(sysdb: SysDB) -> None: # Find by topic for collection in sample_collections: result = sysdb.get_collections(topic=collection["topic"]) - assert result == [collection] + assert collection in result # Find by id for collection in sample_collections: From 7aaf36fb9a0c52b83b79c563b24c94cffd684cc6 Mon Sep 17 00:00:00 2001 From: Hammad Bashir Date: Wed, 17 Jan 2024 08:59:34 -0800 Subject: [PATCH 6/6] [ENH] Add s3 storage for rust worker (#1643) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Update bin/cluster-test to build rust worker - Parameterize dockerfile to support building the docker image for and not for integration tests - Add build.rs step for integration tests - Make memberlist and storage tests only run in response to this flag - New functionality - Adds a basic storage system that gets/puts to s3 ## 
Test plan *How are these changes tested?* New tests were added for basic use of storage. - [x] Tests pass locally with `cargo test` --- Cargo.lock | 674 +++++++++++++++++- rust/worker/Cargo.toml | 3 + rust/worker/Dockerfile | 2 + rust/worker/build.rs | 13 + rust/worker/chroma_config.yaml | 3 + rust/worker/src/config.rs | 14 +- rust/worker/src/lib.rs | 1 + .../src/memberlist/memberlist_provider.rs | 21 +- rust/worker/src/storage/config.rs | 20 + rust/worker/src/storage/mod.rs | 9 + rust/worker/src/storage/s3.rs | 216 ++++++ 11 files changed, 933 insertions(+), 43 deletions(-) create mode 100644 rust/worker/src/storage/config.rs create mode 100644 rust/worker/src/storage/mod.rs create mode 100644 rust/worker/src/storage/s3.rs diff --git a/Cargo.lock b/Cargo.lock index 44c2a1d8330..932b41154ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -336,6 +336,386 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-config" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e64b72d4bdbb41a73d27709c65a25b6e4bfc8321bf70fa3a8b19ce7d4eb81b0" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.0.1", + "hex", + "http", + "hyper", + "ring", + "time", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a7cb3510b95492bd9014b60e2e3bee3e48bc516e220316f8e6b60df18b47331" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-http" +version = "0.60.2" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a95d41abe4e941399fdb4bc2f54713eac3c839d98151875948bb24e66ab658f2" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http", + "http-body", + "pin-project-lite", + "tracing", +] + +[[package]] +name = "aws-runtime" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233cca219c6705d525ace011d6f9bc51aaf32fce5b4c41661d2d7ff22d9b4d49" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "fastrand 2.0.1", + "http", + "percent-encoding", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-s3" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634fbe5b6591ee2e281cd2ba8641e9bd752dbf5bf338924d6ad4bd5a3304fe31" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-checksums", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "bytes", + "http", + "http-body", + "once_cell", + "percent-encoding", + "regex-lite", + "tracing", + "url", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee41005e0f3a19ae749c7953d9e1f1ef8d2183f76f64966e346fa41c1ba0ed44" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.10.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa08168f8a27505e7b90f922c32a489feb1f2133878981a15138bebc849ac09c" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29102eff04d50ef70f11a48823db33e33c6cc5f027bfb6ff4864efbd5f1f66f3" +dependencies = [ + "aws-credential-types", + "aws-http", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b92384b39aedb258aa734fe0e7b2ffcd13f33e68227251a72cd2635e0acc8f1a" +dependencies = [ + "aws-credential-types", + "aws-smithy-eventstream", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "crypto-bigint 0.5.5", + "form_urlencoded", + "hex", + "hmac", + "http", + "once_cell", + "p256 0.11.1", + "percent-encoding", + "ring", + "sha2", + "subtle", + "time", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-async" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d8e1c0904f78c76846a9dad41c28b41d330d97741c3e70d003d9a747d95e2a" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-checksums" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"62d59ef74bf94562512e570eeccb81e9b3879f9136b2171ed4bf996ffa609955" +dependencies = [ + "aws-smithy-http", + "aws-smithy-types", + "bytes", + "crc32c", + "crc32fast", + "hex", + "http", + "http-body", + "md-5", + "pin-project-lite", + "sha1", + "sha2", + "tracing", +] + +[[package]] +name = "aws-smithy-eventstream" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31cf0466890a20988b9b2864250dd907f769bd189af1a51ba67beec86f7669fb" +dependencies = [ + "aws-smithy-types", + "bytes", + "crc32fast", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "568a3b159001358dd96143378afd7470e19baffb6918e4b5016abe576e553f9c" +dependencies = [ + "aws-smithy-eventstream", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http", + "http-body", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f12bfb23370a069f8facbfd53ce78213461b0a8570f6c81488030f5ab6f8cc4e" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b1adc06e0175c175d280267bb8fd028143518013fcb869e1c3199569a2e902a" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cf0f6845d2d97b953cea791b0ee37191c5509f2897ec7eb7580a0e7a594e98b" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand 2.0.1", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "once_cell", + "pin-project-lite", + "pin-utils", + 
"rustls", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47798ba97a33979c80e837519cf837f18fd6df0adb02dd5286a75d9891c6e671" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e9a85eafeaf783b2408e35af599e8b96f2c49d9a5d13ad3a887fbdefb6bc744" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http", + "http-body", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a84bee2b44c22cbba59f12c34b831a97df698f8e43df579b35998652a00dc13" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8549aa62c5b7db5c57ab915200ee214b4f5d8f19b29a4a8fa0b3ad3bca1380e3" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "http", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.6.20" @@ -407,6 +787,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base16ct" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce" + [[package]] name = "base16ct" version = "0.2.0" @@ -425,6 +811,16 @@ version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +[[package]] +name = "base64-simd" 
+version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -498,6 +894,16 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "cc" version = "1.0.83" @@ -584,6 +990,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32c" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8f48d60e5b4d2c53d5c2b1d8a58c849a70ae5e5509b08a48d047e3b65714a74" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.3.2" @@ -626,6 +1041,18 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crypto-bigint" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef2b4b23cddf68b89b8f8069890e8c270d54e2d5fe1b143820234805e4cb17ef" +dependencies = [ + "generic-array", + "rand_core", + "subtle", + "zeroize", +] + [[package]] name = "crypto-bigint" version = "0.5.5" @@ -717,6 +1144,16 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c297a1c74b71ae29df00c3e22dd9534821d60eb9af5a0192823fa2acea70c2a" +[[package]] +name = "der" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" +dependencies = [ + "const-oid", + 
"zeroize", +] + [[package]] name = "der" version = "0.7.8" @@ -767,18 +1204,30 @@ version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "545b22097d44f8a9581187cdf93de7a71e4722bf51200cfaba810865b49a495d" +[[package]] +name = "ecdsa" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413301934810f597c1d19ca71c8710e99a3f1ba28a0d2ebc01551a2daeea3c5c" +dependencies = [ + "der 0.6.1", + "elliptic-curve 0.12.3", + "rfc6979 0.3.1", + "signature 1.6.4", +] + [[package]] name = "ecdsa" version = "0.16.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca" dependencies = [ - "der", + "der 0.7.8", "digest", - "elliptic-curve", - "rfc6979", - "signature", - "spki", + "elliptic-curve 0.13.8", + "rfc6979 0.4.0", + "signature 2.2.0", + "spki 0.7.3", ] [[package]] @@ -787,8 +1236,8 @@ version = "2.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" dependencies = [ - "pkcs8", - "signature", + "pkcs8 0.10.2", + "signature 2.2.0", ] [[package]] @@ -811,23 +1260,43 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +[[package]] +name = "elliptic-curve" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7bb888ab5300a19b8e5bceef25ac745ad065f3c9f7efc6de1b91958110891d3" +dependencies = [ + "base16ct 0.1.1", + "crypto-bigint 0.4.9", + "der 0.6.1", + "digest", + "ff 0.12.1", + "generic-array", + "group 0.12.1", + "pkcs8 0.9.0", + "rand_core", + "sec1 0.3.0", + "subtle", + "zeroize", +] + [[package]] name = "elliptic-curve" version = "0.13.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b5e6043086bf7973472e0c7dff2142ea0b680d30e18d9cc40f267efbf222bd47" dependencies = [ - "base16ct", - "crypto-bigint", + "base16ct 0.2.0", + "crypto-bigint 0.5.5", "digest", - "ff", + "ff 0.13.0", "generic-array", - "group", + "group 0.13.0", "hkdf", "pem-rfc7468", - "pkcs8", + "pkcs8 0.10.2", "rand_core", - "sec1", + "sec1 0.7.3", "subtle", "zeroize", ] @@ -910,6 +1379,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +[[package]] +name = "ff" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d013fc25338cc558c5c2cfbad646908fb23591e2404481826742b651c9af7160" +dependencies = [ + "rand_core", + "subtle", +] + [[package]] name = "ff" version = "0.13.0" @@ -1153,13 +1632,24 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "group" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dfbfb3a6cfbd390d5c9564ab283a0349b9b9fcd46a706c1eb10e0db70bfbac7" +dependencies = [ + "ff 0.12.1", + "rand_core", + "subtle", +] + [[package]] name = "group" version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0f9ef7462f7c099f518d754361858f86d8a07af53ba9af0fe635bbccb151a63" dependencies = [ - "ff", + "ff 0.13.0", "rand_core", "subtle", ] @@ -1701,6 +2191,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.6.4" @@ -1907,7 +2407,7 @@ dependencies = [ "itertools 0.10.5", "log", "oauth2", - "p256", + "p256 0.13.2", "p384", "rand", "rsa", @@ 
-1977,14 +2477,31 @@ dependencies = [ "num-traits", ] +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + +[[package]] +name = "p256" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" +dependencies = [ + "ecdsa 0.14.8", + "elliptic-curve 0.12.3", + "sha2", +] + [[package]] name = "p256" version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9863ad85fa8f4460f9c48cb909d38a0d689dba1f6f6988a5e3e0d31071bcd4b" dependencies = [ - "ecdsa", - "elliptic-curve", + "ecdsa 0.16.9", + "elliptic-curve 0.13.8", "primeorder", "sha2", ] @@ -1995,8 +2512,8 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70786f51bcc69f6a4c0360e063a4cac5419ef7c5cd5b3c99ad70f3be5ba79209" dependencies = [ - "ecdsa", - "elliptic-curve", + "ecdsa 0.16.9", + "elliptic-curve 0.13.8", "primeorder", "sha2", ] @@ -2137,9 +2654,19 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" dependencies = [ - "der", - "pkcs8", - "spki", + "der 0.7.8", + "pkcs8 0.10.2", + "spki 0.7.3", +] + +[[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der 0.6.1", + "spki 0.6.0", ] [[package]] @@ -2148,8 +2675,8 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ - "der", - "spki", + "der 0.7.8", + "spki 0.7.3", ] [[package]] @@ -2232,7 +2759,7 @@ version = "0.13.6" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6" dependencies = [ - "elliptic-curve", + "elliptic-curve 0.13.8", ] [[package]] @@ -2498,6 +3025,12 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-lite" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" + [[package]] name = "regex-syntax" version = "0.8.2" @@ -2544,6 +3077,17 @@ dependencies = [ "winreg", ] +[[package]] +name = "rfc6979" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7743f17af12fa0b03b803ba12cd6a8d9483a587e89c69445e3909655c0b9fabb" +dependencies = [ + "crypto-bigint 0.4.9", + "hmac", + "zeroize", +] + [[package]] name = "rfc6979" version = "0.4.0" @@ -2580,10 +3124,10 @@ dependencies = [ "num-integer", "num-traits", "pkcs1", - "pkcs8", + "pkcs8 0.10.2", "rand_core", - "signature", - "spki", + "signature 2.2.0", + "spki 0.7.3", "subtle", "zeroize", ] @@ -2734,16 +3278,30 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sec1" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3be24c1842290c45df0a7bf069e0c268a747ad05a192f2fd7dcfdbc1cba40928" +dependencies = [ + "base16ct 0.1.1", + "der 0.6.1", + "generic-array", + "pkcs8 0.9.0", + "subtle", + "zeroize", +] + [[package]] name = "sec1" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc" dependencies = [ - "base16ct", - "der", + "base16ct 0.2.0", + "der 0.7.8", "generic-array", - "pkcs8", + "pkcs8 0.10.2", "subtle", "zeroize", ] @@ -2913,6 +3471,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -2933,6 +3502,16 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "1.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" +dependencies = [ + "digest", + "rand_core", +] + [[package]] name = "signature" version = "2.2.0" @@ -2996,6 +3575,16 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spki" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67cf02bbac7a337dc36e4f5a693db6c21e7863f45070f7064577eb4367a3212b" +dependencies = [ + "base64ct", + "der 0.6.1", +] + [[package]] name = "spki" version = "0.7.3" @@ -3003,7 +3592,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" dependencies = [ "base64ct", - "der", + "der 0.7.8", ] [[package]] @@ -3429,6 +4018,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "uuid" version = "1.6.1" @@ -3469,6 +4064,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "waker-fn" version = "1.1.1" @@ -3762,6 +4363,9 @@ name = "worker" version = 
"0.1.0" dependencies = [ "async-trait", + "aws-config", + "aws-sdk-s3", + "aws-smithy-types", "bytes", "cc", "figment", @@ -3789,6 +4393,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yansi" version = "1.0.0-rc.1" diff --git a/rust/worker/Cargo.toml b/rust/worker/Cargo.toml index 3ec9d707e15..25a3b2d099e 100644 --- a/rust/worker/Cargo.toml +++ b/rust/worker/Cargo.toml @@ -32,6 +32,9 @@ kube = { version = "0.87.1", features = ["runtime", "derive"] } k8s-openapi = { version = "0.20.0", features = ["latest"] } bytes = "1.5.0" parking_lot = "0.12.1" +aws-sdk-s3 = "1.5.0" +aws-smithy-types = "1.1.0" +aws-config = { version = "1.1.2", features = ["behavior-version-latest"] } [build-dependencies] tonic-build = "0.10" diff --git a/rust/worker/Dockerfile b/rust/worker/Dockerfile index 96c4b08a4f0..2e3802787e1 100644 --- a/rust/worker/Dockerfile +++ b/rust/worker/Dockerfile @@ -1,4 +1,6 @@ FROM rust:1.74.1 as builder +ARG CHROMA_KUBERNETES_INTEGRATION=0 +ENV CHROMA_KUBERNETES_INTEGRATION $CHROMA_KUBERNETES_INTEGRATION WORKDIR / RUN git clone https://github.com/chroma-core/hnswlib.git diff --git a/rust/worker/build.rs b/rust/worker/build.rs index 315f75d381b..25235b5c6b0 100644 --- a/rust/worker/build.rs +++ b/rust/worker/build.rs @@ -19,5 +19,18 @@ fn main() -> Result<(), Box> { .flag("-ftree-vectorize") .compile("bindings"); + // Set a compile flag based on an environment variable that tells us if we should + // run the cluster tests + let run_cluster_tests_env_var = std::env::var("CHROMA_KUBERNETES_INTEGRATION"); + match run_cluster_tests_env_var { + Ok(val) => { + let lowered = val.to_lowercase(); + if lowered == "true" || lowered == "1" { + println!("cargo:rustc-cfg=CHROMA_KUBERNETES_INTEGRATION"); + } + } + Err(_) => {} + } + Ok(()) } diff --git 
a/rust/worker/chroma_config.yaml b/rust/worker/chroma_config.yaml index e760dcdb97c..99874c67de5 100644 --- a/rust/worker/chroma_config.yaml +++ b/rust/worker/chroma_config.yaml @@ -26,3 +26,6 @@ worker: port: 50051 segment_manager: storage_path: "./tmp/segment_manager/" + storage: + S3: + bucket: "chroma-storage" diff --git a/rust/worker/src/config.rs b/rust/worker/src/config.rs index 4d82cc472ad..7583bf0114e 100644 --- a/rust/worker/src/config.rs +++ b/rust/worker/src/config.rs @@ -108,6 +108,7 @@ pub(crate) struct WorkerConfig { pub(crate) ingest: crate::ingest::config::IngestConfig, pub(crate) sysdb: crate::sysdb::config::SysDbConfig, pub(crate) segment_manager: crate::segment::config::SegmentManagerConfig, + pub(crate) storage: crate::storage::config::StorageConfig, } /// # Description @@ -156,7 +157,9 @@ mod tests { port: 50051 segment_manager: storage_path: "/tmp" - + storage: + S3: + bucket: "chroma" "#, ); let config = RootConfig::load(); @@ -198,6 +201,9 @@ mod tests { port: 50051 segment_manager: storage_path: "/tmp" + storage: + S3: + bucket: "chroma" "#, ); @@ -255,6 +261,9 @@ mod tests { port: 50051 segment_manager: storage_path: "/tmp" + storage: + S3: + bucket: "chroma" "#, ); @@ -293,6 +302,9 @@ mod tests { port: 50051 segment_manager: storage_path: "/tmp" + storage: + S3: + bucket: "chroma" "#, ); let config = RootConfig::load(); diff --git a/rust/worker/src/lib.rs b/rust/worker/src/lib.rs index 39a90984c81..ae7ea7dc7d5 100644 --- a/rust/worker/src/lib.rs +++ b/rust/worker/src/lib.rs @@ -6,6 +6,7 @@ mod ingest; mod memberlist; mod segment; mod server; +mod storage; mod sysdb; mod system; mod types; diff --git a/rust/worker/src/memberlist/memberlist_provider.rs b/rust/worker/src/memberlist/memberlist_provider.rs index cfb1f4ab313..6e32da895aa 100644 --- a/rust/worker/src/memberlist/memberlist_provider.rs +++ b/rust/worker/src/memberlist/memberlist_provider.rs @@ -246,20 +246,21 @@ mod tests { use super::*; #[tokio::test] + 
#[cfg(CHROMA_KUBERNETES_INTEGRATION)] async fn it_can_work() { // TODO: This only works if you have a kubernetes cluster running locally with a memberlist // We need to implement a test harness for this. For now, it will silently do nothing // if you don't have a kubernetes cluster running locally and only serve as a reminder // and demonstration of how to use the memberlist provider. - // let kube_ns = "chroma".to_string(); - // let kube_client = Client::try_default().await.unwrap(); - // let memberlist_provider = CustomResourceMemberlistProvider::new( - // "worker-memberlist".to_string(), - // kube_client.clone(), - // kube_ns.clone(), - // 10, - // ); - // let mut system = System::new(); - // let handle = system.start_component(memberlist_provider); + let kube_ns = "chroma".to_string(); + let kube_client = Client::try_default().await.unwrap(); + let memberlist_provider = CustomResourceMemberlistProvider::new( + "worker-memberlist".to_string(), + kube_client.clone(), + kube_ns.clone(), + 10, + ); + let mut system = System::new(); + let handle = system.start_component(memberlist_provider); } } diff --git a/rust/worker/src/storage/config.rs b/rust/worker/src/storage/config.rs new file mode 100644 index 00000000000..85811d71509 --- /dev/null +++ b/rust/worker/src/storage/config.rs @@ -0,0 +1,20 @@ +use serde::Deserialize; + +#[derive(Deserialize)] +/// The configuration for the chosen storage. +/// # Options +/// - S3: The configuration for the s3 storage. +/// # Notes +/// See config.rs in the root of the worker crate for an example of how to use +/// config files to configure the worker. +pub(crate) enum StorageConfig { + S3(S3StorageConfig), +} + +#[derive(Deserialize)] +/// The configuration for the s3 storage type +/// # Fields +/// - bucket: The name of the bucket to use. 
+pub(crate) struct S3StorageConfig { + pub(crate) bucket: String, +} diff --git a/rust/worker/src/storage/mod.rs b/rust/worker/src/storage/mod.rs new file mode 100644 index 00000000000..eb89db1025e --- /dev/null +++ b/rust/worker/src/storage/mod.rs @@ -0,0 +1,9 @@ +use async_trait::async_trait; +pub(crate) mod config; +pub(crate) mod s3; + +#[async_trait] +trait Storage { + async fn get(&self, key: &str, path: &str) -> Result<(), String>; + async fn put(&self, key: &str, path: &str) -> Result<(), String>; +} diff --git a/rust/worker/src/storage/s3.rs b/rust/worker/src/storage/s3.rs new file mode 100644 index 00000000000..f78767e4896 --- /dev/null +++ b/rust/worker/src/storage/s3.rs @@ -0,0 +1,216 @@ +// Presents an interface to a storage backend such as s3 or local disk. +// The interface is a simple key-value store, which maps to s3 well. +// For now the interface fetches a file and stores it at a specific +// location on disk. This is not ideal for s3, but it is a start. + +// Ideally we would support streaming the file from s3 to the index +// but the current implementation of hnswlib makes this complicated. +// Once we move to our own implementation of hnswlib we can support +// streaming from s3. + +use super::{config::StorageConfig, Storage}; +use crate::config::{Configurable, WorkerConfig}; +use crate::errors::ChromaError; +use async_trait::async_trait; +use aws_sdk_s3; +use aws_sdk_s3::error::SdkError; +use aws_sdk_s3::operation::create_bucket::CreateBucketError; +use aws_smithy_types::byte_stream::ByteStream; +use std::clone::Clone; +use std::io::Write; + +#[derive(Clone)] +struct S3Storage { + bucket: String, + client: aws_sdk_s3::Client, +} + +impl S3Storage { + fn new(bucket: &str, client: aws_sdk_s3::Client) -> S3Storage { + return S3Storage { + bucket: bucket.to_string(), + client: client, + }; + } + + async fn create_bucket(&self) -> Result<(), String> { + // Creates a public bucket with default settings in the region. 
+ // This should only be used for testing and in production + // the bucket should be provisioned ahead of time. + let res = self + .client + .create_bucket() + .bucket(self.bucket.clone()) + .send() + .await; + match res { + Ok(_) => { + println!("created bucket {}", self.bucket); + return Ok(()); + } + Err(e) => match e { + SdkError::ServiceError(err) => match err.into_err() { + CreateBucketError::BucketAlreadyExists(msg) => { + println!("bucket already exists: {}", msg); + return Ok(()); + } + CreateBucketError::BucketAlreadyOwnedByYou(msg) => { + println!("bucket already owned by you: {}", msg); + return Ok(()); + } + e => { + println!("error: {}", e.to_string()); + return Err::<(), String>(e.to_string()); + } + }, + _ => { + println!("error: {}", e); + return Err::<(), String>(e.to_string()); + } + }, + } + } +} + +#[async_trait] +impl Configurable for S3Storage { + async fn try_from_config(config: &WorkerConfig) -> Result> { + match &config.storage { + StorageConfig::S3(s3_config) => { + let config = aws_config::load_from_env().await; + let client = aws_sdk_s3::Client::new(&config); + + let storage = S3Storage::new(&s3_config.bucket, client); + return Ok(storage); + } + } + } +} + +#[async_trait] +impl Storage for S3Storage { + async fn get(&self, key: &str, path: &str) -> Result<(), String> { + let mut file = std::fs::File::create(path); + let res = self + .client + .get_object() + .bucket(self.bucket.clone()) + .key(key) + .send() + .await; + match res { + Ok(mut res) => { + match file { + Ok(mut file) => { + while let bytes = res.body.next().await { + match bytes { + Some(bytes) => match bytes { + Ok(bytes) => { + file.write_all(&bytes).unwrap(); + } + Err(e) => { + println!("error: {}", e); + return Err::<(), String>(e.to_string()); + } + }, + None => { + // Stream is done + return Ok(()); + } + } + } + } + Err(e) => { + println!("error: {}", e); + return Err::<(), String>(e.to_string()); + } + } + return Ok(()); + } + Err(e) => { + println!("error: {}", 
e); + return Err::<(), String>(e.to_string()); + } + } + } + + async fn put(&self, key: &str, path: &str) -> Result<(), String> { + // Puts from a file on disk to s3. + let bytestream = ByteStream::from_path(path).await; + match bytestream { + Ok(bytestream) => { + let res = self + .client + .put_object() + .bucket(self.bucket.clone()) + .key(key) + .body(bytestream) + .send() + .await; + match res { + Ok(_) => { + println!("put object {} to bucket {}", key, self.bucket); + return Ok(()); + } + Err(e) => { + println!("error: {}", e); + return Err::<(), String>(e.to_string()); + } + } + } + Err(e) => { + println!("error: {}", e); + return Err::<(), String>(e.to_string()); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[tokio::test] + #[cfg(CHROMA_KUBERNETES_INTEGRATION)] + async fn test_get() { + // Set up credentials assuming minio is running locally + let cred = aws_sdk_s3::config::Credentials::new( + "minio", + "minio123", + None, + None, + "loaded-from-env", + ); + + // Set up s3 client + let config = aws_sdk_s3::config::Builder::new() + .endpoint_url("http://127.0.0.1:9000".to_string()) + .credentials_provider(cred) + .behavior_version_latest() + .region(aws_sdk_s3::config::Region::new("us-east-1")) + .force_path_style(true) + .build(); + let client = aws_sdk_s3::Client::from_conf(config); + + let storage = S3Storage { + bucket: "test".to_string(), + client: client, + }; + storage.create_bucket().await.unwrap(); + + // Write some data to a test file, put it in s3, get it back and verify its contents + let tmp_dir = tempdir().unwrap(); + let persist_path = tmp_dir.path().to_str().unwrap().to_string(); + + let test_data = "test data"; + let test_file_in = format!("{}/test_file_in", persist_path); + let test_file_out = format!("{}/test_file_out", persist_path); + std::fs::write(&test_file_in, test_data).unwrap(); + storage.put("test", &test_file_in).await.unwrap(); + storage.get("test", &test_file_out).await.unwrap(); + + 
let contents = std::fs::read_to_string(test_file_out).unwrap(); + assert_eq!(contents, test_data); + } +}