Skip to content

Commit

Permalink
Object tagging (#4754) (#4999)
Browse files Browse the repository at this point in the history
* Object tagging (#4754)

* Allow disabling tagging

* Rename to disable_tagging
  • Loading branch information
tustvold committed Oct 30, 2023
1 parent e4bb1e9 commit 11b2f5f
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 4 deletions.
22 changes: 22 additions & 0 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ pub struct AmazonS3Builder {
copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>,
/// Put precondition
conditional_put: Option<ConfigValue<S3ConditionalPut>>,
/// Ignore tags
disable_tagging: ConfigValue<bool>,
}

/// Configuration keys for [`AmazonS3Builder`]
Expand Down Expand Up @@ -299,6 +301,15 @@ pub enum AmazonS3ConfigKey {
/// Skip signing request
SkipSignature,

/// Disable tagging objects
///
/// This can be desirable if not supported by the backing store
///
/// Supported keys:
/// - `aws_disable_tagging`
/// - `disable_tagging`
DisableTagging,

/// Client options
Client(ClientConfigKey),
}
Expand All @@ -322,6 +333,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::SkipSignature => "aws_skip_signature",
Self::CopyIfNotExists => "aws_copy_if_not_exists",
Self::ConditionalPut => "aws_conditional_put",
Self::DisableTagging => "aws_disable_tagging",
Self::Client(opt) => opt.as_ref(),
}
}
Expand Down Expand Up @@ -350,6 +362,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature),
"aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
"aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
_ => match s.parse() {
Expand Down Expand Up @@ -453,6 +466,7 @@ impl AmazonS3Builder {
self.client_options = self.client_options.with_config(key, value)
}
AmazonS3ConfigKey::SkipSignature => self.skip_signature.parse(value),
AmazonS3ConfigKey::DisableTagging => self.disable_tagging.parse(value),
AmazonS3ConfigKey::CopyIfNotExists => {
self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into()))
}
Expand Down Expand Up @@ -525,6 +539,7 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::ConditionalPut => {
self.conditional_put.as_ref().map(ToString::to_string)
}
AmazonS3ConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
}
}

Expand Down Expand Up @@ -735,6 +750,12 @@ impl AmazonS3Builder {
self
}

/// If set to `true` will ignore any tags provided to put_opts
pub fn with_disable_tagging(mut self, ignore: bool) -> Self {
self.disable_tagging = ignore.into();
self
}

/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
Expand Down Expand Up @@ -851,6 +872,7 @@ impl AmazonS3Builder {
client_options: self.client_options,
sign_payload: !self.unsigned_payload.get()?,
skip_signature: self.skip_signature.get()?,
disable_tagging: self.disable_tagging.get()?,
checksum,
copy_if_not_exists,
conditional_put: put_precondition,
Expand Down
23 changes: 23 additions & 0 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ pub struct S3Config {
pub client_options: ClientOptions,
pub sign_payload: bool,
pub skip_signature: bool,
pub disable_tagging: bool,
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
pub conditional_put: Option<S3ConditionalPut>,
Expand Down Expand Up @@ -588,6 +589,28 @@ impl S3Client {
version,
})
}

#[cfg(test)]
pub async fn get_object_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.config.get_credential().await?;
let url = format!("{}?tagging", self.config.path_url(path));
let response = self
.client
.request(Method::GET, url)
.with_aws_sigv4(
credential.as_deref(),
&self.config.region,
"s3",
self.config.sign_payload,
None,
)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
Ok(response)
}
}

#[async_trait]
Expand Down
16 changes: 14 additions & 2 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{IF_MATCH, IF_NONE_MATCH};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::Method;
use std::{sync::Arc, time::Duration};
use tokio::io::AsyncWrite;
Expand All @@ -52,6 +52,8 @@ use crate::{
PutOptions, PutResult, Result,
};

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");

mod builder;
mod checksum;
mod client;
Expand Down Expand Up @@ -160,7 +162,12 @@ impl Signer for AmazonS3 {
#[async_trait]
impl ObjectStore for AmazonS3 {
async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) -> Result<PutResult> {
let request = self.client.put_request(location, bytes);
let mut request = self.client.put_request(location, bytes);
let tags = opts.tags.encoded();
if !tags.is_empty() && !self.client.config().disable_tagging {
request = request.header(&TAGS_HEADER, tags);
}

match (opts.mode, &self.client.config().conditional_put) {
(PutMode::Overwrite, _) => request.send().await,
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
Expand Down Expand Up @@ -342,6 +349,11 @@ mod tests {
stream_get(&integration).await;
multipart(&integration, &integration).await;

tagging(&integration, !config.disable_tagging, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_object_tagging(&p).await }
})
.await;
if test_not_exists {
copy_if_not_exists(&integration).await;
}
Expand Down
22 changes: 22 additions & 0 deletions object_store/src/azure/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ pub struct MicrosoftAzureBuilder {
///
/// i.e. https://{account_name}.dfs.fabric.microsoft.com
use_fabric_endpoint: ConfigValue<bool>,
/// When set to true, skips tagging objects
disable_tagging: ConfigValue<bool>,
}

/// Configuration keys for [`MicrosoftAzureBuilder`]
Expand Down Expand Up @@ -321,6 +323,15 @@ pub enum AzureConfigKey {
/// - `container_name`
ContainerName,

/// Disables tagging objects
///
/// This can be desirable if not supported by the backing store
///
/// Supported keys:
/// - `azure_disable_tagging`
/// - `disable_tagging`
DisableTagging,

/// Client options
Client(ClientConfigKey),
}
Expand All @@ -344,6 +355,7 @@ impl AsRef<str> for AzureConfigKey {
Self::FederatedTokenFile => "azure_federated_token_file",
Self::UseAzureCli => "azure_use_azure_cli",
Self::ContainerName => "azure_container_name",
Self::DisableTagging => "azure_disable_tagging",
Self::Client(key) => key.as_ref(),
}
}
Expand Down Expand Up @@ -387,6 +399,7 @@ impl FromStr for AzureConfigKey {
"azure_use_fabric_endpoint" | "use_fabric_endpoint" => Ok(Self::UseFabricEndpoint),
"azure_use_azure_cli" | "use_azure_cli" => Ok(Self::UseAzureCli),
"azure_container_name" | "container_name" => Ok(Self::ContainerName),
"azure_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
// Backwards compatibility
"azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
_ => match s.parse() {
Expand Down Expand Up @@ -503,6 +516,7 @@ impl MicrosoftAzureBuilder {
self.client_options = self.client_options.with_config(key, value)
}
AzureConfigKey::ContainerName => self.container_name = Some(value.into()),
AzureConfigKey::DisableTagging => self.disable_tagging.parse(value),
};
self
}
Expand Down Expand Up @@ -556,6 +570,7 @@ impl MicrosoftAzureBuilder {
AzureConfigKey::UseAzureCli => Some(self.use_azure_cli.to_string()),
AzureConfigKey::Client(key) => self.client_options.get_config_value(key),
AzureConfigKey::ContainerName => self.container_name.clone(),
AzureConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
}
}

Expand Down Expand Up @@ -781,6 +796,12 @@ impl MicrosoftAzureBuilder {
self
}

/// If set to `true` will ignore any tags provided to put_opts
pub fn with_disable_tagging(mut self, ignore: bool) -> Self {
self.disable_tagging = ignore.into();
self
}

/// Configure a connection to container with given name on Microsoft Azure Blob store.
pub fn build(mut self) -> Result<MicrosoftAzure> {
if let Some(url) = self.url.take() {
Expand Down Expand Up @@ -885,6 +906,7 @@ impl MicrosoftAzureBuilder {
account,
is_emulator,
container,
disable_tagging: self.disable_tagging.get()?,
retry_config: self.retry_config,
client_options: self.client_options,
service: storage_url,
Expand Down
27 changes: 26 additions & 1 deletion object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ use url::Url;

const VERSION_HEADER: &str = "x-ms-version-id";

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags");

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -124,11 +126,12 @@ pub(crate) struct AzureConfig {
pub retry_config: RetryConfig,
pub service: Url,
pub is_emulator: bool,
pub disable_tagging: bool,
pub client_options: ClientOptions,
}

impl AzureConfig {
fn path_url(&self, path: &Path) -> Url {
pub(crate) fn path_url(&self, path: &Path) -> Url {
let mut url = self.service.clone();
{
let mut path_mut = url.path_segments_mut().unwrap();
Expand Down Expand Up @@ -229,6 +232,11 @@ impl AzureClient {
}
};

let builder = match (opts.tags.encoded(), self.config.disable_tagging) {
("", _) | (_, true) => builder,
(tags, false) => builder.header(&TAGS_HEADER, tags),
};

let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?;
Ok(get_put_result(response.headers(), VERSION_HEADER).context(MetadataSnafu)?)
}
Expand Down Expand Up @@ -315,6 +323,23 @@ impl AzureClient {

Ok(())
}

#[cfg(test)]
pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let response = self
.client
.request(Method::GET, url)
.query(&[("comp", "tags")])
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
.context(GetRequestSnafu {
path: path.as_ref(),
})?;
Ok(response)
}
}

#[async_trait]
Expand Down
7 changes: 7 additions & 0 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ mod tests {
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;

let validate = !integration.client.config().disable_tagging;
tagging(&integration, validate, |p| {
let client = Arc::clone(&integration.client);
async move { client.get_blob_tagging(&p).await }
})
.await
}

#[test]
Expand Down
Loading

0 comments on commit 11b2f5f

Please sign in to comment.