Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions docs/docs/core/flow_def.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -416,22 +416,28 @@ flow_builder.declare(
### Auth Registry

CocoIndex manages an auth registry. It's an in-memory key-value store, mainly to store authentication information for a backend.
It's usually used for targets, where key stability is important for backend cleanup.

Operation spec is the default way to configure a persistent backend. But it has the following limitations:
Operation spec is the default way to configure sources, functions and targets. But it has the following limitations:

* The spec isn't supposed to contain secret information, and it's frequently shown in various places, e.g. `cocoindex show`.
* Once an operation is removed after flow definition code change, the spec is also gone.
But we still need to be able to drop the backend (e.g. a table) when [setup / drop flow](/docs/core/flow_methods#setup--drop-flow).
* For targets, once an operation is removed after flow definition code change, the spec is also gone.
But we still need to be able to drop the persistent backend (e.g. a table) when [setup / drop flow](/docs/core/flow_methods#setup--drop-flow).

Auth registry is introduced to solve the problems above. It works as follows:
Auth registry is introduced to solve the problems above.

* You can create new **auth entry** by a key and a value.
* You can references the entry by the key, and pass it as part of spec for certain operations. e.g. `Neo4j` takes `connection` field in the form of auth entry reference.

#### Auth Entry

An auth entry is an entry in the auth registry with an explicit key.

* You can create new *auth entry* by a key and a value.
* You can reference the entry by the key, and pass it as part of spec for certain operations. e.g. `Neo4j` takes `connection` field in the form of auth entry reference.

<Tabs>
<TabItem value="python" label="Python" default>

You can add an auth entry by `cocoindex.add_auth_entry()` function, which returns a `cocoindex.AuthEntryReference`:
You can add an auth entry by `cocoindex.add_auth_entry()` function, which returns a `cocoindex.AuthEntryReference[T]`:

```python
my_graph_conn = cocoindex.add_auth_entry(
Expand All @@ -445,7 +451,7 @@ my_graph_conn = cocoindex.add_auth_entry(

Then reference it when building a spec that takes an auth entry:

* You can either reference by the `AuthEntryReference` object directly:
* You can either reference by the `AuthEntryReference[T]` object directly:

```python
demo_collector.export(
Expand All @@ -472,3 +478,28 @@ Note that CocoIndex backends use the key of an auth entry to identify the backen

* If a key is no longer referenced in any operation spec, keep it until the next flow setup / drop action,
so that CocoIndex will be able to clean up the backends.

#### Transient Auth Entry

A transient auth entry is an entry in the auth registry with an automatically generated key.
It's usually used for sources and functions, where key stability is not important.

<Tabs>
<TabItem value="python" label="Python" default>

You can create a new *transient auth entry* by `cocoindex.add_transient_auth_entry()` function, which returns a `cocoindex.TransientAuthEntryReference[T]`, and pass it to a source or function spec that takes it, e.g.

```python
flow_builder.add_source(
cocoindex.sources.AzureBlob(
...
sas_token=cocoindex.add_transient_auth_entry("...")
)
)
```


</TabItem>
</Tabs>

Whenever a `TransientAuthEntryReference[T]` is expected, you can also pass a `AuthEntryReference[T]` instead, as `AuthEntryReference[T]` is a subtype of `TransientAuthEntryReference[T]`.
45 changes: 29 additions & 16 deletions docs/docs/ops/sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,22 +170,33 @@ These are actions you need to take:

#### Authentication

We use Azure’s **Default Credential** system (DefaultAzureCredential) for secure and flexible authentication.
This allows you to connect to Azure services without putting any secrets in the code or flow spec.
It automatically chooses the best authentication method based on your environment:

* On your local machine: uses your Azure CLI login (`az login`) or environment variables.

```sh
az login
# Optional: Set a default subscription if you have more than one
az account set --subscription "<YOUR_SUBSCRIPTION_NAME_OR_ID>"
```
* In Azure (VM, App Service, AKS, etc.): uses the resource’s Managed Identity.
* In automated environments: supports Service Principals via environment variables
* `AZURE_CLIENT_ID`
* `AZURE_TENANT_ID`
* `AZURE_CLIENT_SECRET`
We support the following authentication methods:

* Shared access signature (SAS) tokens.
You can generate it from the Azure Portal in the settings for a specific container.
You need to provide at least *List* and *Read* permissions when generating the SAS token.
It's a query string in the form of
`sp=rl&st=2025-07-20T09:33:00Z&se=2025-07-19T09:48:53Z&sv=2024-11-04&sr=c&sig=i3FDjsadfklj3%23adsfkk`.

* Storage account access key. You can find it in the Azure Portal in the settings for a specific storage account.

* Default credential. When none of the above is provided, it will use the default credential.

This allows you to connect to Azure services without putting any secrets in the code or flow spec.
It automatically chooses the best authentication method based on your environment:

* On your local machine: uses your Azure CLI login (`az login`) or environment variables.

```sh
az login
# Optional: Set a default subscription if you have more than one
az account set --subscription "<YOUR_SUBSCRIPTION_NAME_OR_ID>"
```
* In Azure (VM, App Service, AKS, etc.): uses the resource’s Managed Identity.
* In automated environments: supports Service Principals via environment variables
* `AZURE_CLIENT_ID`
* `AZURE_TENANT_ID`
* `AZURE_CLIENT_SECRET`

You can refer to [this doc](https://learn.microsoft.com/en-us/azure/developer/python/sdk/authentication/overview) for more details.

Expand All @@ -202,6 +213,8 @@ The spec takes the following fields:
* `excluded_patterns` (`list[str]`, optional): a list of glob patterns to exclude files, e.g. `["*.tmp", "**/*.log"]`.
Any file or directory matching these patterns will be excluded even if they match `included_patterns`.
If not specified, no files will be excluded.
* `sas_token` (`cocoindex.TransientAuthEntryReference[str]`, optional): a SAS token for authentication.
* `account_access_key` (`cocoindex.TransientAuthEntryReference[str]`, optional): an account access key for authentication.

:::info

Expand Down
3 changes: 2 additions & 1 deletion python/cocoindex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from . import targets as storages # Deprecated: Use targets instead

from .auth_registry import AuthEntryReference, add_auth_entry, ref_auth_entry
from .auth_registry import AuthEntryReference, add_auth_entry, add_transient_auth_entry
from .flow import FlowBuilder, DataScope, DataSlice, Flow, transform_flow
from .flow import flow_def
from .flow import EvaluateAndDumpOptions, GeneratedField
Expand Down Expand Up @@ -42,6 +42,7 @@
# Auth registry
"AuthEntryReference",
"add_auth_entry",
"add_transient_auth_entry",
"ref_auth_entry",
# Flow
"FlowBuilder",
Expand Down
26 changes: 24 additions & 2 deletions python/cocoindex/auth_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,42 @@

from dataclasses import dataclass
from typing import Generic, TypeVar
import threading

from . import _engine # type: ignore
from .convert import dump_engine_object

T = TypeVar("T")

# Global atomic counter for generating unique auth entry keys
_counter_lock = threading.Lock()
_auth_key_counter = 0


def _generate_auth_key() -> str:
"""Generate a unique auth entry key using a global atomic counter."""
global _auth_key_counter # pylint: disable=global-statement
with _counter_lock:
_auth_key_counter += 1
return f"__auth_{_auth_key_counter}"


@dataclass
class AuthEntryReference(Generic[T]):
"""Reference an auth entry by its key."""
class TransientAuthEntryReference(Generic[T]):
"""Reference an auth entry, may or may not have a stable key."""

key: str


class AuthEntryReference(TransientAuthEntryReference[T]):
"""Reference an auth entry, with a key stable across ."""


def add_transient_auth_entry(value: T) -> TransientAuthEntryReference[T]:
"""Add an auth entry to the registry. Returns its reference."""
return add_auth_entry(_generate_auth_key(), value)


def add_auth_entry(key: str, value: T) -> AuthEntryReference[T]:
"""Add an auth entry to the registry. Returns its reference."""
_engine.add_auth_entry(key, dump_engine_object(value))
Expand Down
9 changes: 9 additions & 0 deletions python/cocoindex/sources.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""All builtin sources."""

from . import op
from .auth_registry import TransientAuthEntryReference
import datetime


Expand Down Expand Up @@ -48,6 +49,11 @@ class AmazonS3(op.SourceSpec):
class AzureBlob(op.SourceSpec):
"""
Import data from an Azure Blob Storage container. Supports optional prefix and file filtering by glob patterns.

Authentication mechanisms taken in the following order:
- SAS token (if provided)
- Account access key (if provided)
- Default Azure credential
"""

_op_category = op.OpCategory.SOURCE
Expand All @@ -58,3 +64,6 @@ class AzureBlob(op.SourceSpec):
binary: bool = False
included_patterns: list[str] | None = None
excluded_patterns: list[str] | None = None

sas_token: TransientAuthEntryReference[str] | None = None
account_access_key: TransientAuthEntryReference[str] | None = None
28 changes: 20 additions & 8 deletions src/ops/sources/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ pub struct Spec {
binary: bool,
included_patterns: Option<Vec<String>>,
excluded_patterns: Option<Vec<String>>,

/// SAS token for authentication. Takes precedence over account_access_key.
sas_token: Option<AuthEntryReference<String>>,
/// Account access key for authentication. If not provided, will use default Azure credential.
account_access_key: Option<AuthEntryReference<String>>,
}

struct Executor {
Expand Down Expand Up @@ -209,15 +214,22 @@ impl SourceFactoryBase for Factory {
async fn build_executor(
self: Arc<Self>,
spec: Spec,
_context: Arc<FlowInstanceContext>,
context: Arc<FlowInstanceContext>,
) -> Result<Box<dyn SourceExecutor>> {
let default_credential = Arc::new(DefaultAzureCredential::create(
TokenCredentialOptions::default(),
)?);
let client = BlobServiceClient::new(
&spec.account_name,
StorageCredentials::token_credential(default_credential),
);
let credential = if let Some(sas_token) = spec.sas_token {
let sas_token = context.auth_registry.get(&sas_token)?;
StorageCredentials::sas_token(sas_token)?
} else if let Some(account_access_key) = spec.account_access_key {
let account_access_key = context.auth_registry.get(&account_access_key)?;
StorageCredentials::access_key(spec.account_name.clone(), account_access_key)
} else {
let default_credential = Arc::new(DefaultAzureCredential::create(
TokenCredentialOptions::default(),
)?);
StorageCredentials::token_credential(default_credential)
};

let client = BlobServiceClient::new(&spec.account_name, credential);
Ok(Box::new(Executor {
client,
container_name: spec.container_name,
Expand Down
Loading