Batch kill in azure (#15770)
* Multi kill

* add some unit tests

* Fix param

* Fix deleteBatchFiles

* Fix unit tests

* Add tests

* Save work on batch kill

* add tests

* Fix unit tests

* Update extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java

Co-authored-by: Suneet Saldanha <suneet@apache.org>

* Fix unit tests

* Update extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java

Co-authored-by: Suneet Saldanha <suneet@apache.org>

* fix test

* fix test

* Add test

---------

Co-authored-by: Suneet Saldanha <suneet@apache.org>
georgew5656 and suneet-s committed Jan 31, 2024
1 parent 0089f6b commit 5edfa94
Showing 5 changed files with 275 additions and 14 deletions.
extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
@@ -32,6 +32,9 @@

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
@@ -40,6 +43,7 @@
public class AzureDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(AzureDataSegmentKiller.class);
private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256; // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id

private final AzureDataSegmentConfig segmentConfig;
private final AzureInputDataConfig inputDataConfig;
@@ -63,6 +67,51 @@ public AzureDataSegmentKiller(
this.azureCloudBlobIterableFactory = azureCloudBlobIterableFactory;
}

@Override
public void kill(List<DataSegment> segments) throws SegmentLoadingException
{
if (segments.isEmpty()) {
return;
}
if (segments.size() == 1) {
kill(segments.get(0));
return;
}

// create a list of keys to delete
Map<String, List<String>> containerToKeysToDelete = new HashMap<>();
for (DataSegment segment : segments) {
Map<String, Object> loadSpec = segment.getLoadSpec();
final String containerName = MapUtils.getString(loadSpec, "containerName");
final String blobPath = MapUtils.getString(loadSpec, "blobPath");
List<String> keysToDelete = containerToKeysToDelete.computeIfAbsent(
containerName,
k -> new ArrayList<>()
);
keysToDelete.add(blobPath);
}

boolean shouldThrowException = false;
for (Map.Entry<String, List<String>> containerToKeys : containerToKeysToDelete.entrySet()) {
boolean batchSuccessful = azureStorage.batchDeleteFiles(
containerToKeys.getKey(),
containerToKeys.getValue(),
null
);

if (!batchSuccessful) {
shouldThrowException = true;
}
}

if (shouldThrowException) {
throw new SegmentLoadingException(
"Couldn't delete segments from Azure. See the task logs for more details."
);
}
}


@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{
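
The new kill(List<DataSegment>) above groups blob paths by container before issuing one batch delete per container. A minimal standalone sketch of that computeIfAbsent grouping pattern, using hypothetical container and blob names rather than real load specs:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByContainerSketch
{
  public static void main(String[] args)
  {
    // Hypothetical (containerName, blobPath) pairs standing in for segment load specs.
    String[][] loadSpecs = {
        {"container-a", "seg1/index.zip"},
        {"container-b", "seg2/index.zip"},
        {"container-a", "seg3/index.zip"},
    };

    // Same pattern as the killer: lazily create one key list per container.
    Map<String, List<String>> containerToKeysToDelete = new HashMap<>();
    for (String[] spec : loadSpecs) {
      containerToKeysToDelete.computeIfAbsent(spec[0], k -> new ArrayList<>()).add(spec[1]);
    }

    // e.g. {container-a=[seg1/index.zip, seg3/index.zip], container-b=[seg2/index.zip]}
    // (HashMap iteration order is not guaranteed.)
    System.out.println(containerToKeysToDelete);
  }
}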
extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
@@ -22,6 +22,7 @@
import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchStorageException;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobRange;
@@ -34,6 +35,7 @@
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.Utility;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
@@ -58,6 +60,10 @@ public class AzureStorage
// Default value from Azure library
private static final int DELTA_BACKOFF_MS = 30_000;

// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256;


private static final Logger log = new Logger(AzureStorage.class);

private final AzureClientFactory azureClientFactory;
@@ -172,20 +178,60 @@ public InputStream getBlockBlobInputStream(long offset, Long length, final String
return blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length)));
}

public void batchDeleteFiles(String containerName, Iterable<String> paths, Integer maxAttempts)
/**
* Deletes multiple files from the specified container.
*
* @param containerName The name of the container from which files will be deleted.
* @param paths An iterable of file paths to be deleted.
* @param maxAttempts (Optional) The maximum number of attempts to delete each file.
* If null, the system default number of attempts will be used.
* @return true if all files were successfully deleted; false otherwise.
*/
public boolean batchDeleteFiles(String containerName, Iterable<String> paths, @Nullable Integer maxAttempts)
throws BlobBatchStorageException
{
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
BlobBatchClient blobBatchClient = azureClientFactory.getBlobBatchClient(blobContainerClient);
List<String> blobUris = Streams.stream(paths).map(path -> blobContainerClient.getBlobContainerUrl() + "/" + path).collect(Collectors.toList());

// We have to call forEach on the response because this is the only way Azure batch will throw an exception on an operation failure.
azureClientFactory.getBlobBatchClient(blobContainerClient).deleteBlobs(
blobUris,
DeleteSnapshotsOptionType.INCLUDE
).forEach(response ->
log.debug("Deleting blob with URL %s completed with status code %d%n",
response.getRequest().getUrl(), response.getStatusCode())
boolean hadException = false;
List<List<String>> keysChunks = Lists.partition(
blobUris,
MAX_MULTI_OBJECT_DELETE_SIZE
);
for (List<String> chunkOfKeys : keysChunks) {
try {
log.info(
"Removing from container [%s] the following files: [%s]",
containerName,
chunkOfKeys
);
// We have to call forEach on the response because this is the only way Azure batch will throw an exception on an operation failure.
blobBatchClient.deleteBlobs(
chunkOfKeys,
DeleteSnapshotsOptionType.INCLUDE
).forEach(response ->
log.debug("Deleting blob with URL %s completed with status code %d%n",
response.getRequest().getUrl(), response.getStatusCode())
);
}
catch (BlobStorageException | BlobBatchStorageException e) {
hadException = true;
log.noStackTrace().warn(e,
"Unable to delete from container [%s], the following keys [%s]",
containerName,
chunkOfKeys
);
}
catch (Exception e) {
hadException = true;
log.noStackTrace().warn(e,
"Unexpected exception occurred when deleting from container [%s], the following keys [%s]",
containerName,
chunkOfKeys
);
}
}
return !hadException;
}

public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts)
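
A hedged usage sketch for the reworked batchDeleteFiles: callers now check the boolean result, because per-chunk failures are caught and logged inside the method instead of propagating. Only the batchDeleteFiles signature comes from the diff; the container name, blob paths, and the surrounding method are hypothetical:

import com.google.common.collect.ImmutableList;
import org.apache.druid.storage.azure.AzureStorage;

public class BatchDeleteUsageSketch
{
  // azureStorage is assumed to be an already-configured instance, e.g. injected by Druid.
  static void deleteSegmentBlobs(AzureStorage azureStorage)
  {
    boolean allDeleted = azureStorage.batchDeleteFiles(
        "druid-deep-storage",                   // hypothetical container name
        ImmutableList.of(
            "wikipedia/2024-01-01/0/index.zip", // hypothetical blob paths
            "wikipedia/2024-01-01/1/index.zip"
        ),
        null                                    // null => default number of attempts
    );

    if (!allDeleted) {
      // Mirrors AzureDataSegmentKiller: surface the failure to the caller; details are in the logs.
      throw new RuntimeException("Couldn't delete one or more blobs from Azure; see logs.");
    }
  }
}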
extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
@@ -22,13 +22,15 @@
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
@@ -39,6 +41,7 @@
import java.net.URI;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class AzureDataSegmentKillerTest extends EasyMockSupport
@@ -47,6 +50,8 @@ public class AzureDataSegmentKillerTest
private static final String CONTAINER = "test";
private static final String PREFIX = "test/log";
private static final String BLOB_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
private static final String BLOB_PATH_2 = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/2/0/index.zip";

private static final int MAX_KEYS = 1;
private static final int MAX_TRIES = 3;

@@ -70,6 +75,18 @@ public class AzureDataSegmentKillerTest
1
);

private static final DataSegment DATA_SEGMENT_2 = new DataSegment(
"test",
Intervals.of("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("containerName", CONTAINER_NAME, "blobPath", BLOB_PATH_2),
null,
null,
NoneShardSpec.instance(),
0,
1
);

private AzureDataSegmentConfig segmentConfig;
private AzureInputDataConfig inputDataConfig;
private AzureAccountConfig accountConfig;
@@ -285,4 +302,89 @@ private void common_test_kill_StorageExceptionExtendedError_throwsException()

verifyAll();
}

@Test
public void killBatchTest() throws SegmentLoadingException, BlobStorageException
{
Capture<List<String>> deletedFilesCapture = Capture.newInstance();
EasyMock.expect(azureStorage.batchDeleteFiles(
EasyMock.eq(CONTAINER_NAME),
EasyMock.capture(deletedFilesCapture),
EasyMock.eq(null)
)).andReturn(true);

replayAll();

AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);

killer.kill(ImmutableList.of(DATA_SEGMENT, DATA_SEGMENT_2));

verifyAll();

Assert.assertEquals(
ImmutableSet.of(BLOB_PATH, BLOB_PATH_2),
new HashSet<>(deletedFilesCapture.getValue())
);
}

@Test(expected = RuntimeException.class)
public void test_killBatch_runtimeException()
throws SegmentLoadingException, BlobStorageException
{

EasyMock.expect(azureStorage.batchDeleteFiles(CONTAINER_NAME, ImmutableList.of(BLOB_PATH, BLOB_PATH_2), null))
.andThrow(new RuntimeException(""));

replayAll();

AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);

killer.kill(ImmutableList.of(DATA_SEGMENT, DATA_SEGMENT_2));

verifyAll();
}

@Test(expected = SegmentLoadingException.class)
public void test_killBatch_SegmentLoadingExceptionOnError()
throws SegmentLoadingException, BlobStorageException
{

EasyMock.expect(azureStorage.batchDeleteFiles(CONTAINER_NAME, ImmutableList.of(BLOB_PATH, BLOB_PATH_2), null))
.andReturn(false);

replayAll();

AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);

killer.kill(ImmutableList.of(DATA_SEGMENT, DATA_SEGMENT_2));

verifyAll();
}

@Test
public void killBatch_emptyList() throws SegmentLoadingException, BlobStorageException
{

AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(ImmutableList.of());
}

@Test
public void killBatch_singleSegment() throws SegmentLoadingException, BlobStorageException
{

List<String> deletedFiles = new ArrayList<>();
final String dirPath = Paths.get(BLOB_PATH).getParent().toString();

// For a single segment, fall back to regular kill(DataSegment) logic
EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andReturn(deletedFiles);

replayAll();

AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);

killer.kill(ImmutableList.of(DATA_SEGMENT));

verifyAll();
}
}
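
killBatchTest above uses EasyMock's Capture to record the key list handed to the mocked batchDeleteFiles. A self-contained sketch of that capture pattern; SegmentStore is a hypothetical stand-in for AzureStorage, not part of the diff:

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.easymock.Capture;
import org.easymock.EasyMock;

public class EasyMockCaptureSketch
{
  // Hypothetical collaborator interface, standing in for AzureStorage.
  interface SegmentStore
  {
    boolean deleteAll(String container, List<String> keys);
  }

  public static void main(String[] args)
  {
    Capture<List<String>> deletedKeys = Capture.newInstance();
    SegmentStore store = EasyMock.createMock(SegmentStore.class);

    // Match the container argument exactly, but capture whatever key list is passed.
    EasyMock.expect(store.deleteAll(EasyMock.eq("container"), EasyMock.capture(deletedKeys)))
            .andReturn(true);
    EasyMock.replay(store);

    store.deleteAll("container", ImmutableList.of("a", "b"));

    EasyMock.verify(store);
    System.out.println(deletedKeys.getValue()); // [a, b]
  }
}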
extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
@@ -37,6 +37,7 @@
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.List;


@@ -121,7 +122,7 @@ public void testListBlobsWithPrefixInContainerSegmented() throws BlobStorageException
@Test
public void testBatchDeleteFiles_emptyResponse() throws BlobStorageException
{
String containerUrl = "https://implysaasdeveastussa.blob.core.windows.net/container";
String containerUrl = "https://storageaccount.blob.core.windows.net/container";
BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);

SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
@@ -138,8 +139,67 @@
captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
);

azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null);
boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null);
Assert.assertEquals(captor.getValue().get(0), containerUrl + "/" + BLOB_NAME);
Assert.assertTrue(deleteSuccessful);
}

@Test
public void testBatchDeleteFiles_error() throws BlobStorageException
{
String containerUrl = "https://storageaccount.blob.core.windows.net/container";
BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);

SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
supplier.set(new TestPagedResponse<>(ImmutableList.of()));

ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);

Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doThrow(new RuntimeException()).when(blobBatchClient).deleteBlobs(
captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
);

boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null);
Assert.assertEquals(captor.getValue().get(0), containerUrl + "/" + BLOB_NAME);
Assert.assertFalse(deleteSuccessful);
}

@Test
public void testBatchDeleteFiles_emptyResponse_multipleResponses() throws BlobStorageException
{
String containerUrl = "https://storageaccount.blob.core.windows.net/container";
BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);

SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
supplier.set(new TestPagedResponse<>(ImmutableList.of()));
PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);

ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);

Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
);


List<String> blobNameList = new ArrayList<>();
for (int i = 0; i <= 257; i++) {
blobNameList.add(BLOB_NAME + i);
}

boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER, blobNameList, null);

List<List<String>> deletedValues = captor.getAllValues();
Assert.assertEquals(deletedValues.get(0).size(), 256);
Assert.assertEquals(deletedValues.get(1).size(), 2);
Assert.assertTrue(deleteSuccessful);
}
}
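
The 258-blob test above exercises the chunking path in batchDeleteFiles. The split itself is plain Guava; a minimal sketch of how Lists.partition yields the 256 + 2 chunk sizes that test asserts:

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;

public class PartitionSketch
{
  public static void main(String[] args)
  {
    List<String> blobUris = new ArrayList<>();
    for (int i = 0; i < 258; i++) {
      blobUris.add("blob" + i);
    }

    // Consecutive sublists of at most 256 elements; the final sublist holds the remainder.
    List<List<String>> chunks = Lists.partition(blobUris, 256);

    System.out.println(chunks.size());        // 2
    System.out.println(chunks.get(0).size()); // 256
    System.out.println(chunks.get(1).size()); // 2
  }
}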

