Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ and existing blocks together. Any blocks not specified in the block list and per
|`getPageBlobRanges`|`PageBlob`|Returns the list of valid page ranges for a page blob or snapshot of a page blob.
|`copyBlob`|`Common`|Copy a blob from one container to another one, even from different accounts.
|`createBlobSnapshot`|`Common`|Creates a read-only snapshot of a blob. The snapshot ID is returned in the `CamelAzureStorageBlobSnapshotId` header.
|`setBlobTags`|`Common`|Sets user-defined index tags on a blob. Tags are key-value pairs that can be used to filter and query blobs across containers. Tags can be provided via the `CamelAzureStorageBlobTags` header or as the message body (`Map<String, String>`).
|`getBlobTags`|`Common`|Retrieves user-defined index tags from a blob. The tags are returned as the message body (`Map<String, String>`) and also set in the `CamelAzureStorageBlobTags` header.
|===

Refer to the example section in this page to learn how to use these operations into your camel application.
Expand Down Expand Up @@ -740,6 +742,28 @@ from("direct:readSnapshot")
.to("mock:result");
--------------------------------------------------------------------------------

- `setBlobTags`

[source,java]
--------------------------------------------------------------------------------

from("direct:setBlobTags")
.setHeader(BlobConstants.BLOB_TAGS, constant(Map.of("status", "quarantine", "category", "document")))
.to("azure-storage-blob://camelazure/container1?blobName=hello.txt&operation=setBlobTags&serviceClient=#client")
.to("mock:result");
--------------------------------------------------------------------------------

- `getBlobTags`

[source,java]
--------------------------------------------------------------------------------

from("direct:getBlobTags")
.to("azure-storage-blob://camelazure/container1?blobName=hello.txt&operation=getBlobTags&serviceClient=#client")
.log("Tags: ${body}")
.to("mock:result");
--------------------------------------------------------------------------------

=== SAS Token generation example

SAS Blob Container tokens can be generated programmatically or via Azure UI. To generate the token with java code, the following can be done:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class BlobConfiguration implements Cloneable {
@UriParam(label = "producer",
enums = "listBlobContainers,createBlobContainer,deleteBlobContainer,listBlobs,getBlob,deleteBlob,downloadBlobToFile,downloadLink,"
+ "uploadBlockBlob,uploadBlockBlobChunked,stageBlockBlobList,commitBlobBlockList,getBlobBlockList,createAppendBlob,commitAppendBlob,createPageBlob,uploadPageBlob,resizePageBlob,"
+ "clearPageBlob,getPageBlobRanges,getChangeFeed,copyBlob,createBlobSnapshot",
+ "clearPageBlob,getPageBlobRanges,getChangeFeed,copyBlob,createBlobSnapshot,setBlobTags,getBlobTags",
defaultValue = "listBlobContainers")
private BlobOperationsDefinition operation = BlobOperationsDefinition.listBlobContainers;
@UriParam(label = "common")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ public ParallelTransferOptions getUploadParallelTransferOptions(final Exchange e
return null;
}

public Map<String, String> getBlobTags(final Exchange exchange) {
return BlobExchangeHeaders.getBlobTagsFromHeaders(exchange);
}

public BlobConfiguration getConfiguration() {
return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ public final class BlobConstants {
+ " downloadLink) it can be provided as input to target a specific blob snapshot.",
javaType = "String")
public static final String BLOB_SNAPSHOT_ID = HEADER_PREFIX + "SnapshotId";
@Metadata(description = "(producer) (setBlobTags) The tags to set on the blob as key-value pairs.\n"
+ "(consumer) The tags retrieved from the blob.",
javaType = "Map<String,String>")
public static final String BLOB_TAGS = HEADER_PREFIX + "Tags";

private BlobConstants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,4 +485,14 @@ public BlobExchangeHeaders snapshotId(final String snapshotId) {
headers.put(BlobConstants.BLOB_SNAPSHOT_ID, snapshotId);
return this;
}

@SuppressWarnings("unchecked")
public static Map<String, String> getBlobTagsFromHeaders(final Exchange exchange) {
return getObjectFromHeaders(exchange, BlobConstants.BLOB_TAGS, Map.class);
}

public BlobExchangeHeaders blobTags(final Map<String, String> tags) {
headers.put(BlobConstants.BLOB_TAGS, tags);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,14 @@ public enum BlobOperationsDefinition {
/**
* Creates a read-only snapshot of a blob. The snapshot ID is returned in the exchange headers.
*/
createBlobSnapshot
createBlobSnapshot,
/**
* Sets user-defined index tags on a blob. Tags are key-value pairs that can be used to filter and query blobs
* across containers.
*/
setBlobTags,
/**
* Retrieves user-defined index tags from a blob.
*/
getBlobTags
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ public void process(final Exchange exchange) throws Exception {
case createBlobSnapshot:
setResponse(exchange, getBlobOperations(exchange).createBlobSnapshot(exchange));
break;
case setBlobTags:
setResponse(exchange, getBlobOperations(exchange).setBlobTags(exchange));
break;
case getBlobTags:
setResponse(exchange, getBlobOperations(exchange).getBlobTags(exchange));
break;
default:
throw new IllegalArgumentException("Unsupported operation");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@
import com.azure.storage.blob.models.PageRange;
import com.azure.storage.blob.models.PageRangeItem;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.options.BlobGetTagsOptions;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.blob.options.BlobSetTagsOptions;
import com.azure.storage.blob.options.BlobUploadFromFileOptions;
import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
import com.azure.storage.blob.options.ListPageRangesOptions;
Expand Down Expand Up @@ -347,6 +349,27 @@ public BlobClientWrapper withSnapshot(final String snapshotId) {
return new BlobClientWrapper(client.getSnapshotClient(snapshotId));
}

public Response<Void> setTags(
final Map<String, String> tags,
final BlobRequestConditions requestConditions,
final Duration timeout) {
BlobSetTagsOptions options = new BlobSetTagsOptions(tags);
if (requestConditions != null) {
options.setRequestConditions(requestConditions);
}
return client.setTagsWithResponse(options, timeout, Context.NONE);
}

public Response<Map<String, String>> getTags(
final BlobRequestConditions requestConditions,
final Duration timeout) {
BlobGetTagsOptions options = new BlobGetTagsOptions();
if (requestConditions != null) {
options.setRequestConditions(requestConditions);
}
return client.getTagsWithResponse(options, timeout, Context.NONE);
}

public BlobLeaseClient getLeaseClient() {
return new BlobLeaseClientBuilder().blobClient(client).buildClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,56 @@ public BlobOperationResponse createBlobSnapshot(final Exchange exchange) {
return BlobOperationResponse.create(snapshotClient.getSnapshotId(), exchangeHeaders.toMap());
}

@SuppressWarnings("unchecked")
public BlobOperationResponse setBlobTags(final Exchange exchange) {
ObjectHelper.notNull(exchange, MISSING_EXCHANGE);

if (LOG.isTraceEnabled()) {
LOG.trace("Setting tags on blob [{}] from exchange [{}]...", configurationProxy.getBlobName(exchange), exchange);
}

Map<String, String> tags = configurationProxy.getBlobTags(exchange);
if (tags == null) {
tags = exchange.getIn().getBody(Map.class);
}
if (tags == null || tags.isEmpty()) {
throw new IllegalArgumentException(
"Tags must be specified either as the message body (Map<String,String>) or via the "
+ BlobConstants.BLOB_TAGS + " header.");
}

final BlobCommonRequestOptions commonRequestOptions = getCommonRequestOptions(exchange);

final Response<Void> response = client.setTags(
tags,
commonRequestOptions.getBlobRequestConditions(),
commonRequestOptions.getTimeout());

final BlobExchangeHeaders exchangeHeaders = BlobExchangeHeaders.create()
.httpHeaders(response.getHeaders());

return BlobOperationResponse.createWithEmptyBody(exchangeHeaders.toMap());
}

public BlobOperationResponse getBlobTags(final Exchange exchange) {
if (LOG.isTraceEnabled()) {
LOG.trace("Getting tags from blob [{}] from exchange [{}]...", configurationProxy.getBlobName(exchange), exchange);
}

final BlobCommonRequestOptions commonRequestOptions = getCommonRequestOptions(exchange);

final Response<Map<String, String>> response = client.getTags(
commonRequestOptions.getBlobRequestConditions(),
commonRequestOptions.getTimeout());

final Map<String, String> tags = response.getValue();
final BlobExchangeHeaders exchangeHeaders = BlobExchangeHeaders.create()
.blobTags(tags)
.httpHeaders(response.getHeaders());

return BlobOperationResponse.create(tags, exchangeHeaders.toMap());
}

private DownloadRetryOptions getDownloadRetryOptions(final BlobConfigurationOptionsProxy configurationProxy) {
return new DownloadRetryOptions().setMaxRetryRequests(configurationProxy.getMaxRetryRequests());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import com.azure.core.http.rest.PagedIterable;
Expand Down Expand Up @@ -483,6 +485,103 @@ void testClearPages() throws Exception {
blobClientWrapper.delete(null, null, null);
}

@SuppressWarnings("unchecked")
@Test
void testSetAndGetBlobTags() {
final BlobClientWrapper blobClientWrapper = blobContainerClientWrapper.getBlobClientWrapper(randomBlobName);
final BlobOperations operations = new BlobOperations(configuration, blobClientWrapper);

final Map<String, String> tags = new HashMap<>();
tags.put("status", "quarantine");
tags.put("category", "document");
tags.put("priority", "high");

// set tags via header
final Exchange setExchange = new DefaultExchange(context);
setExchange.getIn().setHeader(BlobConstants.BLOB_TAGS, tags);

final BlobOperationResponse setResponse = operations.setBlobTags(setExchange);
assertNotNull(setResponse);
assertTrue((boolean) setResponse.getBody());

// get tags
final Exchange getExchange = new DefaultExchange(context);
final BlobOperationResponse getResponse = operations.getBlobTags(getExchange);

assertNotNull(getResponse);
assertNotNull(getResponse.getBody());

final Map<String, String> retrievedTags = (Map<String, String>) getResponse.getBody();
assertEquals(3, retrievedTags.size());
assertEquals("quarantine", retrievedTags.get("status"));
assertEquals("document", retrievedTags.get("category"));
assertEquals("high", retrievedTags.get("priority"));

// also verify tags are in exchange headers
assertEquals(retrievedTags, getResponse.getHeaders().get(BlobConstants.BLOB_TAGS));
}

@SuppressWarnings("unchecked")
@Test
void testSetBlobTagsFromBody() {
final BlobClientWrapper blobClientWrapper = blobContainerClientWrapper.getBlobClientWrapper(randomBlobName);
final BlobOperations operations = new BlobOperations(configuration, blobClientWrapper);

final Map<String, String> tags = new HashMap<>();
tags.put("owner", "test-user");
tags.put("scanned", "true");

// set tags via body
final Exchange setExchange = new DefaultExchange(context);
setExchange.getIn().setBody(tags);

final BlobOperationResponse setResponse = operations.setBlobTags(setExchange);
assertNotNull(setResponse);
assertTrue((boolean) setResponse.getBody());

// verify by getting tags back
final Exchange getExchange = new DefaultExchange(context);
final BlobOperationResponse getResponse = operations.getBlobTags(getExchange);

final Map<String, String> retrievedTags = (Map<String, String>) getResponse.getBody();
assertEquals(2, retrievedTags.size());
assertEquals("test-user", retrievedTags.get("owner"));
assertEquals("true", retrievedTags.get("scanned"));
}

@SuppressWarnings("unchecked")
@Test
void testOverwriteBlobTags() {
final BlobClientWrapper blobClientWrapper = blobContainerClientWrapper.getBlobClientWrapper(randomBlobName);
final BlobOperations operations = new BlobOperations(configuration, blobClientWrapper);

// set initial tags
final Map<String, String> initialTags = new HashMap<>();
initialTags.put("status", "pending");

final Exchange setExchange1 = new DefaultExchange(context);
setExchange1.getIn().setHeader(BlobConstants.BLOB_TAGS, initialTags);
operations.setBlobTags(setExchange1);

// overwrite with new tags
final Map<String, String> newTags = new HashMap<>();
newTags.put("status", "processed");
newTags.put("result", "clean");

final Exchange setExchange2 = new DefaultExchange(context);
setExchange2.getIn().setHeader(BlobConstants.BLOB_TAGS, newTags);
operations.setBlobTags(setExchange2);

// verify only new tags are present (set replaces all tags)
final Exchange getExchange = new DefaultExchange(context);
final BlobOperationResponse getResponse = operations.getBlobTags(getExchange);

final Map<String, String> retrievedTags = (Map<String, String>) getResponse.getBody();
assertEquals(2, retrievedTags.size());
assertEquals("processed", retrievedTags.get("status"));
assertEquals("clean", retrievedTags.get("result"));
}

@Test
void testGetPageBlobRanges() throws Exception {
final BlobClientWrapper blobClientWrapper = blobContainerClientWrapper.getBlobClientWrapper("upload_test_file.txt");
Expand Down
Loading
Loading