From 50bb6332ecd9ceecfa2da2685cb58a3e6a6d0b08 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Fri, 18 Jul 2025 20:05:50 -0700 Subject: [PATCH] feat(azure-auth): add more authentication for azure blob storage --- docs/docs/core/flow_def.mdx | 47 +++++++++++++++++++++++++------ docs/docs/ops/sources.md | 45 ++++++++++++++++++----------- python/cocoindex/__init__.py | 3 +- python/cocoindex/auth_registry.py | 26 +++++++++++++++-- python/cocoindex/sources.py | 9 ++++++ src/ops/sources/azure_blob.rs | 28 ++++++++++++------ 6 files changed, 123 insertions(+), 35 deletions(-) diff --git a/docs/docs/core/flow_def.mdx b/docs/docs/core/flow_def.mdx index ce6fd749..fb5742f7 100644 --- a/docs/docs/core/flow_def.mdx +++ b/docs/docs/core/flow_def.mdx @@ -416,22 +416,28 @@ flow_builder.declare( ### Auth Registry CocoIndex manages an auth registry. It's an in-memory key-value store, mainly to store authentication information for a backend. +It's usually used for targets, where key stability is important for backend cleanup. -Operation spec is the default way to configure a persistent backend. But it has the following limitations: +Operation spec is the default way to configure sources, functions and targets. But it has the following limitations: * The spec isn't supposed to contain secret information, and it's frequently shown in various places, e.g. `cocoindex show`. -* Once an operation is removed after flow definition code change, the spec is also gone. - But we still need to be able to drop the backend (e.g. a table) when [setup / drop flow](/docs/core/flow_methods#setup--drop-flow). +* For targets, once an operation is removed after flow definition code change, the spec is also gone. + But we still need to be able to drop the persistent backend (e.g. a table) when [setup / drop flow](/docs/core/flow_methods#setup--drop-flow). -Auth registry is introduced to solve the problems above. It works as follows: +Auth registry is introduced to solve the problems above. 
-* You can create new **auth entry** by a key and a value. -* You can references the entry by the key, and pass it as part of spec for certain operations. e.g. `Neo4j` takes `connection` field in the form of auth entry reference. + +#### Auth Entry + +An auth entry is an entry in the auth registry with an explicit key. + +* You can create a new *auth entry* by a key and a value. +* You can reference the entry by the key, and pass it as part of spec for certain operations. e.g. `Neo4j` takes `connection` field in the form of auth entry reference. -You can add an auth entry by `cocoindex.add_auth_entry()` function, which returns a `cocoindex.AuthEntryReference`: +You can add an auth entry by `cocoindex.add_auth_entry()` function, which returns a `cocoindex.AuthEntryReference[T]`: ```python my_graph_conn = cocoindex.add_auth_entry( @@ -445,7 +451,7 @@ my_graph_conn = cocoindex.add_auth_entry( Then reference it when building a spec that takes an auth entry: -* You can either reference by the `AuthEntryReference` object directly: +* You can either reference by the `AuthEntryReference[T]` object directly: ```python demo_collector.export( @@ -472,3 +478,28 @@ Note that CocoIndex backends use the key of an auth entry to identify the backen * If a key is no longer referenced in any operation spec, keep it until the next flow setup / drop action, so that CocoIndex will be able to clean up the backends. + +#### Transient Auth Entry + +A transient auth entry is an entry in the auth registry with an automatically generated key. +It's usually used for sources and functions, where key stability is not important. + + + + +You can create a new *transient auth entry* by `cocoindex.add_transient_auth_entry()` function, which returns a `cocoindex.TransientAuthEntryReference[T]`, and pass it to a source or function spec that takes it, e.g. + +```python +flow_builder.add_source( + cocoindex.sources.AzureBlob( + ... 
+ sas_token=cocoindex.add_transient_auth_entry("...") + ) +) +``` + + + + + +Whenever a `TransientAuthEntryReference[T]` is expected, you can also pass an `AuthEntryReference[T]` instead, as `AuthEntryReference[T]` is a subtype of `TransientAuthEntryReference[T]`. diff --git a/docs/docs/ops/sources.md b/docs/docs/ops/sources.md index ca6120f6..30f5d5dd 100644 --- a/docs/docs/ops/sources.md +++ b/docs/docs/ops/sources.md @@ -170,22 +170,33 @@ These are actions you need to take: #### Authentication -We use Azure’s **Default Credential** system (DefaultAzureCredential) for secure and flexible authentication. -This allows you to connect to Azure services without putting any secrets in the code or flow spec. -It automatically chooses the best authentication method based on your environment: - -* On your local machine: uses your Azure CLI login (`az login`) or environment variables. - - ```sh - az login - # Optional: Set a default subscription if you have more than one - az account set --subscription "" - ``` -* In Azure (VM, App Service, AKS, etc.): uses the resource’s Managed Identity. -* In automated environments: supports Service Principals via environment variables - * `AZURE_CLIENT_ID` - * `AZURE_TENANT_ID` - * `AZURE_CLIENT_SECRET` +We support the following authentication methods: + +* Shared access signature (SAS) token. + You can generate it from the Azure Portal in the settings for a specific container. + You need to provide at least *List* and *Read* permissions when generating the SAS token. + It's a query string in the form of + `sp=rl&st=2025-07-19T09:33:00Z&se=2025-07-20T09:48:53Z&sv=2024-11-04&sr=c&sig=i3FDjsadfklj3%23adsfkk`. + +* Storage account access key. You can find it in the Azure Portal in the settings for a specific storage account. + +* Default credential. When none of the above is provided, it will use the default credential. + + This allows you to connect to Azure services without putting any secrets in the code or flow spec. 
+ It automatically chooses the best authentication method based on your environment: + + * On your local machine: uses your Azure CLI login (`az login`) or environment variables. + + ```sh + az login + # Optional: Set a default subscription if you have more than one + az account set --subscription "" + ``` + * In Azure (VM, App Service, AKS, etc.): uses the resource’s Managed Identity. + * In automated environments: supports Service Principals via environment variables + * `AZURE_CLIENT_ID` + * `AZURE_TENANT_ID` + * `AZURE_CLIENT_SECRET` You can refer to [this doc](https://learn.microsoft.com/en-us/azure/developer/python/sdk/authentication/overview) for more details. @@ -202,6 +213,8 @@ The spec takes the following fields: * `excluded_patterns` (`list[str]`, optional): a list of glob patterns to exclude files, e.g. `["*.tmp", "**/*.log"]`. Any file or directory matching these patterns will be excluded even if they match `included_patterns`. If not specified, no files will be excluded. +* `sas_token` (`cocoindex.TransientAuthEntryReference[str]`, optional): a SAS token for authentication. +* `account_access_key` (`cocoindex.TransientAuthEntryReference[str]`, optional): an account access key for authentication. :::info diff --git a/python/cocoindex/__init__.py b/python/cocoindex/__init__.py index cf2069e6..a048827f 100644 --- a/python/cocoindex/__init__.py +++ b/python/cocoindex/__init__.py @@ -6,7 +6,7 @@ from . 
import targets as storages # Deprecated: Use targets instead -from .auth_registry import AuthEntryReference, add_auth_entry, ref_auth_entry +from .auth_registry import AuthEntryReference, add_auth_entry, add_transient_auth_entry from .flow import FlowBuilder, DataScope, DataSlice, Flow, transform_flow from .flow import flow_def from .flow import EvaluateAndDumpOptions, GeneratedField @@ -42,6 +42,7 @@ # Auth registry "AuthEntryReference", "add_auth_entry", + "add_transient_auth_entry", "ref_auth_entry", # Flow "FlowBuilder", diff --git a/python/cocoindex/auth_registry.py b/python/cocoindex/auth_registry.py index 1e17b636..d67cc5e6 100644 --- a/python/cocoindex/auth_registry.py +++ b/python/cocoindex/auth_registry.py @@ -4,20 +4,42 @@ from dataclasses import dataclass from typing import Generic, TypeVar +import threading from . import _engine # type: ignore from .convert import dump_engine_object T = TypeVar("T") +# Global atomic counter for generating unique auth entry keys +_counter_lock = threading.Lock() +_auth_key_counter = 0 + + +def _generate_auth_key() -> str: + """Generate a unique auth entry key using a global atomic counter.""" + global _auth_key_counter # pylint: disable=global-statement + with _counter_lock: + _auth_key_counter += 1 + return f"__auth_{_auth_key_counter}" + @dataclass -class AuthEntryReference(Generic[T]): - """Reference an auth entry by its key.""" +class TransientAuthEntryReference(Generic[T]): + """Reference an auth entry, may or may not have a stable key.""" key: str +class AuthEntryReference(TransientAuthEntryReference[T]): + """Reference an auth entry, with a key stable across runs.""" + + +def add_transient_auth_entry(value: T) -> TransientAuthEntryReference[T]: + """Add an auth entry with an automatically generated key to the registry. Returns its reference.""" + return add_auth_entry(_generate_auth_key(), value) + + def add_auth_entry(key: str, value: T) -> AuthEntryReference[T]: """Add an auth entry to the registry. 
Returns its reference.""" _engine.add_auth_entry(key, dump_engine_object(value)) diff --git a/python/cocoindex/sources.py b/python/cocoindex/sources.py index f09583fc..489ca55a 100644 --- a/python/cocoindex/sources.py +++ b/python/cocoindex/sources.py @@ -1,6 +1,7 @@ """All builtin sources.""" from . import op +from .auth_registry import TransientAuthEntryReference import datetime @@ -48,6 +49,11 @@ class AmazonS3(op.SourceSpec): class AzureBlob(op.SourceSpec): """ Import data from an Azure Blob Storage container. Supports optional prefix and file filtering by glob patterns. + + Authentication mechanisms taken in the following order: + - SAS token (if provided) + - Account access key (if provided) + - Default Azure credential """ _op_category = op.OpCategory.SOURCE @@ -58,3 +64,6 @@ class AzureBlob(op.SourceSpec): binary: bool = False included_patterns: list[str] | None = None excluded_patterns: list[str] | None = None + + sas_token: TransientAuthEntryReference[str] | None = None + account_access_key: TransientAuthEntryReference[str] | None = None diff --git a/src/ops/sources/azure_blob.rs b/src/ops/sources/azure_blob.rs index 10a15392..c2ae29c3 100644 --- a/src/ops/sources/azure_blob.rs +++ b/src/ops/sources/azure_blob.rs @@ -19,6 +19,11 @@ pub struct Spec { binary: bool, included_patterns: Option>, excluded_patterns: Option>, + + /// SAS token for authentication. Takes precedence over account_access_key. + sas_token: Option>, + /// Account access key for authentication. If not provided, will use default Azure credential. 
+ account_access_key: Option>, } struct Executor { @@ -209,15 +214,22 @@ impl SourceFactoryBase for Factory { async fn build_executor( self: Arc, spec: Spec, - _context: Arc, + context: Arc, ) -> Result> { - let default_credential = Arc::new(DefaultAzureCredential::create( - TokenCredentialOptions::default(), - )?); - let client = BlobServiceClient::new( - &spec.account_name, - StorageCredentials::token_credential(default_credential), - ); + let credential = if let Some(sas_token) = spec.sas_token { + let sas_token = context.auth_registry.get(&sas_token)?; + StorageCredentials::sas_token(sas_token)? + } else if let Some(account_access_key) = spec.account_access_key { + let account_access_key = context.auth_registry.get(&account_access_key)?; + StorageCredentials::access_key(spec.account_name.clone(), account_access_key) + } else { + let default_credential = Arc::new(DefaultAzureCredential::create( + TokenCredentialOptions::default(), + )?); + StorageCredentials::token_credential(default_credential) + }; + + let client = BlobServiceClient::new(&spec.account_name, credential); Ok(Box::new(Executor { client, container_name: spec.container_name,