diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index cc330807114..33a4daf4e64 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -129,4 +129,4 @@ jobs: OBJECT_STORE_AWS_ENDPOINT: http://localstack:4566 run: | # run tests - cargo test -p object_store --features=aws,azure,azure_test,gcp + cargo test -p object_store --features=aws,azure,gcp diff --git a/.gitignore b/.gitignore index 5810e5addce..f387a7c0ae3 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,7 @@ parquet/data.parquet .githubchangeloggenerator.cache.log justfile .prettierignore -.env \ No newline at end of file +.env +# local azurite file +__azurite* +__blobstorage__ diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 8c713d80b88..966c423a7e7 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -30,38 +30,33 @@ all-features = true [dependencies] # In alphabetical order async-trait = "0.1.53" -# Microsoft Azure Blob storage integration -azure_core = { version = "0.4", optional = true, default-features = false, features = ["enable_reqwest_rustls"] } -azure_identity = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"]} -azure_storage = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"]} -azure_storage_blobs = { version = "0.5", optional = true, default-features = false, features = ["enable_reqwest_rustls"] } bytes = "1.0" chrono = { version = "0.4", default-features = false, features = ["clock"] } -# Google Cloud Storage integration futures = "0.3" -serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } -serde_json = { version = "1.0", default-features = false, optional = true } -quick-xml = { version = "0.23.0", features = ["serialize"], optional = true } -rustls-pemfile = { version = "1.0", default-features = false, optional = true } -ring = { version = 
"0.16", default-features = false, features = ["std"], optional = true } -base64 = { version = "0.13", default-features = false, optional = true } -rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true } itertools = "0.10.1" +parking_lot = { version = "0.12" } percent-encoding = "2.1" snafu = "0.7" tokio = { version = "1.18", features = ["sync", "macros", "parking_lot", "rt-multi-thread", "time", "io-util"] } tracing = { version = "0.1" } -reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"], optional = true } -parking_lot = { version = "0.12" } -# Filesystem integration url = "2.2" walkdir = "2" +# Cloud storage support +base64 = { version = "0.13", default-features = false, optional = true } +quick-xml = { version = "0.23.0", features = ["serialize"], optional = true } +serde = { version = "1.0", default-features = false, features = ["derive"], optional = true } +serde_json = { version = "1.0", default-features = false, optional = true } +rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true } +reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"], optional = true } +ring = { version = "0.16", default-features = false, features = ["std"], optional = true } +rustls-pemfile = { version = "1.0", default-features = false, optional = true } + [features] -azure = ["azure_core", "azure_storage_blobs", "azure_storage", "reqwest", "azure_identity"] -azure_test = ["azure", "azure_core/azurite_workaround", "azure_storage/azurite_workaround", "azure_storage_blobs/azurite_workaround"] -gcp = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand", "ring"] -aws = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand", "ring"] +cloud = ["serde", "serde_json", "quick-xml", "reqwest", 
"reqwest/json", "reqwest/stream", "chrono/serde", "rustls-pemfile", "base64", "rand", "ring"] +azure = ["cloud"] +gcp = ["cloud"] +aws = ["cloud"] [dev-dependencies] # In alphabetical order dotenv = "0.15.0" diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs index b75005975bd..e6c1bdd74a0 100644 --- a/object_store/src/aws/credential.rs +++ b/object_store/src/aws/credential.rs @@ -17,6 +17,7 @@ use crate::client::retry::RetryExt; use crate::client::token::{TemporaryToken, TokenCache}; +use crate::util::hmac_sha256; use crate::{Result, RetryConfig}; use bytes::Buf; use chrono::{DateTime, Utc}; @@ -188,11 +189,6 @@ impl CredentialExt for RequestBuilder { } } -fn hmac_sha256(secret: impl AsRef<[u8]>, bytes: impl AsRef<[u8]>) -> ring::hmac::Tag { - let key = ring::hmac::Key::new(ring::hmac::HMAC_SHA256, secret.as_ref()); - ring::hmac::sign(&key, bytes.as_ref()) -} - /// Computes the SHA256 digest of `body` returned as a hex encoded string fn hex_digest(bytes: &[u8]) -> String { let digest = ring::digest::digest(&ring::digest::SHA256, bytes); diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs deleted file mode 100644 index a9dbc53e22a..00000000000 --- a/object_store/src/azure.rs +++ /dev/null @@ -1,886 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. - -//! An object store implementation for Azure blob storage -//! -//! ## Streaming uploads -//! -//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those -//! blocks. Data is buffered internally to make blocks of at least 5MB and blocks -//! are uploaded concurrently. -//! -//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide -//! a way to drop old blocks. Instead unused blocks are automatically cleaned up -//! after 7 days. -use crate::{ - multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart}, - path::{Path, DELIMITER}, - util::format_prefix, - GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, -}; -use async_trait::async_trait; -use azure_core::{ - error::{Error as AzureError, ErrorKind as AzureErrorKind}, - prelude::*, - StatusCode, -}; -use azure_identity::{ - AutoRefreshingTokenCredential, ClientSecretCredential, TokenCredentialOptions, -}; -use azure_storage::core::clients::StorageClient; -use azure_storage_blobs::blob::Blob; -use azure_storage_blobs::container::operations::ListBlobsResponse; -use azure_storage_blobs::prelude::{ - AsContainerClient, ContainerClient, DeleteSnapshotsMethod, -}; -use bytes::Bytes; -use chrono::{TimeZone, Utc}; -use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use snafu::{ResultExt, Snafu}; -use std::collections::BTreeSet; -use std::fmt::{Debug, Formatter}; -use std::io; -use std::{convert::TryInto, sync::Arc}; -use tokio::io::AsyncWrite; -use url::Url; - -/// A specialized `Error` for Azure object store-related errors -#[derive(Debug, Snafu)] -#[allow(missing_docs)] -enum Error { - #[snafu(display( - "Unable to DELETE data. 
Container: {}, Location: {}, Error: {} ({:?})", - container, - path, - source, - source, - ))] - UnableToDeleteData { - source: AzureError, - container: String, - path: String, - }, - - #[snafu(display( - "Unable to GET data. Container: {}, Location: {}, Error: {} ({:?})", - container, - path, - source, - source, - ))] - UnableToGetData { - source: AzureError, - container: String, - path: String, - }, - - #[snafu(display( - "Unable to HEAD data. Container: {}, Location: {}, Error: {} ({:?})", - container, - path, - source, - source, - ))] - UnableToHeadData { - source: AzureError, - container: String, - path: String, - }, - - #[snafu(display( - "Unable to GET part of the data. Container: {}, Location: {}, Error: {} ({:?})", - container, - path, - source, - source, - ))] - UnableToGetPieceOfData { - source: AzureError, - container: String, - path: String, - }, - - #[snafu(display( - "Unable to PUT data. Bucket: {}, Location: {}, Error: {} ({:?})", - container, - path, - source, - source, - ))] - UnableToPutData { - source: AzureError, - container: String, - path: String, - }, - - #[snafu(display( - "Unable to list data. Bucket: {}, Error: {} ({:?})", - container, - source, - source, - ))] - UnableToListData { - source: AzureError, - container: String, - }, - - #[snafu(display( - "Unable to copy object. Container: {}, From: {}, To: {}, Error: {}", - container, - from, - to, - source - ))] - UnableToCopyFile { - source: AzureError, - container: String, - from: String, - to: String, - }, - - #[snafu(display( - "Unable parse source url. 
Container: {}, Error: {}", - container, - source - ))] - UnableToParseUrl { - source: url::ParseError, - container: String, - }, - - NotFound { - path: String, - source: AzureError, - }, - - AlreadyExists { - path: String, - source: AzureError, - }, - - #[cfg(not(feature = "azure_test"))] - #[snafu(display( - "Azurite (azure emulator) support not compiled in, please add `azure_test` feature" - ))] - NoEmulatorFeature, - - #[snafu(display( - "Unable parse emulator url {}={}, Error: {}", - env_name, - env_value, - source - ))] - UnableToParseEmulatorUrl { - env_name: String, - env_value: String, - source: url::ParseError, - }, - - #[snafu(display("Account must be specified"))] - MissingAccount {}, - - #[snafu(display("Container name must be specified"))] - MissingContainerName {}, - - #[snafu(display("At least one authorization option must be specified"))] - MissingCredentials {}, -} - -impl From for super::Error { - fn from(source: Error) -> Self { - match source { - Error::NotFound { path, source } => Self::NotFound { - path, - source: Box::new(source), - }, - Error::AlreadyExists { path, source } => Self::AlreadyExists { - path, - source: Box::new(source), - }, - _ => Self::Generic { - store: "Azure Blob Storage", - source: Box::new(source), - }, - } - } -} - -/// Interface for [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/). 
-#[derive(Debug)] -pub struct MicrosoftAzure { - container_client: Arc, - container_name: String, - blob_base_url: String, - is_emulator: bool, -} - -impl std::fmt::Display for MicrosoftAzure { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.is_emulator { - true => write!(f, "MicrosoftAzureEmulator({})", self.container_name), - false => write!(f, "MicrosoftAzure({})", self.container_name), - } - } -} - -#[async_trait] -impl ObjectStore for MicrosoftAzure { - async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { - let bytes = bytes::BytesMut::from(&*bytes); - - self.container_client - .blob_client(location.as_ref()) - .put_block_blob(bytes) - .into_future() - .await - .context(UnableToPutDataSnafu { - container: &self.container_name, - path: location.to_owned(), - })?; - - Ok(()) - } - - async fn put_multipart( - &self, - location: &Path, - ) -> Result<(MultipartId, Box)> { - let inner = AzureMultiPartUpload { - container_client: Arc::clone(&self.container_client), - location: location.to_owned(), - }; - Ok((String::new(), Box::new(CloudMultiPartUpload::new(inner, 8)))) - } - - async fn abort_multipart( - &self, - _location: &Path, - _multipart_id: &MultipartId, - ) -> Result<()> { - // There is no way to drop blocks that have been uploaded. Instead, they simply - // expire in 7 days. - Ok(()) - } - - async fn get(&self, location: &Path) -> Result { - let loc = location.clone(); - let mut stream = self - .container_client - .blob_client(location.as_ref()) - .get() - .into_stream() - .and_then(|chunk| chunk.data.collect()) - .map_err(move |err| match err.kind() { - AzureErrorKind::HttpResponse { - status: StatusCode::NotFound, - .. 
- } => crate::Error::NotFound { - source: Box::new(err), - path: loc.to_string(), - }, - _ => crate::Error::Generic { - source: Box::new(err), - store: "MicrosoftAzure", - }, - }) - .boxed(); - - let first = stream.next().await.transpose()?.unwrap_or_default(); - Ok(GetResult::Stream(Box::pin( - futures::stream::once(async { Ok(first) }).chain(stream), - ))) - } - - async fn get_range( - &self, - location: &Path, - range: std::ops::Range, - ) -> Result { - let map_azure_err = |err: AzureError| match err.kind() { - AzureErrorKind::HttpResponse { - status: StatusCode::NotFound, - .. - } => Error::NotFound { - source: err, - path: location.to_string(), - }, - _ => Error::UnableToGetPieceOfData { - source: err, - container: self.container_name.clone(), - path: location.to_string(), - }, - }; - - let mut stream = self - .container_client - .blob_client(location.as_ref()) - .get() - .range(range) - .into_stream(); - - let mut chunk: Vec = vec![]; - while let Some(value) = stream.next().await { - let value = value - .map_err(map_azure_err)? - .data - .collect() - .await - .map_err(map_azure_err)?; - chunk.extend(&value); - } - - Ok(chunk.into()) - } - - async fn head(&self, location: &Path) -> Result { - let res = self - .container_client - .blob_client(location.as_ref()) - .get_properties() - .into_future() - .await - .map_err(|err| match err.kind() { - AzureErrorKind::HttpResponse { - status: StatusCode::NotFound, - .. 
- } => Error::NotFound { - source: err, - path: location.to_string(), - }, - _ => Error::UnableToHeadData { - source: err, - container: self.container_name.clone(), - path: location.to_string(), - }, - })?; - - convert_object_meta(res.blob)?.ok_or_else(|| super::Error::NotFound { - path: location.to_string(), - source: "is directory".to_string().into(), - }) - } - - async fn delete(&self, location: &Path) -> Result<()> { - self.container_client - .blob_client(location.as_ref()) - .delete() - .delete_snapshots_method(DeleteSnapshotsMethod::Include) - .into_future() - .await - .context(UnableToDeleteDataSnafu { - container: &self.container_name, - path: location.to_string(), - })?; - - Ok(()) - } - - async fn list( - &self, - prefix: Option<&Path>, - ) -> Result>> { - let stream = self - .list_impl(prefix, false) - .await? - .map_ok(|resp| { - let names = resp - .blobs - .blobs - .into_iter() - .filter_map(|blob| convert_object_meta(blob).transpose()); - futures::stream::iter(names) - }) - .try_flatten() - .boxed(); - - Ok(stream) - } - - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { - let mut stream = self.list_impl(prefix, true).await?; - - let mut common_prefixes = BTreeSet::new(); - let mut objects = Vec::new(); - - while let Some(res) = stream.next().await { - let response = res?; - - let prefixes = response.blobs.blob_prefix.unwrap_or_default(); - for p in prefixes { - common_prefixes.insert(Path::parse(&p.name)?); - } - - let blobs = response.blobs.blobs; - objects.reserve(blobs.len()); - for blob in blobs { - if let Some(meta) = convert_object_meta(blob)? 
{ - objects.push(meta); - } - } - } - - Ok(ListResult { - common_prefixes: common_prefixes.into_iter().collect(), - objects, - }) - } - - async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - let from_url = self.get_copy_from_url(from)?; - self.container_client - .blob_client(to.as_ref()) - .copy(from_url) - .into_future() - .await - .context(UnableToCopyFileSnafu { - container: &self.container_name, - from: from.as_ref(), - to: to.as_ref(), - })?; - Ok(()) - } - - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - let from_url = self.get_copy_from_url(from)?; - self.container_client - .blob_client(to.as_ref()) - .copy(from_url) - .if_match(IfMatchCondition::NotMatch("*".to_string())) - .into_future() - .await - .map_err(|err| { - if let AzureErrorKind::HttpResponse { - status: StatusCode::Conflict, - .. - } = err.kind() - { - return Error::AlreadyExists { - source: err, - path: to.to_string(), - }; - }; - Error::UnableToCopyFile { - source: err, - container: self.container_name.clone(), - from: from.to_string(), - to: to.to_string(), - } - })?; - Ok(()) - } -} - -impl MicrosoftAzure { - /// helper function to create a source url for copy function - fn get_copy_from_url(&self, from: &Path) -> Result { - let mut url = - Url::parse(&format!("{}/{}", &self.blob_base_url, self.container_name)) - .context(UnableToParseUrlSnafu { - container: &self.container_name, - })?; - - url.path_segments_mut().unwrap().extend(from.parts()); - Ok(url) - } - - async fn list_impl( - &self, - prefix: Option<&Path>, - delimiter: bool, - ) -> Result>> { - let mut stream = self.container_client.list_blobs(); - if let Some(prefix_val) = format_prefix(prefix) { - stream = stream.prefix(prefix_val); - } - if delimiter { - stream = stream.delimiter(Delimiter::new(DELIMITER)); - } - - let stream = stream - .into_stream() - .map(|resp| match resp { - Ok(list_blobs) => Ok(list_blobs), - Err(err) => Err(crate::Error::from(Error::UnableToListData { - source: err, 
- container: self.container_name.clone(), - })), - }) - .boxed(); - - Ok(stream) - } -} - -/// Returns `None` if is a directory -fn convert_object_meta(blob: Blob) -> Result> { - let location = Path::parse(blob.name)?; - let last_modified = Utc.timestamp(blob.properties.last_modified.unix_timestamp(), 0); - let size = blob - .properties - .content_length - .try_into() - .expect("unsupported size on this platform"); - - // This is needed to filter out gen2 directories - // https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-known-issues#blob-storage-apis - Ok((size > 0).then(|| ObjectMeta { - location, - last_modified, - size, - })) -} - -#[cfg(feature = "azure_test")] -fn check_if_emulator_works() -> Result<()> { - Ok(()) -} - -#[cfg(not(feature = "azure_test"))] -fn check_if_emulator_works() -> Result<()> { - Err(Error::NoEmulatorFeature.into()) -} - -/// Parses the contents of the environment variable `env_name` as a URL -/// if present, otherwise falls back to default_url -fn url_from_env(env_name: &str, default_url: &str) -> Result { - let url = match std::env::var(env_name) { - Ok(env_value) => { - Url::parse(&env_value).context(UnableToParseEmulatorUrlSnafu { - env_name, - env_value, - })? - } - Err(_) => Url::parse(default_url).expect("Failed to parse default URL"), - }; - Ok(url) -} - -/// Configure a connection to Microsoft Azure Blob Storage container using -/// the specified credentials. 
-/// -/// # Example -/// ``` -/// # let ACCOUNT = "foo"; -/// # let BUCKET_NAME = "foo"; -/// # let ACCESS_KEY = "foo"; -/// # use object_store::azure::MicrosoftAzureBuilder; -/// let azure = MicrosoftAzureBuilder::new() -/// .with_account(ACCOUNT) -/// .with_access_key(ACCESS_KEY) -/// .with_container_name(BUCKET_NAME) -/// .build(); -/// ``` -#[derive(Default)] -pub struct MicrosoftAzureBuilder { - account: Option, - access_key: Option, - container_name: Option, - bearer_token: Option, - client_id: Option, - client_secret: Option, - tenant_id: Option, - use_emulator: bool, -} - -impl Debug for MicrosoftAzureBuilder { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "MicrosoftAzureBuilder {{ account: {:?}, container_name: {:?} }}", - self.account, self.container_name - ) - } -} - -impl MicrosoftAzureBuilder { - /// Create a new [`MicrosoftAzureBuilder`] with default values. - pub fn new() -> Self { - Default::default() - } - - /// Set the Azure Account (required) - pub fn with_account(mut self, account: impl Into) -> Self { - self.account = Some(account.into()); - self - } - - /// Set the Azure Access Key (required - one of access key, bearer token, or client credentials) - pub fn with_access_key(mut self, access_key: impl Into) -> Self { - self.access_key = Some(access_key.into()); - self - } - - /// Set a static bearer token to be used for authorizing requests - /// (required - one of access key, bearer token, or client credentials) - pub fn with_bearer_token(mut self, bearer_token: impl Into) -> Self { - self.bearer_token = Some(bearer_token.into()); - self - } - - /// Set the Azure Container Name (required) - pub fn with_container_name(mut self, container_name: impl Into) -> Self { - self.container_name = Some(container_name.into()); - self - } - - /// Set a client id used for client secret authorization - /// (required - one of access key, bearer token, or client credentials) - pub fn with_client_id(mut self, client_id: impl Into) 
-> Self { - self.client_id = Some(client_id.into()); - self - } - - /// Set a client secret used for client secret authorization - /// (required - one of access key, bearer token, or client credentials) - pub fn with_client_secret(mut self, client_secret: impl Into) -> Self { - self.client_secret = Some(client_secret.into()); - self - } - - /// Set the tenant id of the Azure AD tenant - /// (required - one of access key, bearer token, or client credentials) - pub fn with_tenant_id(mut self, tenant_id: impl Into) -> Self { - self.tenant_id = Some(tenant_id.into()); - self - } - - /// Set if the Azure emulator should be used (defaults to false) - pub fn with_use_emulator(mut self, use_emulator: bool) -> Self { - self.use_emulator = use_emulator; - self - } - - /// Configure a connection to container with given name on Microsoft Azure - /// Blob store. - pub fn build(self) -> Result { - let Self { - account, - access_key, - container_name, - bearer_token, - client_id, - client_secret, - tenant_id, - use_emulator, - } = self; - - let account = account.ok_or(Error::MissingAccount {})?; - let container_name = container_name.ok_or(Error::MissingContainerName {})?; - - let (is_emulator, storage_client) = if use_emulator { - check_if_emulator_works()?; - // Allow overriding defaults. 
Values taken from - // from https://docs.rs/azure_storage/0.2.0/src/azure_storage/core/clients/storage_account_client.rs.html#129-141 - let blob_storage_url = - url_from_env("AZURITE_BLOB_STORAGE_URL", "http://127.0.0.1:10000")?; - let queue_storage_url = - url_from_env("AZURITE_QUEUE_STORAGE_URL", "http://127.0.0.1:10001")?; - let table_storage_url = - url_from_env("AZURITE_TABLE_STORAGE_URL", "http://127.0.0.1:10002")?; - let filesystem_url = - url_from_env("AZURITE_TABLE_STORAGE_URL", "http://127.0.0.1:10004")?; - - let storage_client = StorageClient::new_emulator( - &blob_storage_url, - &table_storage_url, - &queue_storage_url, - &filesystem_url, - ); - - (true, storage_client) - } else { - let client = if let Some(bearer_token) = bearer_token { - Ok(StorageClient::new_bearer_token(&account, bearer_token)) - } else if let Some(access_key) = access_key { - Ok(StorageClient::new_access_key(&account, access_key)) - } else if let (Some(client_id), Some(client_secret), Some(tenant_id)) = - (tenant_id, client_id, client_secret) - { - let credential = Arc::new(AutoRefreshingTokenCredential::new(Arc::new( - ClientSecretCredential::new( - tenant_id, - client_id, - client_secret, - TokenCredentialOptions::default(), - ), - ))); - Ok(StorageClient::new_token_credential(&account, credential)) - } else { - Err(Error::MissingCredentials {}) - }?; - - (false, client) - }; - - let blob_base_url = storage_client - .blob_storage_url() - .as_ref() - // make url ending consistent between the emulator and remote storage account - .trim_end_matches('/') - .to_string(); - - let container_client = Arc::new(storage_client.container_client(&container_name)); - - Ok(MicrosoftAzure { - container_client, - container_name, - blob_base_url, - is_emulator, - }) - } -} - -// Relevant docs: https://azure.github.io/Storage/docs/application-and-user-data/basics/azure-blob-storage-upload-apis/ -// In Azure Blob Store, parts are "blocks" -// put_multipart_part -> PUT block -// complete -> PUT block 
list -// abort -> No equivalent; blocks are simply dropped after 7 days -#[derive(Debug, Clone)] -struct AzureMultiPartUpload { - container_client: Arc, - location: Path, -} - -impl AzureMultiPartUpload { - /// Gets the block id corresponding to the part index. - /// - /// In Azure, the user determines what id each block has. They must be - /// unique within an upload and of consistent length. - fn get_block_id(&self, part_idx: usize) -> String { - format!("{:20}", part_idx) - } -} - -#[async_trait] -impl CloudMultiPartUploadImpl for AzureMultiPartUpload { - async fn put_multipart_part( - &self, - buf: Vec, - part_idx: usize, - ) -> Result { - let block_id = self.get_block_id(part_idx); - - self.container_client - .blob_client(self.location.as_ref()) - .put_block(block_id.clone(), buf) - .into_future() - .await - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - - Ok(UploadPart { - content_id: block_id, - }) - } - - async fn complete(&self, completed_parts: Vec) -> Result<(), io::Error> { - let blocks = completed_parts - .into_iter() - .map(|part| { - azure_storage_blobs::blob::BlobBlockType::Uncommitted( - azure_storage_blobs::prelude::BlockId::new(part.content_id), - ) - }) - .collect(); - - let block_list = azure_storage_blobs::blob::BlockList { blocks }; - - self.container_client - .blob_client(self.location.as_ref()) - .put_block_list(block_list) - .into_future() - .await - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::tests::{ - copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter, - put_get_delete_list, rename_and_copy, - }; - use std::env; - - // Helper macro to skip tests if TEST_INTEGRATION and the Azure environment - // variables are not set. - macro_rules! 
maybe_skip_integration { - () => {{ - dotenv::dotenv().ok(); - - let use_emulator = std::env::var("AZURE_USE_EMULATOR").is_ok(); - - let mut required_vars = vec!["OBJECT_STORE_BUCKET"]; - if !use_emulator { - required_vars.push("AZURE_STORAGE_ACCOUNT"); - required_vars.push("AZURE_STORAGE_ACCESS_KEY"); - } - let unset_vars: Vec<_> = required_vars - .iter() - .filter_map(|&name| match env::var(name) { - Ok(_) => None, - Err(_) => Some(name), - }) - .collect(); - let unset_var_names = unset_vars.join(", "); - - let force = std::env::var("TEST_INTEGRATION"); - - if force.is_ok() && !unset_var_names.is_empty() { - panic!( - "TEST_INTEGRATION is set, \ - but variable(s) {} need to be set", - unset_var_names - ) - } else if force.is_err() { - eprintln!( - "skipping Azure integration test - set {}TEST_INTEGRATION to run", - if unset_var_names.is_empty() { - String::new() - } else { - format!("{} and ", unset_var_names) - } - ); - return; - } else { - MicrosoftAzureBuilder::new() - .with_account(env::var("AZURE_STORAGE_ACCOUNT").unwrap_or_default()) - .with_access_key( - env::var("AZURE_STORAGE_ACCESS_KEY").unwrap_or_default(), - ) - .with_container_name( - env::var("OBJECT_STORE_BUCKET") - .expect("already checked OBJECT_STORE_BUCKET"), - ) - .with_use_emulator(use_emulator) - } - }}; - } - - #[tokio::test] - async fn azure_blob_test() { - let integration = maybe_skip_integration!().build().unwrap(); - - put_get_delete_list(&integration).await; - list_uses_directories_correctly(&integration).await; - list_with_delimiter(&integration).await; - rename_and_copy(&integration).await; - copy_if_not_exists(&integration).await; - } -} diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs new file mode 100644 index 00000000000..5f37ea95169 --- /dev/null +++ b/object_store/src/azure/client.rs @@ -0,0 +1,743 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::credential::{AzureCredential, CredentialProvider}; +use crate::azure::credential::*; +use crate::client::pagination::stream_paginated; +use crate::client::retry::RetryExt; +use crate::path::DELIMITER; +use crate::util::{format_http_range, format_prefix}; +use crate::{BoxStream, ListResult, ObjectMeta, Path, Result, RetryConfig, StreamExt}; +use bytes::{Buf, Bytes}; +use chrono::{DateTime, TimeZone, Utc}; +use itertools::Itertools; +use reqwest::{ + header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE}, + Client as ReqwestClient, Method, Response, StatusCode, +}; +use serde::{Deserialize, Deserializer, Serialize}; +use snafu::{ResultExt, Snafu}; +use std::collections::HashMap; +use std::ops::Range; +use url::Url; + +/// A specialized `Error` for object store-related errors +#[derive(Debug, Snafu)] +#[allow(missing_docs)] +pub(crate) enum Error { + #[snafu(display("Error performing get request {}: {}", path, source))] + GetRequest { + source: reqwest::Error, + path: String, + }, + + #[snafu(display("Error performing put request {}: {}", path, source))] + PutRequest { + source: reqwest::Error, + path: String, + }, + + #[snafu(display("Error performing delete request {}: {}", path, source))] + DeleteRequest { + source: reqwest::Error, + path: String, + }, + + 
#[snafu(display("Error performing copy request {}: {}", path, source))] + CopyRequest { + source: reqwest::Error, + path: String, + }, + + #[snafu(display("Error performing list request: {}", source))] + ListRequest { source: reqwest::Error }, + + #[snafu(display("Error performing create multipart request: {}", source))] + CreateMultipartRequest { source: reqwest::Error }, + + #[snafu(display("Error performing complete multipart request: {}", source))] + CompleteMultipartRequest { source: reqwest::Error }, + + #[snafu(display("Got invalid list response: {}", source))] + InvalidListResponse { source: quick_xml::de::DeError }, + + #[snafu(display("Got invalid multipart response: {}", source))] + InvalidMultipartResponse { source: quick_xml::de::DeError }, + + #[snafu(display("Error authorizing request: {}", source))] + Authorization { source: crate::client::oauth::Error }, +} + +impl From for crate::Error { + fn from(err: Error) -> Self { + match err { + Error::GetRequest { source, path } + | Error::DeleteRequest { source, path } + | Error::CopyRequest { source, path } + | Error::PutRequest { source, path } + if matches!(source.status(), Some(StatusCode::NOT_FOUND)) => + { + Self::NotFound { + path, + source: Box::new(source), + } + } + Error::CopyRequest { source, path } + if matches!(source.status(), Some(StatusCode::CONFLICT)) => + { + Self::AlreadyExists { + path, + source: Box::new(source), + } + } + _ => Self::Generic { + store: "MicrosoftAzure", + source: Box::new(err), + }, + } + } +} + +/// Configuration for [AzureClient] +#[derive(Debug)] +pub struct AzureConfig { + pub account: String, + pub container: String, + pub credentials: CredentialProvider, + pub retry_config: RetryConfig, + pub allow_http: bool, + pub service: Url, + pub is_emulator: bool, +} + +impl AzureConfig { + fn path_url(&self, path: &Path) -> Url { + let mut url = self.service.clone(); + { + let mut path_mut = url.path_segments_mut().unwrap(); + if self.is_emulator { + 
path_mut.push(&self.account); + } + path_mut.push(&self.container).extend(path.parts()); + } + url + } +} + +#[derive(Debug)] +pub(crate) struct AzureClient { + config: AzureConfig, + client: ReqwestClient, +} + +impl AzureClient { + /// create a new instance of [AzureClient] + pub fn new(config: AzureConfig) -> Self { + let client = reqwest::ClientBuilder::new() + .https_only(!config.allow_http) + .build() + .unwrap(); + + Self { config, client } + } + + /// Returns the config + pub fn config(&self) -> &AzureConfig { + &self.config + } + + async fn get_credential(&self) -> Result { + match &self.config.credentials { + CredentialProvider::AccessKey(key) => { + Ok(AzureCredential::AccessKey(key.to_owned())) + } + CredentialProvider::ClientSecret(cred) => { + let token = cred + .fetch_token(&self.client, &self.config.retry_config) + .await + .context(AuthorizationSnafu)?; + Ok(AzureCredential::AuthorizationToken( + // we do the conversion to a HeaderValue here, since it is fallible + // and we want to use it in an infallible function + HeaderValue::from_str(&format!("Bearer {}", token)).map_err( + |err| crate::Error::Generic { + store: "MicrosoftAzure", + source: Box::new(err), + }, + )?, + )) + } + CredentialProvider::SASToken(sas) => { + Ok(AzureCredential::SASToken(sas.clone())) + } + } + } + + /// Make an Azure PUT request + pub async fn put_request( + &self, + path: &Path, + bytes: Option, + is_block_op: bool, + query: &T, + ) -> Result { + let credential = self.get_credential().await?; + let url = self.config.path_url(path); + + let mut builder = self.client.request(Method::PUT, url); + + if !is_block_op { + builder = builder.header(&BLOB_TYPE, "BlockBlob").query(query); + } else { + builder = builder.query(query); + } + + if let Some(bytes) = bytes { + builder = builder + .header(CONTENT_LENGTH, HeaderValue::from(bytes.len())) + .body(bytes) + } else { + builder = builder.header(CONTENT_LENGTH, HeaderValue::from_static("0")); + } + + let response = builder
.with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(PutRequestSnafu { + path: path.as_ref(), + })? + .error_for_status() + .context(PutRequestSnafu { + path: path.as_ref(), + })?; + + Ok(response) + } + + /// Make an Azure GET request + /// + /// + pub async fn get_request( + &self, + path: &Path, + range: Option>, + head: bool, + ) -> Result { + let credential = self.get_credential().await?; + let url = self.config.path_url(path); + let method = match head { + true => Method::HEAD, + false => Method::GET, + }; + + let mut builder = self + .client + .request(method, url) + .header(CONTENT_LENGTH, HeaderValue::from_static("0")) + .body(Bytes::new()); + + if let Some(range) = range { + builder = builder.header(RANGE, format_http_range(range)); + } + + let response = builder + .with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(GetRequestSnafu { + path: path.as_ref(), + })? + .error_for_status() + .context(GetRequestSnafu { + path: path.as_ref(), + })?; + + Ok(response) + } + + /// Make an Azure Delete request + pub async fn delete_request( + &self, + path: &Path, + query: &T, + ) -> Result<()> { + let credential = self.get_credential().await?; + let url = self.config.path_url(path); + + self.client + .request(Method::DELETE, url) + .query(query) + .header(&DELETE_SNAPSHOTS, "include") + .with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(DeleteRequestSnafu { + path: path.as_ref(), + })? 
+ .error_for_status() + .context(DeleteRequestSnafu { + path: path.as_ref(), + })?; + + Ok(()) + } + + /// Make an Azure Copy request + pub async fn copy_request( + &self, + from: &Path, + to: &Path, + overwrite: bool, + ) -> Result<()> { + let credential = self.get_credential().await?; + let url = self.config.path_url(to); + let mut source = self.config.path_url(from); + + // If using SAS authorization must include the headers in the URL + // + if let AzureCredential::SASToken(pairs) = &credential { + source.query_pairs_mut().extend_pairs(pairs); + } + + let mut builder = self + .client + .request(Method::PUT, url) + .header(©_SOURCE, source.to_string()) + .header(CONTENT_LENGTH, HeaderValue::from_static("0")); + + if !overwrite { + builder = builder.header(IF_NONE_MATCH, "*"); + } + + builder + .with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(CopyRequestSnafu { + path: from.as_ref(), + })? + .error_for_status() + .context(CopyRequestSnafu { + path: from.as_ref(), + })?; + + Ok(()) + } + + /// Make an Azure List request + async fn list_request( + &self, + prefix: Option<&str>, + delimiter: bool, + token: Option<&str>, + ) -> Result<(ListResult, Option)> { + let credential = self.get_credential().await?; + let url = self.config.path_url(&Path::default()); + + let mut query = Vec::with_capacity(5); + query.push(("restype", "container")); + query.push(("comp", "list")); + + if let Some(prefix) = prefix { + query.push(("prefix", prefix)) + } + + if delimiter { + query.push(("delimiter", DELIMITER)) + } + + if let Some(token) = token { + query.push(("marker", token)) + } + + let response = self + .client + .request(Method::GET, url) + .query(&query) + .with_azure_authorization(&credential, &self.config.account) + .send_retry(&self.config.retry_config) + .await + .context(ListRequestSnafu)? + .error_for_status() + .context(ListRequestSnafu)? 
+ .bytes() + .await + .context(ListRequestSnafu)?; + + let mut response: ListResultInternal = + quick_xml::de::from_reader(response.reader()) + .context(InvalidListResponseSnafu)?; + let token = response.next_marker.take(); + + Ok((response.try_into()?, token)) + } + + /// Perform a list operation automatically handling pagination + pub fn list_paginated( + &self, + prefix: Option<&Path>, + delimiter: bool, + ) -> BoxStream<'_, Result> { + let prefix = format_prefix(prefix); + stream_paginated(prefix, move |prefix, token| async move { + let (r, next_token) = self + .list_request(prefix.as_deref(), delimiter, token.as_deref()) + .await?; + Ok((r, prefix, next_token)) + }) + .boxed() + } +} + +/// Raw / internal response from list requests +#[derive(Debug, Clone, PartialEq, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct ListResultInternal { + pub prefix: Option, + pub max_results: Option, + pub delimiter: Option, + pub next_marker: Option, + pub blobs: Blobs, +} + +impl TryFrom for ListResult { + type Error = crate::Error; + + fn try_from(value: ListResultInternal) -> Result { + let common_prefixes = value + .blobs + .blob_prefix + .unwrap_or_default() + .into_iter() + .map(|x| Ok(Path::parse(&x.name)?)) + .collect::>()?; + + let objects = value + .blobs + .blobs + .into_iter() + .map(ObjectMeta::try_from) + // Note: workaround for gen2 accounts with hierarchical namespaces. These accounts also + // return path segments as "directories". When we cant directories, its always via + // the BlobPrefix mechanics. + .filter_map_ok(|obj| if obj.size > 0 { Some(obj) } else { None }) + .collect::>()?; + + Ok(Self { + common_prefixes, + objects, + }) + } +} + +/// Collection of blobs and potentially shared prefixes returned from list requests. 
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct Blobs { + pub blob_prefix: Option>, + #[serde(rename = "Blob", default)] + pub blobs: Vec, +} + +/// Common prefix in list blobs response +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct BlobPrefix { + pub name: String, +} + +/// Details for a specific blob +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct Blob { + pub name: String, + pub version_id: Option, + pub is_current_version: Option, + pub deleted: Option, + pub properties: BlobProperties, + pub metadata: Option>, +} + +impl TryFrom for ObjectMeta { + type Error = crate::Error; + + fn try_from(value: Blob) -> Result { + Ok(Self { + location: Path::parse(value.name)?, + last_modified: value.properties.last_modified, + size: value.properties.content_length as usize, + }) + } +} + +/// Properties associated with individual blobs. The actual list +/// of returned properties is much more exhaustive, but we limit +/// the parsed fields to the ones relevant in this crate. 
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "PascalCase")] +struct BlobProperties { + #[serde(deserialize_with = "deserialize_http_date", rename = "Last-Modified")] + pub last_modified: DateTime, + pub etag: String, + #[serde(rename = "Content-Length")] + pub content_length: u64, + #[serde(rename = "Content-Type")] + pub content_type: String, + #[serde(rename = "Content-Encoding")] + pub content_encoding: Option, + #[serde(rename = "Content-Language")] + pub content_language: Option, +} + +// deserialize dates used in Azure payloads according to rfc1123 +fn deserialize_http_date<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let s = String::deserialize(deserializer)?; + Utc.datetime_from_str(&s, RFC1123_FMT) + .map_err(serde::de::Error::custom) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct BlockId(Bytes); + +impl BlockId { + pub fn new(block_id: impl Into) -> Self { + Self(block_id.into()) + } +} + +impl From for BlockId +where + B: Into, +{ + fn from(v: B) -> Self { + Self::new(v) + } +} + +impl AsRef<[u8]> for BlockId { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub(crate) struct BlockList { + pub blocks: Vec, +} + +impl BlockList { + pub fn to_xml(&self) -> String { + let mut s = String::new(); + s.push_str("\n\n"); + for block_id in &self.blocks { + let node = format!( + "\t{}\n", + base64::encode(block_id) + ); + s.push_str(&node); + } + + s.push_str(""); + s + } +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + + use super::*; + + #[test] + fn deserde_azure() { + const S: &str = " + + + + blob0.txt + + Thu, 01 Jul 2021 10:44:59 GMT + Thu, 01 Jul 2021 10:44:59 GMT + Thu, 07 Jul 2022 14:38:48 GMT + 0x8D93C7D4629C227 + 8 + text/plain + + + + rvr3UC1SmUw7AZV2NqPN0g== + + + BlockBlob + Hot + true + unlocked + available + true + + uservalue + + + + blob1.txt + + Thu, 01 Jul 2021 10:44:59 GMT + Thu, 01 Jul 2021 
10:44:59 GMT + 0x8D93C7D463004D6 + 8 + text/plain + + + + rvr3UC1SmUw7AZV2NqPN0g== + + + BlockBlob + Hot + true + unlocked + available + true + + + + + blob2.txt + + Thu, 01 Jul 2021 10:44:59 GMT + Thu, 01 Jul 2021 10:44:59 GMT + 0x8D93C7D4636478A + 8 + text/plain + + + + rvr3UC1SmUw7AZV2NqPN0g== + + + BlockBlob + Hot + true + unlocked + available + true + + + + + +"; + + let bytes = Bytes::from(S); + let mut _list_blobs_response_internal: ListResultInternal = + quick_xml::de::from_slice(bytes.as_ref()).unwrap(); + } + + #[test] + fn deserde_azurite() { + const S: &str = " + + + + 5000 + + + + blob0.txt + + Thu, 01 Jul 2021 10:45:02 GMT + Thu, 01 Jul 2021 10:45:02 GMT + 0x228281B5D517B20 + 8 + text/plain + rvr3UC1SmUw7AZV2NqPN0g== + BlockBlob + unlocked + available + true + Hot + true + Thu, 01 Jul 2021 10:45:02 GMT + + + + blob1.txt + + Thu, 01 Jul 2021 10:45:02 GMT + Thu, 01 Jul 2021 10:45:02 GMT + 0x1DD959381A8A860 + 8 + text/plain + rvr3UC1SmUw7AZV2NqPN0g== + BlockBlob + unlocked + available + true + Hot + true + Thu, 01 Jul 2021 10:45:02 GMT + + + + blob2.txt + + Thu, 01 Jul 2021 10:45:02 GMT + Thu, 01 Jul 2021 10:45:02 GMT + 0x1FBE9C9B0C7B650 + 8 + text/plain + rvr3UC1SmUw7AZV2NqPN0g== + BlockBlob + unlocked + available + true + Hot + true + Thu, 01 Jul 2021 10:45:02 GMT + + + + +"; + + let bytes = Bytes::from(S); + let mut _list_blobs_response_internal: ListResultInternal = + quick_xml::de::from_slice(bytes.as_ref()).unwrap(); + } + + #[test] + fn to_xml() { + const S: &str = " + +\tbnVtZXJvMQ== +\tbnVtZXJvMg== +\tbnVtZXJvMw== +"; + let mut blocks = BlockList { blocks: Vec::new() }; + blocks.blocks.push(Bytes::from_static(b"numero1").into()); + blocks.blocks.push("numero2".into()); + blocks.blocks.push("numero3".into()); + + let res: &str = &blocks.to_xml(); + + assert_eq!(res, S) + } +} diff --git a/object_store/src/azure/credential.rs b/object_store/src/azure/credential.rs new file mode 100644 index 00000000000..9357e80892c --- /dev/null +++ 
b/object_store/src/azure/credential.rs @@ -0,0 +1,255 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::client::oauth::ClientSecretOAuthProvider; +use crate::util::hmac_sha256; +use chrono::Utc; +use reqwest::{ + header::{ + HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, + CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, + IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_UNMODIFIED_SINCE, RANGE, + }, + Method, RequestBuilder, +}; +use std::borrow::Cow; +use std::str; +use url::Url; + +static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2021-08-06"); +static VERSION: HeaderName = HeaderName::from_static("x-ms-version"); +pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type"); +pub(crate) static DELETE_SNAPSHOTS: HeaderName = + HeaderName::from_static("x-ms-delete-snapshots"); +pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source"); +static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5"); +pub(crate) static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT"; + +/// Provides credentials for use when signing requests +#[derive(Debug)] +pub enum CredentialProvider { + AccessKey(String), + 
SASToken(Vec<(String, String)>), + ClientSecret(ClientSecretOAuthProvider), +} + +pub(crate) enum AzureCredential { + AccessKey(String), + SASToken(Vec<(String, String)>), + AuthorizationToken(HeaderValue), +} + +/// A list of known Azure authority hosts +pub mod authority_hosts { + /// China-based Azure Authority Host + pub const AZURE_CHINA: &str = "https://login.chinacloudapi.cn"; + /// Germany-based Azure Authority Host + pub const AZURE_GERMANY: &str = "https://login.microsoftonline.de"; + /// US Government Azure Authority Host + pub const AZURE_GOVERNMENT: &str = "https://login.microsoftonline.us"; + /// Public Cloud Azure Authority Host + pub const AZURE_PUBLIC_CLOUD: &str = "https://login.microsoftonline.com"; +} + +pub(crate) trait CredentialExt { + /// Apply authorization to requests against azure storage accounts + /// + fn with_azure_authorization( + self, + credential: &AzureCredential, + account: &str, + ) -> Self; +} + +impl CredentialExt for RequestBuilder { + fn with_azure_authorization( + mut self, + credential: &AzureCredential, + account: &str, + ) -> Self { + // rfc2822 string should never contain illegal characters + let date = Utc::now(); + let date_str = date.format(RFC1123_FMT).to_string(); + // we formatted the data string ourselves, so unwrapping should be fine + let date_val = HeaderValue::from_str(&date_str).unwrap(); + self = self + .header(DATE, &date_val) + .header(&VERSION, &AZURE_VERSION); + + // Hack around lack of access to underlying request + // https://github.com/seanmonstar/reqwest/issues/1212 + let request = self + .try_clone() + .expect("not stream") + .build() + .expect("request valid"); + + match credential { + AzureCredential::AccessKey(key) => { + let signature = generate_authorization( + request.headers(), + request.url(), + request.method(), + account, + key.as_str(), + ); + self = self + // "signature" is a base 64 encoded string so it should never contain illegal characters. 
+ .header( + AUTHORIZATION, + HeaderValue::from_str(signature.as_str()).unwrap(), + ); + } + AzureCredential::AuthorizationToken(token) => { + self = self.header(AUTHORIZATION, token); + } + AzureCredential::SASToken(query_pairs) => { + self = self.query(&query_pairs); + } + }; + + self + } +} + +/// Generate signed key for authorization via access keys +/// +fn generate_authorization( + h: &HeaderMap, + u: &Url, + method: &Method, + account: &str, + key: &str, +) -> String { + let str_to_sign = string_to_sign(h, u, method, account); + let auth = hmac_sha256(base64::decode(key).unwrap(), &str_to_sign); + format!("SharedKey {}:{}", account, base64::encode(auth)) +} + +fn add_if_exists<'a>(h: &'a HeaderMap, key: &HeaderName) -> &'a str { + h.get(key) + .map(|s| s.to_str()) + .transpose() + .ok() + .flatten() + .unwrap_or_default() +} + +/// +fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) -> String { + // content length must only be specified if != 0 + // this is valid from 2015-02-21 + let content_length = h + .get(&CONTENT_LENGTH) + .map(|s| s.to_str()) + .transpose() + .ok() + .flatten() + .filter(|&v| v != "0") + .unwrap_or_default(); + format!( + "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}{}", + method.as_ref(), + add_if_exists(h, &CONTENT_ENCODING), + add_if_exists(h, &CONTENT_LANGUAGE), + content_length, + add_if_exists(h, &CONTENT_MD5), + add_if_exists(h, &CONTENT_TYPE), + add_if_exists(h, &DATE), + add_if_exists(h, &IF_MODIFIED_SINCE), + add_if_exists(h, &IF_MATCH), + add_if_exists(h, &IF_NONE_MATCH), + add_if_exists(h, &IF_UNMODIFIED_SINCE), + add_if_exists(h, &RANGE), + canonicalize_header(h), + canonicalized_resource(account, u) + ) +} + +/// +fn canonicalize_header(headers: &HeaderMap) -> String { + let mut names = headers + .iter() + .filter_map(|(k, _)| { + (k.as_str().starts_with("x-ms")) + // TODO remove unwraps + .then(|| (k.as_str(), headers.get(k).unwrap().to_str().unwrap())) + }) + .collect::>(); + 
names.sort_unstable(); + + let mut result = String::new(); + for (name, value) in names { + result.push_str(name); + result.push(':'); + result.push_str(value); + result.push('\n'); + } + result +} + +/// +fn canonicalized_resource(account: &str, uri: &Url) -> String { + let mut can_res: String = String::new(); + can_res.push('/'); + can_res.push_str(account); + can_res.push_str(uri.path().to_string().as_str()); + can_res.push('\n'); + + // query parameters + let query_pairs = uri.query_pairs(); + { + let mut qps: Vec = Vec::new(); + for (q, _) in query_pairs { + if !(qps.iter().any(|x| x == &*q)) { + qps.push(q.into_owned()); + } + } + + qps.sort(); + + for qparam in qps { + // find correct parameter + let ret = lexy_sort(query_pairs, &qparam); + + can_res = can_res + &qparam.to_lowercase() + ":"; + + for (i, item) in ret.iter().enumerate() { + if i > 0 { + can_res.push(','); + } + can_res.push_str(item); + } + + can_res.push('\n'); + } + }; + + can_res[0..can_res.len() - 1].to_owned() +} + +fn lexy_sort<'a>( + vec: impl Iterator, Cow<'a, str>)> + 'a, + query_param: &str, +) -> Vec> { + let mut values = vec + .filter(|(k, _)| *k == query_param) + .map(|(_, v)| v) + .collect::>(); + values.sort_unstable(); + values +} diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs new file mode 100644 index 00000000000..53e7ed60623 --- /dev/null +++ b/object_store/src/azure/mod.rs @@ -0,0 +1,705 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! An object store implementation for Azure blob storage +//! +//! ## Streaming uploads +//! +//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those +//! blocks. Data is buffered internally to make blocks of at least 5MB and blocks +//! are uploaded concurrently. +//! +//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide +//! a way to drop old blocks. Instead unused blocks are automatically cleaned up +//! after 7 days. +use self::client::{BlockId, BlockList}; +use crate::{ + multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart}, + path::Path, + GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, RetryConfig, +}; +use async_trait::async_trait; +use bytes::Bytes; +use chrono::{TimeZone, Utc}; +use futures::{stream::BoxStream, StreamExt, TryStreamExt}; +use snafu::{ResultExt, Snafu}; +use std::collections::BTreeSet; +use std::fmt::{Debug, Formatter}; +use std::io; +use std::ops::Range; +use std::sync::Arc; +use tokio::io::AsyncWrite; +use url::Url; + +pub use credential::authority_hosts; + +mod client; +mod credential; + +/// The well-known account used by Azurite and the legacy Azure Storage Emulator. +/// +const EMULATOR_ACCOUNT: &str = "devstoreaccount1"; + +/// The well-known account key used by Azurite and the legacy Azure Storage Emulator. 
+/// +const EMULATOR_ACCOUNT_KEY: &str = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; + +/// A specialized `Error` for Azure object store-related errors +#[derive(Debug, Snafu)] +#[allow(missing_docs)] +enum Error { + #[snafu(display("Last-Modified Header missing from response"))] + MissingLastModified, + + #[snafu(display("Content-Length Header missing from response"))] + MissingContentLength, + + #[snafu(display("Invalid last modified '{}': {}", last_modified, source))] + InvalidLastModified { + last_modified: String, + source: chrono::ParseError, + }, + + #[snafu(display("Invalid content length '{}': {}", content_length, source))] + InvalidContentLength { + content_length: String, + source: std::num::ParseIntError, + }, + + #[snafu(display("Received header containing non-ASCII data"))] + BadHeader { source: reqwest::header::ToStrError }, + + #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))] + UnableToParseUrl { + source: url::ParseError, + url: String, + }, + + #[snafu(display( + "Unable parse emulator url {}={}, Error: {}", + env_name, + env_value, + source + ))] + UnableToParseEmulatorUrl { + env_name: String, + env_value: String, + source: url::ParseError, + }, + + #[snafu(display("Account must be specified"))] + MissingAccount {}, + + #[snafu(display("Container name must be specified"))] + MissingContainerName {}, + + #[snafu(display("At least one authorization option must be specified"))] + MissingCredentials {}, +} + +impl From for super::Error { + fn from(source: Error) -> Self { + Self::Generic { + store: "MicrosoftAzure", + source: Box::new(source), + } + } +} + +/// Interface for [Microsoft Azure Blob Storage](https://azure.microsoft.com/en-us/services/storage/blobs/). 
+#[derive(Debug)] +pub struct MicrosoftAzure { + client: Arc, +} + +impl std::fmt::Display for MicrosoftAzure { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "MicrosoftAzure {{ account: {}, container: {} }}", + self.client.config().account, + self.client.config().container + ) + } +} + +#[async_trait] +impl ObjectStore for MicrosoftAzure { + async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + self.client + .put_request(location, Some(bytes), false, &()) + .await?; + Ok(()) + } + + async fn put_multipart( + &self, + location: &Path, + ) -> Result<(MultipartId, Box)> { + let inner = AzureMultiPartUpload { + client: Arc::clone(&self.client), + location: location.to_owned(), + }; + Ok((String::new(), Box::new(CloudMultiPartUpload::new(inner, 8)))) + } + + async fn abort_multipart( + &self, + _location: &Path, + _multipart_id: &MultipartId, + ) -> Result<()> { + // There is no way to drop blocks that have been uploaded. Instead, they simply + // expire in 7 days. + Ok(()) + } + + async fn get(&self, location: &Path) -> Result { + let response = self.client.get_request(location, None, false).await?; + let stream = response + .bytes_stream() + .map_err(|source| crate::Error::Generic { + store: "MicrosoftAzure", + source: Box::new(source), + }) + .boxed(); + + Ok(GetResult::Stream(stream)) + } + + async fn get_range(&self, location: &Path, range: Range) -> Result { + let bytes = self + .client + .get_request(location, Some(range), false) + .await? 
+ .bytes() + .await + .map_err(|source| client::Error::GetRequest { + source, + path: location.to_string(), + })?; + Ok(bytes) + } + + async fn head(&self, location: &Path) -> Result { + use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED}; + + // Extract meta from headers + // https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties + let response = self.client.get_request(location, None, true).await?; + let headers = response.headers(); + + let last_modified = headers + .get(LAST_MODIFIED) + .ok_or(Error::MissingLastModified)? + .to_str() + .context(BadHeaderSnafu)?; + let last_modified = Utc + .datetime_from_str(last_modified, credential::RFC1123_FMT) + .context(InvalidLastModifiedSnafu { last_modified })?; + + let content_length = headers + .get(CONTENT_LENGTH) + .ok_or(Error::MissingContentLength)? + .to_str() + .context(BadHeaderSnafu)?; + let content_length = content_length + .parse() + .context(InvalidContentLengthSnafu { content_length })?; + + Ok(ObjectMeta { + location: location.clone(), + last_modified, + size: content_length, + }) + } + + async fn delete(&self, location: &Path) -> Result<()> { + self.client.delete_request(location, &()).await + } + + async fn list( + &self, + prefix: Option<&Path>, + ) -> Result>> { + let stream = self + .client + .list_paginated(prefix, false) + .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok))) + .try_flatten() + .boxed(); + + Ok(stream) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + let mut stream = self.client.list_paginated(prefix, true); + + let mut common_prefixes = BTreeSet::new(); + let mut objects = Vec::new(); + + while let Some(result) = stream.next().await { + let response = result?; + common_prefixes.extend(response.common_prefixes.into_iter()); + objects.extend(response.objects.into_iter()); + } + + Ok(ListResult { + common_prefixes: common_prefixes.into_iter().collect(), + objects, + }) + } + + async fn copy(&self, from: &Path, to: 
&Path) -> Result<()> { + self.client.copy_request(from, to, true).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + self.client.copy_request(from, to, false).await + } +} + +/// Relevant docs: +/// In Azure Blob Store, parts are "blocks" +/// put_multipart_part -> PUT block +/// complete -> PUT block list +/// abort -> No equivalent; blocks are simply dropped after 7 days +#[derive(Debug, Clone)] +struct AzureMultiPartUpload { + client: Arc, + location: Path, +} + +#[async_trait] +impl CloudMultiPartUploadImpl for AzureMultiPartUpload { + async fn put_multipart_part( + &self, + buf: Vec, + part_idx: usize, + ) -> Result { + let content_id = format!("{:20}", part_idx); + let block_id: BlockId = content_id.clone().into(); + + self.client + .put_request( + &self.location, + Some(buf.into()), + true, + &[("comp", "block"), ("blockid", &base64::encode(block_id))], + ) + .await?; + + Ok(UploadPart { content_id }) + } + + async fn complete(&self, completed_parts: Vec) -> Result<(), io::Error> { + let blocks = completed_parts + .into_iter() + .map(|part| BlockId::from(part.content_id)) + .collect(); + + let block_list = BlockList { blocks }; + let block_xml = block_list.to_xml(); + + self.client + .put_request( + &self.location, + Some(block_xml.into()), + true, + &[("comp", "blocklist")], + ) + .await?; + + Ok(()) + } +} + +/// Configure a connection to Microsoft Azure Blob Storage container using +/// the specified credentials. 
+/// +/// # Example +/// ``` +/// # let ACCOUNT = "foo"; +/// # let BUCKET_NAME = "foo"; +/// # let ACCESS_KEY = "foo"; +/// # use object_store::azure::MicrosoftAzureBuilder; +/// let azure = MicrosoftAzureBuilder::new() +/// .with_account(ACCOUNT) +/// .with_access_key(ACCESS_KEY) +/// .with_container_name(BUCKET_NAME) +/// .build(); +/// ``` +#[derive(Default)] +pub struct MicrosoftAzureBuilder { + account_name: Option, + access_key: Option, + container_name: Option, + bearer_token: Option, + client_id: Option, + client_secret: Option, + tenant_id: Option, + sas_query_pairs: Option>, + authority_host: Option, + use_emulator: bool, + retry_config: RetryConfig, + allow_http: bool, +} + +impl Debug for MicrosoftAzureBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "MicrosoftAzureBuilder {{ account: {:?}, container_name: {:?} }}", + self.account_name, self.container_name + ) + } +} + +impl MicrosoftAzureBuilder { + /// Create a new [`MicrosoftAzureBuilder`] with default values. + pub fn new() -> Self { + Default::default() + } + + /// Create an instance of [MicrosoftAzureBuilder] with values pre-populated from environment variables. 
+ /// + /// Variables extracted from environment: + /// * AZURE_STORAGE_ACCOUNT_NAME: storage account name + /// * AZURE_STORAGE_ACCOUNT_KEY: storage account master key + /// * AZURE_STORAGE_ACCESS_KEY: alias for AZURE_STORAGE_ACCOUNT_KEY + /// * AZURE_STORAGE_CLIENT_ID -> client id for service principal authorization + /// * AZURE_STORAGE_CLIENT_SECRET -> client secret for service principal authorization + /// * AZURE_STORAGE_TENANT_ID -> tenant id used in oauth flows + /// # Example + /// ``` + /// use object_store::azure::MicrosoftAzureBuilder; + /// + /// let azure = MicrosoftAzureBuilder::from_env() + /// .with_container_name("foo") + /// .build(); + /// ``` + pub fn from_env() -> Self { + let mut builder = Self::default(); + + if let Ok(account_name) = std::env::var("AZURE_STORAGE_ACCOUNT_NAME") { + builder.account_name = Some(account_name); + } + + if let Ok(access_key) = std::env::var("AZURE_STORAGE_ACCOUNT_KEY") { + builder.access_key = Some(access_key); + } else if let Ok(access_key) = std::env::var("AZURE_STORAGE_ACCESS_KEY") { + builder.access_key = Some(access_key); + } + + if let Ok(client_id) = std::env::var("AZURE_STORAGE_CLIENT_ID") { + builder.client_id = Some(client_id); + } + + if let Ok(client_secret) = std::env::var("AZURE_STORAGE_CLIENT_SECRET") { + builder.client_secret = Some(client_secret); + } + + if let Ok(tenant_id) = std::env::var("AZURE_STORAGE_TENANT_ID") { + builder.tenant_id = Some(tenant_id); + } + + builder + } + + /// Set the Azure Account (required) + pub fn with_account(mut self, account: impl Into) -> Self { + self.account_name = Some(account.into()); + self + } + + /// Set the Azure Container Name (required) + pub fn with_container_name(mut self, container_name: impl Into) -> Self { + self.container_name = Some(container_name.into()); + self + } + + /// Set the Azure Access Key (required - one of access key, bearer token, or client credentials) + pub fn with_access_key(mut self, access_key: impl Into) -> Self { + 
self.access_key = Some(access_key.into()); + self + } + + /// Set a static bearer token to be used for authorizing requests + pub fn with_bearer_token_authorization( + mut self, + bearer_token: impl Into, + ) -> Self { + self.bearer_token = Some(bearer_token.into()); + self + } + + /// Set a client secret used for client secret authorization + pub fn with_client_secret_authorization( + mut self, + client_id: impl Into, + client_secret: impl Into, + tenant_id: impl Into, + ) -> Self { + self.client_id = Some(client_id.into()); + self.client_secret = Some(client_secret.into()); + self.tenant_id = Some(tenant_id.into()); + self + } + + /// Set query pairs appended to the url for shared access signature authorization + pub fn with_sas_authorization( + mut self, + query_pairs: impl Into>, + ) -> Self { + self.sas_query_pairs = Some(query_pairs.into()); + self + } + + /// Set if the Azure emulator should be used (defaults to false) + pub fn with_use_emulator(mut self, use_emulator: bool) -> Self { + self.use_emulator = use_emulator; + self + } + + /// Sets what protocol is allowed. If `allow_http` is : + /// * false (default): Only HTTPS is allowed + /// * true: HTTP and HTTPS are allowed + pub fn with_allow_http(mut self, allow_http: bool) -> Self { + self.allow_http = allow_http; + self + } + + /// Sets an alternative authority host for OAuth based authorization + /// common hosts for azure clouds are defined in [authority_hosts]. + /// Defaults to + pub fn with_authority_host(mut self, authority_host: String) -> Self { + self.authority_host = Some(authority_host); + self + } + + /// Set the retry configuration + pub fn with_retry(mut self, retry_config: RetryConfig) -> Self { + self.retry_config = retry_config; + self + } + + /// Configure a connection to container with given name on Microsoft Azure + /// Blob store. 
+ pub fn build(self) -> Result { + let Self { + account_name, + access_key, + container_name, + bearer_token, + client_id, + client_secret, + tenant_id, + sas_query_pairs, + use_emulator, + retry_config, + allow_http, + authority_host, + } = self; + + let container = container_name.ok_or(Error::MissingContainerName {})?; + + let (is_emulator, allow_http, storage_url, auth, account) = if use_emulator { + let account_name = + account_name.unwrap_or_else(|| EMULATOR_ACCOUNT.to_string()); + // Allow overriding defaults. Values taken from + // from https://docs.rs/azure_storage/0.2.0/src/azure_storage/core/clients/storage_account_client.rs.html#129-141 + let url = url_from_env("AZURITE_BLOB_STORAGE_URL", "http://127.0.0.1:10000")?; + let account_key = + access_key.unwrap_or_else(|| EMULATOR_ACCOUNT_KEY.to_string()); + let credential = credential::CredentialProvider::AccessKey(account_key); + (true, true, url, credential, account_name) + } else { + let account_name = account_name.ok_or(Error::MissingAccount {})?; + let account_url = format!("https://{}.blob.core.windows.net", &account_name); + let url = Url::parse(&account_url) + .context(UnableToParseUrlSnafu { url: account_url })?; + let credential = if let Some(bearer_token) = bearer_token { + Ok(credential::CredentialProvider::AccessKey(bearer_token)) + } else if let Some(access_key) = access_key { + Ok(credential::CredentialProvider::AccessKey(access_key)) + } else if let (Some(client_id), Some(client_secret), Some(tenant_id)) = + (client_id, client_secret, tenant_id) + { + let client_credential = + crate::client::oauth::ClientSecretOAuthProvider::new_azure( + client_id, + client_secret, + tenant_id, + authority_host, + ); + Ok(credential::CredentialProvider::ClientSecret( + client_credential, + )) + } else if let Some(query_pairs) = sas_query_pairs { + Ok(credential::CredentialProvider::SASToken(query_pairs)) + } else { + Err(Error::MissingCredentials {}) + }?; + (false, allow_http, url, credential, account_name) + 
}; + + let config = client::AzureConfig { + account, + allow_http, + retry_config, + service: storage_url, + container, + credentials: auth, + is_emulator, + }; + + let client = Arc::new(client::AzureClient::new(config)); + + Ok(MicrosoftAzure { client }) + } +} + +/// Parses the contents of the environment variable `env_name` as a URL +/// if present, otherwise falls back to default_url +fn url_from_env(env_name: &str, default_url: &str) -> Result<Url> { + let url = match std::env::var(env_name) { + Ok(env_value) => { + Url::parse(&env_value).context(UnableToParseEmulatorUrlSnafu { + env_name, + env_value, + })? + } + Err(_) => Url::parse(default_url).expect("Failed to parse default URL"), + }; + Ok(url) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::{ + copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter, + put_get_delete_list, rename_and_copy, stream_get, + }; + use std::env; + + // Helper macro to skip tests if TEST_INTEGRATION and the Azure environment + // variables are not set. + macro_rules! 
maybe_skip_integration { + () => {{ + dotenv::dotenv().ok(); + + let use_emulator = std::env::var("AZURE_USE_EMULATOR").is_ok(); + + let mut required_vars = vec!["OBJECT_STORE_BUCKET"]; + if !use_emulator { + required_vars.push("AZURE_STORAGE_ACCOUNT"); + required_vars.push("AZURE_STORAGE_ACCESS_KEY"); + } + let unset_vars: Vec<_> = required_vars + .iter() + .filter_map(|&name| match env::var(name) { + Ok(_) => None, + Err(_) => Some(name), + }) + .collect(); + let unset_var_names = unset_vars.join(", "); + + let force = std::env::var("TEST_INTEGRATION"); + + if force.is_ok() && !unset_var_names.is_empty() { + panic!( + "TEST_INTEGRATION is set, \ + but variable(s) {} need to be set", + unset_var_names + ) + } else if force.is_err() { + eprintln!( + "skipping Azure integration test - set {}TEST_INTEGRATION to run", + if unset_var_names.is_empty() { + String::new() + } else { + format!("{} and ", unset_var_names) + } + ); + return; + } else { + let builder = MicrosoftAzureBuilder::new() + .with_container_name( + env::var("OBJECT_STORE_BUCKET") + .expect("already checked OBJECT_STORE_BUCKET"), + ) + .with_use_emulator(use_emulator); + if !use_emulator { + builder + .with_account( + env::var("AZURE_STORAGE_ACCOUNT").unwrap_or_default(), + ) + .with_access_key( + env::var("AZURE_STORAGE_ACCESS_KEY").unwrap_or_default(), + ) + } else { + builder + } + } + }}; + } + + #[tokio::test] + async fn azure_blob_test() { + let integration = maybe_skip_integration!().build().unwrap(); + + put_get_delete_list(&integration).await; + list_uses_directories_correctly(&integration).await; + list_with_delimiter(&integration).await; + rename_and_copy(&integration).await; + copy_if_not_exists(&integration).await; + stream_get(&integration).await; + } + + // test for running integration test against actual blob service with service principal + // credentials. 
To run make sure all environment variables are set and remove the ignore + #[tokio::test] + #[ignore] + async fn azure_blob_test_sp() { + dotenv::dotenv().ok(); + let builder = MicrosoftAzureBuilder::new() + .with_account( + env::var("AZURE_STORAGE_ACCOUNT") + .expect("must be set AZURE_STORAGE_ACCOUNT"), + ) + .with_container_name( + env::var("OBJECT_STORE_BUCKET").expect("must be set OBJECT_STORE_BUCKET"), + ) + .with_client_secret_authorization( + env::var("AZURE_STORAGE_CLIENT_ID") + .expect("must be set AZURE_STORAGE_CLIENT_ID"), + env::var("AZURE_STORAGE_CLIENT_SECRET") + .expect("must be set AZURE_STORAGE_CLIENT_SECRET"), + env::var("AZURE_STORAGE_TENANT_ID") + .expect("must be set AZURE_STORAGE_TENANT_ID"), + ); + let integration = builder.build().unwrap(); + + put_get_delete_list(&integration).await; + list_uses_directories_correctly(&integration).await; + list_with_delimiter(&integration).await; + rename_and_copy(&integration).await; + copy_if_not_exists(&integration).await; + stream_get(&integration).await; + } +} diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 7241002a0bd..10e8d919652 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -18,7 +18,7 @@ //! Generic utilities reqwest based ObjectStore implementations pub mod backoff; -#[cfg(feature = "gcp")] +#[cfg(any(feature = "gcp", feature = "azure"))] pub mod oauth; pub mod pagination; pub mod retry; diff --git a/object_store/src/client/oauth.rs b/object_store/src/client/oauth.rs index 88e7a7b0f9e..220940629e0 100644 --- a/object_store/src/client/oauth.rs +++ b/object_store/src/client/oauth.rs @@ -16,13 +16,17 @@ // under the License. 
use crate::client::retry::RetryExt; -use crate::client::token::TemporaryToken; +use crate::client::token::{TemporaryToken, TokenCache}; use crate::RetryConfig; +use reqwest::header::{HeaderMap, HeaderValue, ACCEPT}; use reqwest::{Client, Method}; use ring::signature::RsaKeyPair; use snafu::{ResultExt, Snafu}; use std::time::{Duration, Instant}; +const CONTENT_TYPE_JSON: &str = "application/json"; +const AZURE_STORAGE_TOKEN_SCOPE: &str = "https://storage.azure.com/.default"; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("No RSA key found in pem file"))] @@ -219,3 +223,82 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> { let string = serde_json::to_string(obj).context(EncodeSnafu)?; Ok(base64::encode_config(string, base64::URL_SAFE_NO_PAD)) } + +/// Encapsulates the logic to perform an OAuth token challenge +#[derive(Debug)] +pub struct ClientSecretOAuthProvider { + scope: String, + token_url: String, + client_id: String, + client_secret: String, + cache: TokenCache<String>, +} + +impl ClientSecretOAuthProvider { + /// Create a new [`ClientSecretOAuthProvider`] for an azure backed store + pub fn new_azure( + client_id: String, + client_secret: String, + tenant_id: String, + authority_host: Option<String>, + ) -> Self { + let authority_host = authority_host.unwrap_or_else(|| { + crate::azure::authority_hosts::AZURE_PUBLIC_CLOUD.to_owned() + }); + + Self { + scope: AZURE_STORAGE_TOKEN_SCOPE.to_owned(), + token_url: format!("{}/{}/oauth2/v2.0/token", authority_host, tenant_id), + client_id, + client_secret, + cache: TokenCache::default(), + } + } + + /// Fetch a token + pub async fn fetch_token( + &self, + client: &Client, + retry: &RetryConfig, + ) -> Result<String> { + self.cache + .get_or_insert_with(|| self.fetch_token_inner(client, retry)) + .await + } + + /// Fetch a fresh token + async fn fetch_token_inner( + &self, + client: &Client, + retry: &RetryConfig, + ) -> Result<TemporaryToken<String>> { + let mut headers = HeaderMap::new(); + headers.append(ACCEPT, HeaderValue::from_static(CONTENT_TYPE_JSON)); 
+ + let mut params = std::collections::HashMap::new(); + params.insert("client_id", self.client_id.as_str()); + params.insert("client_secret", self.client_secret.as_str()); + params.insert("scope", self.scope.as_str()); + params.insert("grant_type", "client_credentials"); + + let response: TokenResponse = client + .request(Method::POST, &self.token_url) + .headers(headers) + .form(&params) + .send_retry(retry) + .await + .context(TokenRequestSnafu)? + .error_for_status() + .context(TokenRequestSnafu)? + .json() + .await + .context(TokenRequestSnafu)?; + + let token = TemporaryToken { + token: response.access_token, + expiry: Instant::now() + Duration::from_secs(response.expires_in), + }; + + Ok(token) + } +} diff --git a/object_store/src/client/pagination.rs b/object_store/src/client/pagination.rs index 3ab17fe8b5a..1febe3ae0a9 100644 --- a/object_store/src/client/pagination.rs +++ b/object_store/src/client/pagination.rs @@ -49,8 +49,10 @@ where futures::stream::unfold(PaginationState::Start(state), move |state| async move { let (s, page_token) = match state { PaginationState::Start(s) => (s, None), - PaginationState::HasMore(s, page_token) => (s, Some(page_token)), - PaginationState::Done => { + PaginationState::HasMore(s, page_token) if !page_token.is_empty() => { + (s, Some(page_token)) + } + _ => { return None; } }; diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 374f5592e84..5811eba1aa2 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -165,10 +165,10 @@ pub mod memory; pub mod path; pub mod throttle; -#[cfg(any(feature = "gcp", feature = "aws"))] +#[cfg(any(feature = "gcp", feature = "aws", feature = "azure"))] mod client; -#[cfg(any(feature = "gcp", feature = "aws"))] +#[cfg(any(feature = "gcp", feature = "aws", feature = "azure"))] pub use client::{backoff::BackoffConfig, retry::RetryConfig}; #[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))] @@ -506,8 +506,6 @@ mod tests { use tokio::io::AsyncWriteExt; 
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) { - let store_str = storage.to_string(); - delete_fixtures(storage).await; let content_list = flatten_list_stream(storage, None).await.unwrap(); @@ -565,26 +563,16 @@ mod tests { let out_of_range = 200..300; let out_of_range_result = storage.get_range(&location, out_of_range).await; - if store_str.starts_with("MicrosoftAzureEmulator") { - // Azurite doesn't support x-ms-range-get-content-crc64 set by Azure SDK - // https://github.com/Azure/Azurite/issues/444 - let err = range_result.unwrap_err().to_string(); - assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err); - - let err = out_of_range_result.unwrap_err().to_string(); - assert!(err.contains("x-ms-range-get-content-crc64 header or parameter is not supported in Azurite strict mode"), "{}", err); - } else { - let bytes = range_result.unwrap(); - assert_eq!(bytes, expected_data.slice(range)); - - // Should be a non-fatal error - out_of_range_result.unwrap_err(); - - let ranges = vec![0..1, 2..3, 0..5]; - let bytes = storage.get_ranges(&location, &ranges).await.unwrap(); - for (range, bytes) in ranges.iter().zip(bytes) { - assert_eq!(bytes, expected_data.slice(range.clone())) - } + let bytes = range_result.unwrap(); + assert_eq!(bytes, expected_data.slice(range)); + + // Should be a non-fatal error + out_of_range_result.unwrap_err(); + + let ranges = vec![0..1, 2..3, 0..5]; + let bytes = storage.get_ranges(&location, &ranges).await.unwrap(); + for (range, bytes) in ranges.iter().zip(bytes) { + assert_eq!(bytes, expected_data.slice(range.clone())) } let head = storage.head(&location).await.unwrap(); @@ -725,7 +713,7 @@ mod tests { let location = Path::from("test_dir/test_upload_file.txt"); // Can write to storage - let data = get_vec_of_bytes(5_000_000, 10); + let data = get_vec_of_bytes(5_000, 10); let bytes_expected = data.concat(); let (_, mut writer) = 
storage.put_multipart(&location).await.unwrap(); for chunk in &data { diff --git a/object_store/src/util.rs b/object_store/src/util.rs index f548ed48a60..1c95214952d 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -30,11 +30,20 @@ pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> { /// Returns a formatted HTTP range header as per /// <https://httpwg.org/specs/rfc7233.html#header.range> -#[cfg(any(feature = "aws", feature = "gcp"))] +#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))] pub fn format_http_range(range: std::ops::Range<usize>) -> String { format!("bytes={}-{}", range.start, range.end.saturating_sub(1)) } +#[cfg(any(feature = "aws", feature = "azure"))] +pub(crate) fn hmac_sha256( + secret: impl AsRef<[u8]>, + bytes: impl AsRef<[u8]>, +) -> ring::hmac::Tag { + let key = ring::hmac::Key::new(ring::hmac::HMAC_SHA256, secret.as_ref()); + ring::hmac::sign(&key, bytes.as_ref()) +} + /// Collect a stream into [`Bytes`] avoiding copying in the event of a single chunk pub async fn collect_bytes<S, E>(mut stream: S, size_hint: Option<usize>) -> Result<Bytes, E> where