Skip to content

Commit

Permalink
Fixing a bug in Blob scan continuation token handling
Browse files Browse the repository at this point in the history
  • Loading branch information
mathewc committed Jul 15, 2015
1 parent 897df0a commit 2b82daf
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 9 deletions.
Expand Up @@ -22,26 +22,27 @@ internal static class StorageBlobClientExtensions
}

List<IStorageListBlobItem> allResults = new List<IStorageListBlobItem>();
BlobContinuationToken currentToken = null;
BlobContinuationToken continuationToken = null;
IStorageBlobResultSegment result;

do
{
result = await client.ListBlobsSegmentedAsync(prefix, useFlatBlobListing, blobListingDetails,
maxResults: null, currentToken: currentToken, options: null, operationContext: null,
maxResults: null, currentToken: continuationToken, options: null, operationContext: null,
cancellationToken: cancellationToken);

if (result != null)
{
IEnumerable<IStorageListBlobItem> currentResults = result.Results;

if (currentResults != null)
{
allResults.AddRange(currentResults);
}

continuationToken = result.ContinuationToken;
}
}
while (result != null && result.ContinuationToken != null);
}
while (result != null && continuationToken != null);

return allResults;
}
Expand Down
Expand Up @@ -21,26 +21,27 @@ internal static class StorageBlobContainerExtensions
}

List<IStorageListBlobItem> allResults = new List<IStorageListBlobItem>();
BlobContinuationToken currentToken = null;
BlobContinuationToken continuationToken = null;
IStorageBlobResultSegment result;

do
{
result = await container.ListBlobsSegmentedAsync(prefix: null, useFlatBlobListing: useFlatBlobListing,
blobListingDetails: BlobListingDetails.None, maxResults: null, currentToken: currentToken,
blobListingDetails: BlobListingDetails.None, maxResults: null, currentToken: continuationToken,
options: null, operationContext: null, cancellationToken: cancellationToken);

if (result != null)
{
IEnumerable<IStorageListBlobItem> currentResults = result.Results;

if (currentResults != null)
{
allResults.AddRange(currentResults);
}

continuationToken = result.ContinuationToken;
}
}
while (result != null && result.ContinuationToken != null);
while (result != null && continuationToken != null);

return allResults;
}
Expand Down
@@ -0,0 +1,103 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Blobs.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Moq;
using Xunit;

namespace Microsoft.Azure.WebJobs.Host.UnitTests.Blobs.Listeners
{
public class StorageBlobClientExtensionsTests
{
[Theory]
[InlineData(4000)]
[InlineData(30000)]
public async Task ListBlobsAsync_FollowsContinuationTokensToEnd(int blobCount)
{
Mock<IStorageBlobClient> mockClient = new Mock<IStorageBlobClient>(MockBehavior.Strict);

int maxResults = 5000;
List<IStorageListBlobItem> blobs = GetMockBlobs(blobCount);
int numPages = (int)Math.Ceiling(((decimal)blobCount / maxResults));

// create the test data pages with continuation tokens
List<IStorageBlobResultSegment> blobSegments = new List<IStorageBlobResultSegment>();
IStorageBlobResultSegment initialSegement = null;
for (int i = 0; i < numPages; i++)
{
BlobContinuationToken continuationToken = null;
if (i < (numPages - 1))
{
// add a token for all but the last page
continuationToken = new BlobContinuationToken()
{
NextMarker = i.ToString()
};
}

Mock<IStorageBlobResultSegment> mockSegment = new Mock<IStorageBlobResultSegment>(MockBehavior.Strict);
mockSegment.SetupGet(p => p.Results).Returns(blobs.Skip(i * maxResults).Take(maxResults).ToArray());
mockSegment.SetupGet(p => p.ContinuationToken).Returns(continuationToken);

if (i == 0)
{
initialSegement = mockSegment.Object;
}
else
{
blobSegments.Add(mockSegment.Object);
}
}

// Mock the List function to return the correct blob page
HashSet<BlobContinuationToken> seenTokens = new HashSet<BlobContinuationToken>();
IStorageBlobResultSegment resultSegment = null;
mockClient.Setup(p => p.ListBlobsSegmentedAsync("test", true, BlobListingDetails.None, null, It.IsAny<BlobContinuationToken>(), null, null, CancellationToken.None))
.Callback<string, bool, BlobListingDetails, int?, BlobContinuationToken, BlobRequestOptions, OperationContext, CancellationToken>(
(prefix, useFlatBlobListing, blobListingDetails, maxResultsValue, currentToken, options, operationContext, cancellationToken) =>
{
// Previously this is where a bug existed - ListBlobsAsync wasn't handling
// continuation tokens properly and kept sending the same initial token
Assert.False(seenTokens.Contains(currentToken));
seenTokens.Add(currentToken);
if (currentToken == null)
{
resultSegment = initialSegement;
}
else
{
int idx = int.Parse(currentToken.NextMarker);
resultSegment = blobSegments[idx];
}
})
.Returns(() => { return Task.FromResult(resultSegment); });

IEnumerable<IStorageListBlobItem> results = await mockClient.Object.ListBlobsAsync("test", true, BlobListingDetails.None, CancellationToken.None);

Assert.Equal(blobCount, results.Count());
}

private class FakeBlobItem : IStorageListBlobItem
{
}

private List<IStorageListBlobItem> GetMockBlobs(int count)
{
List<IStorageListBlobItem> blobs = new List<IStorageListBlobItem>();
for (int i = 0; i < count; i++)
{
blobs.Add(new FakeBlobItem());
}
return blobs;
}
}
}
@@ -0,0 +1,103 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Blobs.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Moq;
using Xunit;

namespace Microsoft.Azure.WebJobs.Host.UnitTests.Blobs.Listeners
{
public class StorageBlobContainerExtensionsTests
{
[Theory]
[InlineData(4000)]
[InlineData(30000)]
public async Task ListBlobsAsync_FollowsContinuationTokensToEnd(int blobCount)
{
Mock<IStorageBlobContainer> mockContainer = new Mock<IStorageBlobContainer>(MockBehavior.Strict);

int maxResults = 5000;
List<IStorageListBlobItem> blobs = GetMockBlobs(blobCount);
int numPages = (int)Math.Ceiling(((decimal)blobCount / maxResults));

// create the test data pages with continuation tokens
List<IStorageBlobResultSegment> blobSegments = new List<IStorageBlobResultSegment>();
IStorageBlobResultSegment initialSegement = null;
for (int i = 0; i < numPages; i++)
{
BlobContinuationToken continuationToken = null;
if (i < (numPages - 1))
{
// add a token for all but the last page
continuationToken = new BlobContinuationToken()
{
NextMarker = i.ToString()
};
}

Mock<IStorageBlobResultSegment> mockSegment = new Mock<IStorageBlobResultSegment>(MockBehavior.Strict);
mockSegment.SetupGet(p => p.Results).Returns(blobs.Skip(i * maxResults).Take(maxResults).ToArray());
mockSegment.SetupGet(p => p.ContinuationToken).Returns(continuationToken);

if (i == 0)
{
initialSegement = mockSegment.Object;
}
else
{
blobSegments.Add(mockSegment.Object);
}
}

// Mock the List function to return the correct blob page
HashSet<BlobContinuationToken> seenTokens = new HashSet<BlobContinuationToken>();
IStorageBlobResultSegment resultSegment = null;
mockContainer.Setup(p => p.ListBlobsSegmentedAsync(null, true, BlobListingDetails.None, null, It.IsAny<BlobContinuationToken>(), null, null, CancellationToken.None))
.Callback<string, bool, BlobListingDetails, int?, BlobContinuationToken, BlobRequestOptions, OperationContext, CancellationToken>(
(prefix, useFlatBlobListing, blobListingDetails, maxResultsValue, currentToken, options, operationContext, cancellationToken) =>
{
// Previously this is where a bug existed - ListBlobsAsync wasn't handling
// continuation tokens properly and kept sending the same initial token
Assert.False(seenTokens.Contains(currentToken));
seenTokens.Add(currentToken);
if (currentToken == null)
{
resultSegment = initialSegement;
}
else
{
int idx = int.Parse(currentToken.NextMarker);
resultSegment = blobSegments[idx];
}
})
.Returns(() => { return Task.FromResult(resultSegment); });

IEnumerable<IStorageListBlobItem> results = await mockContainer.Object.ListBlobsAsync(true, CancellationToken.None);

Assert.Equal(blobCount, results.Count());
}

private class FakeBlobItem : IStorageListBlobItem
{
}

private List<IStorageListBlobItem> GetMockBlobs(int count)
{
List<IStorageListBlobItem> blobs = new List<IStorageListBlobItem>();
for (int i = 0; i < count; i++)
{
blobs.Add(new FakeBlobItem());
}
return blobs;
}
}
}
Expand Up @@ -118,6 +118,8 @@
<Compile Include="Blobs\Listeners\BlobTriggerExecutorTests.cs" />
<Compile Include="Blobs\Listeners\StorageAnalyticsLogEntryTests.cs" />
<Compile Include="Blobs\Listeners\StorageAnalyticsLogParserTests.cs" />
<Compile Include="Blobs\Listeners\StorageBlobClientExtensionsTests.cs" />
<Compile Include="Blobs\Listeners\StorageBlobContainerExtensionsTests.cs" />
<Compile Include="Blobs\SetupOfStreamIAsyncResultExtensions.cs" />
<Compile Include="Blobs\MockOfStreamExtensions.cs" />
<Compile Include="Blobs\UncompletedAsyncResult.cs" />
Expand Down

0 comments on commit 2b82daf

Please sign in to comment.