From 12a9d84f0c7f8a5db20bfb7b0d7c2baf98d686aa Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Sat, 13 Aug 2022 23:04:36 +0200 Subject: [PATCH] feat: add token provider authorization to azure store (#2374) * feat: add token provider authorization to azure store * Apply suggestions from code review Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> * feat: adopt latest APIs from latest version * chore: clippy * fix: lifetime issue * chore: better errors and docs * chore: fmt whitespace * fix: force first error in get method * chore: avoid unwrapping some options Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- .gitignore | 5 +- object_store/Cargo.toml | 17 +- object_store/src/azure.rs | 390 +++++++++++++++++++++----------------- 3 files changed, 227 insertions(+), 185 deletions(-) diff --git a/.gitignore b/.gitignore index 2088dd5d206..5810e5addce 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,7 @@ venv/* parquet/data.parquet # release notes cache .githubchangeloggenerator.cache -.githubchangeloggenerator.cache.log \ No newline at end of file +.githubchangeloggenerator.cache.log +justfile +.prettierignore +.env \ No newline at end of file diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index ffb65aaa7ee..bb371988aa4 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -22,11 +22,7 @@ edition = "2021" license = "MIT/Apache-2.0" readme = "README.md" description = "A generic object store interface for uniformly interacting with AWS S3, Google Cloud Storage, Azure Blob Storage and local files." 
-keywords = [ - "object", - "storage", - "cloud", -] +keywords = ["object", "storage", "cloud"] repository = "https://github.com/apache/arrow-rs" [package.metadata.docs.rs] @@ -35,9 +31,10 @@ all-features = true [dependencies] # In alphabetical order async-trait = "0.1.53" # Microsoft Azure Blob storage integration -azure_core = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] } -azure_storage = { version = "0.2", optional = true, default-features = false, features = ["account"] } -azure_storage_blobs = { version = "0.2", optional = true, default-features = false, features = ["enable_reqwest_rustls"] } +azure_core = { version = "0.4", optional = true, default-features = false, features = ["enable_reqwest_rustls"] } +azure_identity = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"]} +azure_storage = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"]} +azure_storage_blobs = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"] } bytes = "1.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } # Google Cloud Storage integration @@ -70,7 +67,7 @@ url = "2.2" walkdir = "2" [features] -azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest"] +azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest", "azure_identity"] azure_test = ["azure", "azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"] gcp = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand", "ring"] aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", "hyper-rustls"] @@ -78,4 +75,4 @@ aws = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts", "hyper", " [dev-dependencies] # In alphabetical 
order dotenv = "0.15.0" tempfile = "3.1.0" -futures-test = "0.3" \ No newline at end of file +futures-test = "0.3" diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs index 6a5f537997c..9987c0370df 100644 --- a/object_store/src/azure.rs +++ b/object_store/src/azure.rs @@ -33,22 +33,26 @@ use crate::{ GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, }; use async_trait::async_trait; -use azure_core::{prelude::*, HttpClient}; -use azure_storage::core::prelude::{AsStorageClient, StorageAccountClient}; -use azure_storage_blobs::blob::responses::ListBlobsResponse; +use azure_core::{ + error::{Error as AzureError, ErrorKind as AzureErrorKind}, + prelude::*, + StatusCode, +}; +use azure_identity::{ + AutoRefreshingTokenCredential, ClientSecretCredential, TokenCredentialOptions, +}; +use azure_storage::core::clients::StorageClient; use azure_storage_blobs::blob::Blob; -use azure_storage_blobs::{ - prelude::{AsBlobClient, AsContainerClient, ContainerClient}, - DeleteSnapshotsMethod, +use azure_storage_blobs::container::operations::ListBlobsResponse; +use azure_storage_blobs::prelude::{ + AsContainerClient, ContainerClient, DeleteSnapshotsMethod, }; use bytes::Bytes; -use futures::{ - future::BoxFuture, - stream::{self, BoxStream}, - StreamExt, TryStreamExt, -}; +use chrono::{TimeZone, Utc}; +use futures::{future::BoxFuture, stream::BoxStream, StreamExt, TryStreamExt}; use snafu::{ResultExt, Snafu}; use std::collections::BTreeSet; +use std::fmt::{Debug, Formatter}; use std::io; use std::{convert::TryInto, sync::Arc}; use tokio::io::AsyncWrite; @@ -66,7 +70,7 @@ enum Error { source, ))] UnableToDeleteData { - source: Box, + source: AzureError, container: String, path: String, }, @@ -79,7 +83,7 @@ enum Error { source, ))] UnableToGetData { - source: Box, + source: AzureError, container: String, path: String, }, @@ -92,7 +96,7 @@ enum Error { source, ))] UnableToHeadData { - source: Box, + source: AzureError, container: String, path: String, }, 
@@ -105,7 +109,7 @@ enum Error { source, ))] UnableToGetPieceOfData { - source: Box, + source: AzureError, container: String, path: String, }, @@ -118,7 +122,7 @@ enum Error { source, ))] UnableToPutData { - source: Box, + source: AzureError, container: String, path: String, }, @@ -130,7 +134,7 @@ enum Error { source, ))] UnableToListData { - source: Box, + source: AzureError, container: String, }, @@ -142,7 +146,7 @@ enum Error { source ))] UnableToCopyFile { - source: Box, + source: AzureError, container: String, from: String, to: String, @@ -160,12 +164,12 @@ enum Error { NotFound { path: String, - source: Box, + source: AzureError, }, AlreadyExists { path: String, - source: Box, + source: AzureError, }, #[cfg(not(feature = "azure_test"))] @@ -189,18 +193,24 @@ enum Error { #[snafu(display("Account must be specified"))] MissingAccount {}, - #[snafu(display("Access key must be specified"))] - MissingAccessKey {}, - #[snafu(display("Container name must be specified"))] MissingContainerName {}, + + #[snafu(display("At least one authorization option must be specified"))] + MissingCredentials {}, } impl From for super::Error { fn from(source: Error) -> Self { match source { - Error::NotFound { path, source } => Self::NotFound { path, source }, - Error::AlreadyExists { path, source } => Self::AlreadyExists { path, source }, + Error::NotFound { path, source } => Self::NotFound { + path, + source: Box::new(source), + }, + Error::AlreadyExists { path, source } => Self::AlreadyExists { + path, + source: Box::new(source), + }, _ => Self::Generic { store: "Azure Blob Storage", source: Box::new(source), @@ -227,25 +237,15 @@ impl std::fmt::Display for MicrosoftAzure { } } -#[allow(clippy::borrowed_box)] -fn check_err_not_found(err: &Box) -> bool { - if let Some(azure_core::HttpError::StatusCode { status, .. 
}) = - err.downcast_ref::() - { - return status.as_u16() == 404; - }; - false -} - #[async_trait] impl ObjectStore for MicrosoftAzure { async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { let bytes = bytes::BytesMut::from(&*bytes); self.container_client - .as_blob_client(location.as_ref()) + .blob_client(location.as_ref()) .put_block_blob(bytes) - .execute() + .into_future() .await .context(UnableToPutDataSnafu { container: &self.container_name, @@ -277,29 +277,32 @@ impl ObjectStore for MicrosoftAzure { } async fn get(&self, location: &Path) -> Result { - let blob = self + let loc = location.clone(); + let mut stream = self .container_client - .as_blob_client(location.as_ref()) + .blob_client(location.as_ref()) .get() - .execute() - .await - .map_err(|err| { - if check_err_not_found(&err) { - return Error::NotFound { - source: err, - path: location.to_string(), - }; - }; - Error::UnableToGetData { - source: err, - container: self.container_name.clone(), - path: location.to_string(), - } - })?; + .into_stream() + .and_then(|chunk| chunk.data.collect()) + .map_err(move |err| match err.kind() { + AzureErrorKind::HttpResponse { + status: StatusCode::NotFound, + .. + } => crate::Error::NotFound { + source: Box::new(err), + path: loc.to_string(), + }, + _ => crate::Error::Generic { + source: Box::new(err), + store: "MicrosoftAzure", + }, + }) + .boxed(); - Ok(GetResult::Stream( - futures::stream::once(async move { Ok(blob.data) }).boxed(), - )) + let first = stream.next().await.transpose()?.unwrap_or_default(); + Ok(GetResult::Stream(Box::pin( + futures::stream::once(async { Ok(first) }).chain(stream), + ))) } async fn get_range( @@ -307,49 +310,62 @@ impl ObjectStore for MicrosoftAzure { location: &Path, range: std::ops::Range, ) -> Result { - let blob = self + let map_azure_err = |err: AzureError| match err.kind() { + AzureErrorKind::HttpResponse { + status: StatusCode::NotFound, + .. 
+ } => Error::NotFound { + source: err, + path: location.to_string(), + }, + _ => Error::UnableToGetPieceOfData { + source: err, + container: self.container_name.clone(), + path: location.to_string(), + }, + }; + + let mut stream = self .container_client - .as_blob_client(location.as_ref()) + .blob_client(location.as_ref()) .get() .range(range) - .execute() - .await - .map_err(|err| { - if check_err_not_found(&err) { - return Error::NotFound { - source: err, - path: location.to_string(), - }; - }; - Error::UnableToGetPieceOfData { - source: err, - container: self.container_name.clone(), - path: location.to_string(), - } - })?; + .into_stream(); + + let mut chunk: Vec = vec![]; + while let Some(value) = stream.next().await { + let value = value + .map_err(map_azure_err)? + .data + .collect() + .await + .map_err(map_azure_err)?; + chunk.extend(&value); + } - Ok(blob.data) + Ok(chunk.into()) } async fn head(&self, location: &Path) -> Result { let res = self .container_client - .as_blob_client(location.as_ref()) + .blob_client(location.as_ref()) .get_properties() - .execute() + .into_future() .await - .map_err(|err| { - if check_err_not_found(&err) { - return Error::NotFound { - source: err, - path: location.to_string(), - }; - }; - Error::UnableToHeadData { + .map_err(|err| match err.kind() { + AzureErrorKind::HttpResponse { + status: StatusCode::NotFound, + .. 
+ } => Error::NotFound { + source: err, + path: location.to_string(), + }, + _ => Error::UnableToHeadData { source: err, container: self.container_name.clone(), path: location.to_string(), - } + }, })?; convert_object_meta(res.blob)?.ok_or_else(|| super::Error::NotFound { @@ -360,10 +376,10 @@ impl ObjectStore for MicrosoftAzure { async fn delete(&self, location: &Path) -> Result<()> { self.container_client - .as_blob_client(location.as_ref()) + .blob_client(location.as_ref()) .delete() .delete_snapshots_method(DeleteSnapshotsMethod::Include) - .execute() + .into_future() .await .context(UnableToDeleteDataSnafu { container: &self.container_name, @@ -426,9 +442,9 @@ impl ObjectStore for MicrosoftAzure { async fn copy(&self, from: &Path, to: &Path) -> Result<()> { let from_url = self.get_copy_from_url(from)?; self.container_client - .as_blob_client(to.as_ref()) - .copy(&from_url) - .execute() + .blob_client(to.as_ref()) + .copy(from_url) + .into_future() .await .context(UnableToCopyFileSnafu { container: &self.container_name, @@ -441,20 +457,20 @@ impl ObjectStore for MicrosoftAzure { async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { let from_url = self.get_copy_from_url(from)?; self.container_client - .as_blob_client(to.as_ref()) - .copy(&from_url) - .if_match_condition(IfMatchCondition::NotMatch("*".to_string())) - .execute() + .blob_client(to.as_ref()) + .copy(from_url) + .if_match(IfMatchCondition::NotMatch("*".to_string())) + .into_future() .await .map_err(|err| { - if let Some(azure_core::HttpError::StatusCode { status, .. }) = - err.downcast_ref::() + if let AzureErrorKind::HttpResponse { + status: StatusCode::Conflict, + .. 
+ } = err.kind() { - if status.as_u16() == 409 { - return Error::AlreadyExists { - source: err, - path: to.to_string(), - }; + return Error::AlreadyExists { + source: err, + path: to.to_string(), }; }; Error::UnableToCopyFile { @@ -486,60 +502,33 @@ impl MicrosoftAzure { prefix: Option<&Path>, delimiter: bool, ) -> Result>> { - enum ListState { - Start, - HasMore(String), - Done, + let mut stream = self.container_client.list_blobs(); + if let Some(prefix_val) = format_prefix(prefix) { + stream = stream.prefix(prefix_val); + } + if delimiter { + stream = stream.delimiter(Delimiter::new(DELIMITER)); } - let prefix_raw = format_prefix(prefix); - - Ok(stream::unfold(ListState::Start, move |state| { - let mut request = self.container_client.list_blobs(); - - if let Some(p) = prefix_raw.as_deref() { - request = request.prefix(p); - } - - if delimiter { - request = request.delimiter(Delimiter::new(DELIMITER)); - } - - async move { - match state { - ListState::HasMore(ref marker) => { - request = request.next_marker(marker as &str); - } - ListState::Done => { - return None; - } - ListState::Start => {} - } - - let resp = match request.execute().await.context(UnableToListDataSnafu { - container: &self.container_name, - }) { - Ok(resp) => resp, - Err(err) => return Some((Err(crate::Error::from(err)), state)), - }; - - let next_state = if let Some(marker) = &resp.next_marker { - ListState::HasMore(marker.as_str().to_string()) - } else { - ListState::Done - }; + let stream = stream + .into_stream() + .map(|resp| match resp { + Ok(list_blobs) => Ok(list_blobs), + Err(err) => Err(crate::Error::from(Error::UnableToListData { + source: err, + container: self.container_name.clone(), + })), + }) + .boxed(); - Some((Ok(resp), next_state)) - } - }) - .boxed()) + Ok(stream) } } /// Returns `None` if is a directory fn convert_object_meta(blob: Blob) -> Result> { let location = Path::parse(blob.name)?; - let last_modified = blob.properties.last_modified; + let last_modified = 
Utc.timestamp(blob.properties.last_modified.unix_timestamp(), 0); let size = blob .properties .content_length @@ -580,7 +569,7 @@ fn url_from_env(env_name: &str, default_url: &str) -> Result { Ok(url) } -/// Configure a connection to Mirosoft Azure Blob Storage bucket using +/// Configure a connection to Microsoft Azure Blob Storage container using /// the specified credentials. /// /// # Example @@ -595,14 +584,28 @@ fn url_from_env(env_name: &str, default_url: &str) -> Result { /// .with_container_name(BUCKET_NAME) /// .build(); /// ``` -#[derive(Debug, Default)] +#[derive(Default)] pub struct MicrosoftAzureBuilder { account: Option, access_key: Option, container_name: Option, + bearer_token: Option, + client_id: Option, + client_secret: Option, + tenant_id: Option, use_emulator: bool, } +impl Debug for MicrosoftAzureBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "MicrosoftAzureBuilder {{ account: {:?}, container_name: {:?} }}", + self.account, self.container_name + ) + } +} + impl MicrosoftAzureBuilder { /// Create a new [`MicrosoftAzureBuilder`] with default values. 
pub fn new() -> Self { @@ -615,18 +618,46 @@ impl MicrosoftAzureBuilder { self } - /// Set the Azure Access Key (required) + /// Set the Azure Access Key (required - one of access key, bearer token, or client credentials) pub fn with_access_key(mut self, access_key: impl Into) -> Self { self.access_key = Some(access_key.into()); self } + /// Set a static bearer token to be used for authorizing requests + /// (required - one of access key, bearer token, or client credentials) + pub fn with_bearer_token(mut self, bearer_token: impl Into) -> Self { + self.bearer_token = Some(bearer_token.into()); + self + } + /// Set the Azure Container Name (required) pub fn with_container_name(mut self, container_name: impl Into) -> Self { self.container_name = Some(container_name.into()); self } + /// Set a client id used for client secret authorization + /// (required - one of access key, bearer token, or client credentials) + pub fn with_client_id(mut self, client_id: impl Into) -> Self { + self.client_id = Some(client_id.into()); + self + } + + /// Set a client secret used for client secret authorization + /// (required - one of access key, bearer token, or client credentials) + pub fn with_client_secret(mut self, client_secret: impl Into) -> Self { + self.client_secret = Some(client_secret.into()); + self + } + + /// Set the tenant id of the Azure AD tenant + /// (required - one of access key, bearer token, or client credentials) + pub fn with_tenant_id(mut self, tenant_id: impl Into) -> Self { + self.tenant_id = Some(tenant_id.into()); + self + } + /// Set if the Azure emulator should be used (defaults to false) pub fn with_use_emulator(mut self, use_emulator: bool) -> Self { self.use_emulator = use_emulator; @@ -640,20 +671,20 @@ impl MicrosoftAzureBuilder { account, access_key, container_name, + bearer_token, + client_id, + client_secret, + tenant_id, use_emulator, } = self; let account = account.ok_or(Error::MissingAccount {})?; - let access_key = 
access_key.ok_or(Error::MissingAccessKey {})?; let container_name = container_name.ok_or(Error::MissingContainerName {})?; - let http_client: Arc = Arc::new(reqwest::Client::new()); - - let (is_emulator, storage_account_client) = if use_emulator { + let (is_emulator, storage_client) = if use_emulator { check_if_emulator_works()?; // Allow overriding defaults. Values taken from // from https://docs.rs/azure_storage/0.2.0/src/azure_storage/core/clients/storage_account_client.rs.html#129-141 - let http_client = azure_core::new_http_client(); let blob_storage_url = url_from_env("AZURITE_BLOB_STORAGE_URL", "http://127.0.0.1:10000")?; let queue_storage_url = @@ -663,8 +694,7 @@ impl MicrosoftAzureBuilder { let filesystem_url = url_from_env("AZURITE_TABLE_STORAGE_URL", "http://127.0.0.1:10004")?; - let storage_client = StorageAccountClient::new_emulator( - http_client, + let storage_client = StorageClient::new_emulator( &blob_storage_url, &table_storage_url, &queue_storage_url, @@ -673,25 +703,37 @@ impl MicrosoftAzureBuilder { (true, storage_client) } else { - ( - false, - StorageAccountClient::new_access_key( - Arc::clone(&http_client), - &account, - &access_key, - ), - ) + let client = if let Some(bearer_token) = bearer_token { + Ok(StorageClient::new_bearer_token(&account, bearer_token)) + } else if let Some(access_key) = access_key { + Ok(StorageClient::new_access_key(&account, access_key)) + } else if let (Some(client_id), Some(client_secret), Some(tenant_id)) = + (tenant_id, client_id, client_secret) + { + let credential = Arc::new(AutoRefreshingTokenCredential::new(Arc::new( + ClientSecretCredential::new( + tenant_id, + client_id, + client_secret, + TokenCredentialOptions::default(), + ), + ))); + Ok(StorageClient::new_token_credential(&account, credential)) + } else { + Err(Error::MissingCredentials {}) + }?; + + (false, client) }; - let storage_client = storage_account_client.as_storage_client(); - let blob_base_url = storage_account_client + let blob_base_url = 
storage_client .blob_storage_url() .as_ref() // make url ending consistent between the emulator and remote storage account .trim_end_matches('/') .to_string(); - let container_client = storage_client.as_container_client(&container_name); + let container_client = Arc::new(storage_client.container_client(&container_name)); Ok(MicrosoftAzure { container_client, @@ -735,9 +777,9 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload { Box::pin(async move { client - .as_blob_client(location.as_ref()) + .blob_client(location.as_ref()) .put_block(block_id.clone(), buf) - .execute() + .into_future() .await .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; @@ -761,7 +803,7 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload { .map(|(part_number, maybe_part)| match maybe_part { Some(part) => { Ok(azure_storage_blobs::blob::BlobBlockType::Uncommitted( - azure_storage_blobs::BlockId::new(part.content_id), + azure_storage_blobs::prelude::BlockId::new(part.content_id), )) } None => Err(io::Error::new( @@ -779,9 +821,9 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload { }; client - .as_blob_client(location.as_ref()) - .put_block_list(&block_list) - .execute() + .blob_client(location.as_ref()) + .put_block_list(block_list) + .into_future() .await .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;