-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Migrate to the New Azure Storage SDKs #763
Migrate to the New Azure Storage SDKs #763
Conversation
This reverts commit 0d47a76.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this PR, Will! I think this would also be a good opportunity for us to bump the version of this package to v2.0.0, since we're changing a major dependency and since we're removing a lot of legacy public APIs.
Love to see this change. It is pretty dense though as you said. I do have a question: for consumers that already use these new clients and specifically the dependency injection extensions from Azure SDK, have you included a pattern so that we can just re-use all that configuration and pass an already configured Reading a bit of the PR, I do have concerns for this as is. Specifically around the dependency on the custom pipeline policies being injected to the all the clients. I understand how you design needs those, but can we explore if there is a good way to allow consumer to use existing clients? I think this is important as the Azure SDK team has already provided a great way to configure and build clients via
|
Yes, I have been thinking about this, as we'll ultimately read the client configurations for the Durable Functions extensions from the app settings (similar to how we do it today for Managed Identity and how the Functions Host handles the new client SDKs). I am not yet satisfied with my current draft's solution, so I'll be noodling on that a bit more. I agree that we want to enable users to be able to easily configure their client options and potentially re-use their service clients. There are extensions provided by the Azure extensions SDK to resolve the options and the client from the configuration automatically. But should we use the policies? I think it's the most desired way of performing these sorts of actions, as we're relying on the native retry capabilities of the SDKs (it may become confusing to allow users to configure retries separately from the client options). However, it is definitely not easy to give users the flexibility of providing their own clients and our own policies. Perhaps DI could indeed help here with something like named options...? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First round of comments before I head out on vacation. I'm really excited about this PR! I'm hoping that @davidmrdavid and @amdeel can help push it forward in the meantime!
/// <param name="options">An optional set of client options.</param> | ||
/// <returns>An Azure Queue Storage service client whose connection is based on the given <paramref name="accountName"/>.</returns> | ||
/// <exception cref="ArgumentNullException"> | ||
/// <paramref name="accountName"/> is <see langword="null"/> or consists entirely of white space characters. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think whitespace is one word?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method string.IsNullOrWhiteSpace uses two words, but I see the wikipedia page uses one. A cursory Google search shows a hyphenated approach with "white-space." I'll let you choose which one we use lol!
@@ -32,20 +32,24 @@ | |||
</PropertyGroup> | |||
|
|||
<ItemGroup> | |||
<PackageReference Include="Azure.Core" Version="1.24.0" /> | |||
<PackageReference Include="Azure.Data.Tables" Version="12.6.0" /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that there's already a 12.6.1 version available. Should we make the version numbers here to be {major}.{minor}.* just so that we automatically pick up the patch fixes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can definitely do that to make it easier. However, I don't see other package references using the wildcard. Is it ok we do it for these?
<PackageReference Include="Azure.Core" Version="1.24.0" /> | ||
<PackageReference Include="Azure.Data.Tables" Version="12.6.0" /> | ||
<PackageReference Include="Azure.Storage.Blobs" Version="12.12.0" /> | ||
<PackageReference Include="Azure.Storage.Queues" Version="12.10.0" /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need to be careful about our version selection here since we need to remain compatible with other storage related packages, like Microsoft.Azure.WebJobs.Extensions.Storage. In particular, we'll need to see which versions exist for which extension bundles so that we can know which Azure Functions extension bundles we can safely include this change in. FYI @davidmrdavid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this is something to be careful with. A good source of reference to quickly validate compatibility with Extension Bundles is to look at the release notes here, which contain the versions of each NuGet package included in a bundles release.
I'm happy to help test more in-depth once other parts of the PR feedback are addressed. For now, I'd say we can keep this in the backburner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh ok! I see that Microsoft.Azure.WebJobs.Extensions.Storage.Queues and Microsoft.Azure.WebJobs.Extensions.Storage.Blobs use Azure.Storage.Blobs (>= 12.12.0)
Azure.Storage.Queues (>= 12.10.0). For our purposes, does that mean we can continue to use the latest 12.* version, or should we pin the minimums?
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.*" PrivateAssets="All" /> | ||
<PackageReference Include="System.Linq.Async" Version="6.0.1" /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if we'll be able to safely add dependencies to 6.x framework package versions since many of them remove support for .NET Core 2.x (and we still have Azure Functions customers that depend on this older framework). We'll need to do some follow-up research on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While it looks like a framework package, it is actually outside of the BCL so it shouldn't conflict with earlier frameworks on its own. However, its dependencies like Microsoft.Bcl.AsyncInterfaces is an out-of-band (OOB) package meant to shim IAsyncEnumerable<T>
into .NET Standard (and standard-compatible) frameworks. I don't think there should be an issue using the latest .NET 6-equivalent APIs in a .NET Core 3.1 app, but I'll check.
|
||
public override void Process(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline) | ||
{ | ||
stats.StorageRequests.Increment(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I love this pipeline stuff you've added! It seems to nicely cover much of the functionality we had before!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Hopefully it makes things a little easier.
I did have to convert the error decoration into an extension method though. I originally also tried to write it as policy, but request failure exceptions are not thrown by the HTTP pipeline. Instead, each client (like the BlobServiceClient
) is responsible for interpreting the response codes and throwing exceptions as appropriate. So we must wrap each client's method with the new DecorateFailure
method, as we do not want to make assumptions about how each method treats each status code ahead of time.
This reverts commit a44c125.
using System.Threading.Tasks; | ||
using DurableTask.AzureStorage.Storage; | ||
using Newtonsoft.Json; | ||
|
||
class BlobLease : Lease | ||
class BlobPartitionLease : Lease |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name BlobLease
is used by the blob library, so I thought I'd change these type names to avoid the ambiguity and make their purpose a bit clearer
|
||
/// <summary> | ||
/// Simple buffer manager intended for use with Azure Storage SDK and compression code. | ||
/// It is not intended to be robust enough for external use. | ||
/// </summary> | ||
class SimpleBufferManager : IBufferManager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This interface no longer exists
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking great! Nice to see it narrowed down to just AzureStorage.
Not fully done with the review, but some comments for now.
} | ||
|
||
await Task.WhenAll(downloadTasks); | ||
BlobPartitionLease[] leases = await Task.WhenAll(page.Values.Select(b => this.DownloadLeaseBlob(b, cancellationToken))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see this is going to do a fan out download, then yield them all one by one. Was this intentionally chosen? If not, what do you think of:
await foreach (Blob blob in this.taskHubContainer.ListBlobsAsync(this.blobDirectoryName, cancellationToken: cancellationToken))
{
yield return await this.DownloadLeaseBlob(blob, cancellationToken);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we can definitely do that. I think the only disadvantage is that we'll execute the downloads serially instead of batching them like they are done today on the thread pool. However, given that the max partitioning is 16 (and is usually the default 4), this probably has a minimal performance impact that may be outweighed by its improved readability.
public virtual bool IsExpired => false; | ||
public virtual Task<bool> IsExpiredAsync(CancellationToken cancellationToken = default) | ||
{ | ||
return Task.FromResult(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tiny nit: tasks have non-negligible GC impact (depending how often this is called). I like to cache constant tasks like this in a readonly static.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in this case, the runtime is actually taking care of the cache given that the domain of possible boolean values is small (2). I just read about it here recently when learning about ValueTask
:
And since there are only two possible Boolean results (
true
andfalse
), there are only two possible Task objects needed to represent all possible result values, and so the runtime is able to cache two such objects and simply return a cached Task with a Result of true, avoiding the need to allocate. Only if the operation completes asynchronously does the method then need to allocate a new Task, because it needs to hand back the object to the caller before it knows what the result of the operation will be, and needs to have a unique object into which it can store the result when the operation does complete.
@@ -82,7 +81,7 @@ private static bool DefaultLeaseDecisionDelegate(string leaseId) | |||
public async Task InitializeAsync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a cancellation token to forward here? Doing such a good job adding them everywhere else 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah whoops! Good catch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After looking at it, I think we may want to postpone this as I think I'd need to understand the implications of cancellation for the balancer, as it really only seems to start up and shut down 😅
using System.Threading.Tasks; | ||
using Azure; | ||
|
||
sealed class AsyncPageableAsyncProjection<TSource, TResult> : AsyncPageable<TResult> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this type needed/used? Curious what this does that System.Linq.Async
can't?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't see anything for this or the synchronous variant. That is mostly because I am trying to maintain the AsyncPageable<T>
API while also performing the projections and System.Linq.Async
doesn't know about this Azure-only Pageable API. Although I'm open to suggestions too!
using System.Threading.Tasks; | ||
using Azure; | ||
|
||
class TableQueryResponse<T> : IAsyncEnumerable<T> where T : notnull |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a new or existing metric?
{ | ||
OrchestrationState state = await this.GetStateAsync(instanceId, executionId: null); | ||
return state != null ? new InstanceStatus(state) : null; | ||
} | ||
|
||
/// <inheritdoc /> | ||
public override async Task<IList<OrchestrationState>> GetStateAsync(string instanceId, bool allExecutions, bool fetchInput = true) | ||
public override async IAsyncEnumerable<OrchestrationState> GetStateAsync(string instanceId, bool allExecutions, bool fetchInput = true, [EnumeratorCancellation] CancellationToken cancellationToken = default) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if this is recommended or not, but I tend to not expose the cancellation token publicly on async IAsyncEnumerable methods. Instead, I split into a public and private method:
public IAsyncEnumerable<T> MethodName(...args)
{
return MethodNameCore(args);
}
private async IAsyncEnumerable<T> MethodNameCore(...args, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// actual implementation
}
The reasoning is dotnet analyzers has a warning about always forwarding cancellation token. Which gets redundant with System.Linq.Async
APIs that take in cancellation and also IAsyncEnumerable.WithCancellation()
exists. Just a couple lines down that analyzer would flag and require you to write await this.GetStateAsync(instanceId, false, cancellationToken).FirstOrDefaultAsync(cancellationToken);
A bit redundant right? By hiding the cancellation token from the public method, we get rid of the redundancy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that makes perfect sense for conciseness. However, I am also seeing recent BCL additions that include an explicit token parameter, like in the ChannelReader
type that was introduced with .NET 6 that defines the method ReadAllAsync
. There is also DeserializeAsyncEnumerable
in the JsonSerializer
class. Although, there are not too many examples of IAsyncEnumerable<T>
yet in the BCL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those are good points and examples. Would be nice for some official guidance from .NET on this, or if they can somehow make the analyzer smart enough. Anyways - I am fine with whichever way you want to go here.
/// <summary> | ||
/// Represents an OData condition as specified by a filter and a projection. | ||
/// </summary> | ||
public readonly struct ODataCondition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have access to record structs? Would be a good opportunity here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use record struct
if we move the project to use <LangVersion>10.0</LangVersion>
. That's no big deal, but in this case I don't think we need the automatic equality comparisons. The nicest thing we could use would be the init
keyword, but we would need a shim to use that in .NET Standard and .NET Framework. This is actually possible! I do this in my own personal projects, but unsure if we want to include it here lol
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries about moving it. I was thinking the with
operator for records would be the nicer benefit. But yeah, we can worry about language changes another time.
src/DurableTask.AzureStorage/Tracking/OrchestrationInstanceStatusQueryCondition.cs
Outdated
Show resolved
Hide resolved
|
||
/// <summary> | ||
/// Get The Orchestration State for the Latest or All Executions | ||
/// </summary> | ||
/// <param name="instanceId">Instance Id</param> | ||
/// <param name="allExecutions">True if states for all executions are to be fetched otherwise only the state for the latest execution of the instance is fetched</param> | ||
/// <param name="fetchInput">If set, fetch and return the input for the orchestration instance.</param> | ||
Task<IList<OrchestrationState>> GetStateAsync(string instanceId, bool allExecutions, bool fetchInput); | ||
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param> | ||
IAsyncEnumerable<OrchestrationState> GetStateAsync(string instanceId, bool allExecutions, bool fetchInput, CancellationToken cancellationToken = default); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do the IAsyncEnumerable
returning methods have to declare CancellationToken
on their public contract? Callers can always use .WithCancellation()
. Curious if a precedence has been set by the dotnet runtime or any other core libraries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in the earlier comment, an explicit public cancellation token appears to be the growing pattern in the .NET BCLs and in the Azure libraries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 this looks great! Due to the sheer size and complexity of the changes, I think we should have a preview for the major version rev of DurableTask.AzureStorage
and get some testing/feedback on it.
/// This class makes heavy use of reflection to build the entity converters. | ||
/// </summary> | ||
/// <remarks> | ||
/// This class is safe for concurrent usage by multiple threads. | ||
/// This class is thread-safe. | ||
/// </remarks> | ||
class TableEntityConverter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hard to tell how correct this improvement is. If it is on par with the existing implementation for correctness and reliability, then I am fine with it. The perf improvements are great to see. However, I do wonder how "correct" it is, as all these types are data contracts, which has additional rules around serialization. For example, does this support IExtensibleDataObject
? Although that is not too important. AzureStorage (incorrectly) ignores IExtensibleDataObject
all throughout its usage of DurableTask.Core types.
/// <summary> | ||
/// Represents an OData condition as specified by a filter and a projection. | ||
/// </summary> | ||
public readonly struct ODataCondition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries about moving it. I was thinking the with
operator for records would be the nicer benefit. But yeah, we can worry about language changes another time.
{ | ||
OrchestrationState state = await this.GetStateAsync(instanceId, executionId: null); | ||
return state != null ? new InstanceStatus(state) : null; | ||
} | ||
|
||
/// <inheritdoc /> | ||
public override async Task<IList<OrchestrationState>> GetStateAsync(string instanceId, bool allExecutions, bool fetchInput = true) | ||
public override async IAsyncEnumerable<OrchestrationState> GetStateAsync(string instanceId, bool allExecutions, bool fetchInput = true, [EnumeratorCancellation] CancellationToken cancellationToken = default) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those are good points and examples. Would be nice for some official guidance from .NET on this, or if they can somehow make the analyzer smart enough. Anyways - I am fine with whichever way you want to go here.
@jviau, @cgillum, @davidmrdavid - I have added a |
@wsugarman can you let us know when you consider this ready for merge? It looks all good to me but wondering if you are still validation or working on any further tweaks. |
I've merged with the latest code in |
I've not used Durable Tasks before but I'd like to give this PR a try if possible as I'm dependent on v12 storage deps in my host (not using Azure Functions). Is the preview going to be available soon? |
FYI @wsugarman I've been using this PR and the hosting provided by @jviau with package reference modification to make use of it. I only build locally and published packages to a local nuget server to try out some things, namely can it help me with a real world use case of managing the creation of long running azure cloud resources. I am only using the azure storage for all DT processing and had no problems so far. I asked before but any ETA on a merge as I'd really like to use this? |
I suspect we will merge this in February. I was discussing with the Durable Task team before the holiday, but there is no firm ETA yet. I'll let the others provide a more authoritative timeline. |
@wsugarman Thanks for the update. Do you anticipate any further changes? I think for now I can make do with private feed packages for my own needs and swap over to official in feb (or whenever). |
/azp run |
EDIT: Run triggered successfully |
Fixes #516
This Pull Request is unfortunately rather long as it required replacing
WindowsAzure.Storage
with the three new libraries for the Azure Blob, Queue, and Table Storage service client SDKs. These SDKs continue to wrap the REST APIs for each of the respective services, but they rely upon the new primitives leveraged across all of the Azure SDKs for .NET. The changes can be summarized (so far) as follows:WindowsAzure.Storage
across the libraries and replace it with:Azure.Data.Tables
Azure.Storage.Blobs
Azure.Storage.Queues
string
->ETag?
DynamicTableEntity
->TableEntity
CloudQueueMessage
and Durable Task'sQueueMessage
with the built-inQueueMessage
CloudTable
->TableClient
CloudQueue
->QueueClient
CloudBlob
->BlobClient
TableQuery<T>
with raw OData queriesExpression<T>
trees as well, but for performance it probably makes sense to use the OData if it's not too difficult to read and validateTableEntityConverter
to use cached dynamic serialization and deserialization delegates usingExpression
treesStorageAccountDetails
withStorageAccountClientProvider
the encapsulates factories for blob, queue, and table service clientsTimeoutHandler
and some of theAzureStorageClient
with a series ofHttpPipelinePolicy
objects that run throughout the processing of an HTTP request; we rely on the clients own built-in retry as opposed to building our ownCancellationToken
parameters and update method signatures to useIAsyncEnumerable<T>
/AsyncPageable<T>
throughout the internal clients; consider adding to the public APIs (for now I added them for APIs that returnIAsyncEnumerable<T>
)