Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventGrid blob trigger support #17137

Merged
merged 2 commits into from Jan 6, 2021
Merged

Conversation

alrod
Copy link
Member

@alrod alrod commented Nov 21, 2020

Current Blob trigger logic relies on pulling azure storage diagnostic logs. This approach gives us high latencies, bad performance and fragile behavior overall. Using EventGrid subscription is much more reliable approach and it will give approximately 2-3 times better performance.

The overall flow for EventGrid Blob trigger:

  1. A blob uploaded to azure storage.
  2. EventGrid subscription detects the blob change and sends a HTTP request to the blob extension HTTP endpoint.
  3. Blob extension receives the HTTP request, parses it and put a new message to the internal storage queue with the blob information and function name. Regular Blob trigger uses the same internal queue.
  4. Internal storage queue listener reads the new queue message, creates CloudBlob and executes the function code.

Steps to test locally:

  1. Create a function app with blob trigger in VS.
  2. Add new nuget source: https://www.myget.org/F/azure-appservice/api/v3/index.json
  3. Replace the storage extension package with Azure.WebJobs.Extensions.Storage.Blobs.5.0.0-alpha.20201120.1
  4. Add "UseEventGrid = true" to trigger definition
    public static void Run([BlobTrigger("samples-workitems/{name}", UseEventGrid = true)]Stream myBlob, string name, ILogger log)
  5. Start ngrok http -host-header=localhost 7071
    https://docs.microsoft.com/en-us/azure/azure-functions/functions-debug-event-grid-trigger-local
  6. Start the function app VS project.
  7. Open portal and create an EventGrid subscription with WebHook endpoint:
    https://83f2d199d6f4.ngrok.io/runtime/webhooks/blobs?functionName=Function1
    Note: EventGrid subscription fires on all containers/blobs in the storage account. Maybe you want to add some EventGrid subscription filters.
    Note: If you deploy the function app to azure the EventGrid subscription endpoint will be:
    https://[functionAppName].azurewebsites.net/runtime/webhooks/blobs?functionName=Function1&code=[blobExtensionKey]
    BlobExtensionKey is generated automatically and available on the azure portal.
  8. Add a blob to blob storage and verify the trigger is fired.

The plan is to provide the EventGrid Blob trigger in beta as is, write documentation and collect customers feedback.

/// <summary>
/// Returns a bool value that indicates whether EventGrid is used.
/// </summary>
public bool UseEventGrid { get; set; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to have different trigger kinds?

Is it worth to make this an enum?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.
Changefeed would be another strategy once is GA.
If we create enum then we should also name existing strategy/strategies and put there to give user list of possibilities.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum is added.


public async Task<HttpResponseMessage> ProcessHttpRequestAsync(HttpRequestMessage req, CancellationToken cancellationToken)
{
//Debugger.Break();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove here and elsewhere.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.


namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
{
internal interface IHttpRequestProcessor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the interface?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interface removed.

@@ -17,6 +17,12 @@
<Compile Include="$(MicrosoftAzureWebJobsExtensionsClientsSources)\**\*.cs" Link="Shared\%(RecursiveDir)\%(Filename)%(Extension)" />
</ItemGroup>

<ItemGroup>
<Compile Include="..\..\..\eventgrid\Microsoft.Azure.WebJobs.Extensions.EventGrid\src\TriggerBinding\HttpRequestProcessor.cs" Link="EventGrid\HttpRequestProcessor.cs" />
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might become a problem. We don't run storage tests when something in the eventgrid directory changes. So it's easy to unintentionally break something.

Copy link
Member Author

@alrod alrod Jan 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What other options do we have here?
I see that this approach is used in other places:
https://github.com/Azure/azure-sdk-for-net/blob/7705656e645e5bc1e9afb07fe33f65a5a565da63/sdk/storage/Azure.Storage.Blobs/src/Azure.Storage.Blobs.csproj#L2

Eventually we will run storage test begore a release.

@pakrym
Copy link
Contributor

pakrym commented Nov 30, 2020

Can we add a functional test that simulates an event grid notification?


public async Task<HttpResponseMessage> ProcessHttpRequestAsync(HttpRequestMessage req, CancellationToken cancellationToken)
{
var functionId = HttpUtility.ParseQueryString(req.RequestUri.Query)["functionName"];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a partial(?) copy of the event grid implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to include sources in this case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should be an extension for an extension. I.e. someone would need to pull extra package like Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.EventGrid in order to add extra functionality that spans across blobs and eventgrid. Some kind of plugin architecture.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is different for blob and event grid. For event grid we check if function with the name exists, for blob we will check this latter.

}

return new HttpResponseMessage(HttpStatusCode.BadRequest);
// FIXME without internal queuing, we are going to process all events in parallel
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we file an issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will require some kind of internal queue(like for blob storage trigger) and redesign of EventGrid extension. Added an issue: #17756

Copy link
Contributor

@kasobol-msft kasobol-msft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Event Grid for blob changes has a limitation. It does not cover all blob types, i.e. it can cover either (block blobs and append blobs) or (block blocks and page blobs) - the documentation around it is somewhat confusing so we might need to check experimentally which pair is supported.

Anyway. It would be great to add some assertions to fail fast if customer is trying to bind to blobs we know they're not supported, i.e. they try to bind to SDK client that's not supported.
At least we should be verbose in docs to call out this limitation (probably around flag/enum).


public BlobsExtensionConfigProvider(
BlobServiceClientProvider blobServiceClientProvider,
BlobTriggerAttributeBindingProvider triggerBinder,
IContextGetter<IBlobWrittenWatcher> contextAccessor,
INameResolver nameResolver,
IConverterManager converterManager)
IConverterManager converterManager,
IHttpRequestProcessor httpEndpointManager)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd rather align naming between type and property.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add the check in BlobTriggerAttributeBindingProvider

@@ -99,7 +100,7 @@ public async Task<FunctionResult> ExecuteAsync(QueueMessage value, CancellationT
string possibleETag = blobProperties.ETag.ToString();

// If the blob still exists but the ETag is different, delete the message but do a fast path notification.
if (!string.Equals(message.ETag, possibleETag, StringComparison.Ordinal))
if (!string.Equals(message.ETag, possibleETag, StringComparison.Ordinal) && _blobWrittenWatcher != null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this new condition really means that "UseEventGrid == true" ? I.e. is this derived from fact that we have not created _blobWrittenWatcher layers up because flag was true?

If that's the case then I'd suggest to inject the flag (or enum, see other discussion) everywhere we branch logic based on "blob change source". Might be easier to understand why logic branches like that instead of tracing dataflow each time we try to understand why we end up in given code branch.

If such exercise yields classes that look like if(event_grid){ } else { }. Then maybe these should be split and refactored into different abstraction to be more cohesive. I.e. I'd rather introduce new types or split existing types instead of compounding logical branches in the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I injected BlobTriggerKind to BlobQueueTriggerExecutor for better visibility.


namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
{
internal class HttpRequestProcessor : IHttpRequestProcessor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sdk/storage/Azure.Storage.Webjobs.Extensions.Blobs/src/Listeners/HttpRequestProcessor.cs - it seems that this file is duplicated. I.e. one of them should be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

Comment on lines 85 to 92
private async Task InitializeWriterAsync(CancellationToken cancellationToken)
{
string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken).ConfigureAwait(false);
string hostBlobTriggerQueueName = HostQueueNames.GetHostBlobTriggerQueueName(hostId);
var hostBlobTriggerQueue = _queueServiceClientProvider.GetHost().GetQueueClient(hostBlobTriggerQueueName);

_blobTriggerQueueWriter = new BlobTriggerQueueWriter(hostBlobTriggerQueue, _sharedQueueWatcher);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we DI this? I.e. inject ready to use blobTriggerQueueWritter to this class? We might need a factory like

new BlobTriggerQueueWriter(hostBlobTriggerQueue, messageEnqueuedWatcher), _loggerFactory.CreateLogger<BlobListener>());
.

Resolving blob queue name is independent from the strategy we listen to blob changes, so there's an opportunity to reduce code duplication as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlobTriggerQueueWriterFactory was added.

Copy link
Contributor

@kasobol-msft kasobol-msft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see some files placed in directories that has been renamed at some point. Could you please go through the changes and make sure directory structure and namespaces are correct.

@check-enforcer
Copy link

This pull request is protected by Check Enforcer.

What is Check Enforcer?

Check Enforcer helps ensure all pull requests are covered by at least one check-run (typically an Azure Pipeline). When all check-runs associated with this pull request pass then Check Enforcer itself will pass.

Why am I getting this message?

You are getting this message because Check Enforcer did not detect any check-runs being associated with this pull request within five minutes. This may indicate that your pull request is not covered by any pipelines and so Check Enforcer is correctly blocking the pull request being merged.

What should I do now?

If the check-enforcer check-run is not passing and all other check-runs associated with this PR are passing (excluding license-cla) then you could try telling Check Enforcer to evaluate your pull request again. You can do this by adding a comment to this pull request as follows:
/check-enforcer evaluate
Typically evaulation only takes a few seconds. If you know that your pull request is not covered by a pipeline and this is expected you can override Check Enforcer using the following command:
/check-enforcer override
Note that using the override command triggers alerts so that follow-up investigations can occur (PRs still need to be approved as normal).

What if I am onboarding a new service?

Often, new services do not have validation pipelines associated with them, in order to bootstrap pipelines for a new service, you can issue the following command as a pull request comment:
/azp run prepare-pipelines
This will run a pipeline that analyzes the source tree and creates the pipelines necessary to build and validate your pull request. Once the pipeline has been created you can trigger the pipeline using the following comment:
/azp run net - [service] - ci

@alrod
Copy link
Member Author

alrod commented Jan 5, 2021

Can we add a functional test that simulates an event grid notification?

@pakrym,
I added functional tests in EventGridBlobTriggerEndToEndTests.

Copy link
Contributor

@kasobol-msft kasobol-msft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me overall.

token
};
}
return await HttpRequestProcessor.ProcessAsync(req, functionName, _logger, ProcessEventsAsync, CancellationToken.None).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason why HttpRequestProcessor isn't a singleton and DI'ed?
I'd avoid using statics to pack functionality if possible.
From other angle. _logger would normally be a dependency for HttpRequestProcessor rather than a method parameter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea, I added it in DI


if (_blobTriggerQueueWriter == null)
{
await InitializeWriterAsync(cancellationToken).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case of race. is double init fine or should this be under lock ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file was removed

@@ -22,6 +22,12 @@ public sealed partial class BlobTriggerAttribute : System.Attribute, Microsoft.A
public BlobTriggerAttribute(string blobPath) { }
public string BlobPath { get { throw null; } }
public string Connection { get { throw null; } set { } }
public Microsoft.Azure.WebJobs.BlobTriggerKind Kind { get { throw null; } set { } }
}
public enum BlobTriggerKind
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about BlobTriggerSource / BlobChangeSource ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to BlobTriggerSource

public enum BlobTriggerKind
{
/// <summary>
/// Changes detection is relied on Storage Analytics logs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaik the analytics scan is combined with container scan (list blobs). should we mention that ? Isn't this strategy falling back to container scan if logs are not there? (I'm not sure about this)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, fixed enum value name and the description.

var functionName = HttpUtility.ParseQueryString(input.RequestUri.Query)["functionName"];
if (_blobTriggerQueueWriter == null)
{
_blobTriggerQueueWriter = await _blobTriggerQueueWriterFactory.CreateAsync(cancellationToken).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, is it fine if double init?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added code to ensure _blobTriggerQueueWriter is initialized only once using SlimSemaphore. Simple lock does not work as we have an async call and Lazy<> does not work as we need to pass cancellationtoken

Debug.Assert(watcher != null);
_watcher = watcher;
QueueClient = queueClient;
Debug.Assert(wsharedQueueWatcher != null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this DebugAssert stay ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

}

public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
{
ParameterInfo parameter = context.Parameter;
var blobTriggerAttribute = TypeUtility.GetResolvedAttribute<BlobTriggerAttribute>(context.Parameter);

if (parameter.ParameterType == typeof(PageBlobClient) && blobTriggerAttribute.Kind == BlobTriggerKind.EventGrid)
{
_logger.LogError("PageBlobClient is not supported with BlobTriggerKind.EventGrid");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you use nameof or something like that for BlobTriggerKind?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

private readonly BlobServiceClient _blobServiceClient;
private readonly RandomNameResolver _nameResolver;

private const string RegistrationRequest =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it OK to use hardcoded requests ?
These tests will run in https://dev.azure.com/azure-sdk/internal/_build?definitionId=410&_a=summary with Azure resources generated on the fly via ARM.
Easiest way to check is to create branch in main repo (not fork) and run that pipeline from branch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no point in creating eventgid subscription as an webjobs extension itself does not have the http endpoint. We need function host to add the http endpoint.

So best we can do is emulating http requests(passing hardcoded payload) in the tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I'm not exactly sure how this emulation works. Worst case we'll follow up if live tests fail.

@alrod alrod requested a review from pakrym January 6, 2021 08:45
{
private readonly ILogger _logger;

public HttpRequestProcessor(ILoggerFactory loggerFactory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public HttpRequestProcessor(ILoggerFactory loggerFactory)
public HttpRequestProcessor(ILogger<HttpRequestProcessor> logger)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need public HttpRequestProcessor(ILoggerFactory loggerFactory) constructor for unit tests

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can do NullLoggerFactory.Instance.CreateLogger<T>() in unit tests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -38,6 +38,9 @@ public sealed class BlobTriggerAttribute : Attribute, IConnectionProvider
{
private readonly string _blobPath;

// AnalyticsScan is default kind as it does not require additional actions to set up a blob trigger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// AnalyticsScan is default kind as it does not require additional actions to set up a blob trigger
// LogsAndContainerScan is default kind as it does not require additional actions to set up a blob trigger

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@@ -164,6 +171,7 @@ public async Task<IListener> CreateAsync(CancellationToken cancellationToken)
};

sharedBlobQueueListener.Register(_functionDescriptor.Id, registration);
sharedBlobQueueListener.Register(_functionDescriptor.ShortName, registration);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to simplifying manual creation of the eventgrid callback uri.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this worth making this runtime change just to simplify the testing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's for consistency with EventGrid extension:
https://{functionappname}.azurewebsites.net/runtime/webhooks/eventgrid?functionName={functionname}&code={systemkey} (https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-event-grid-trigger?tabs=csharp%2Cbash#version-2x-and-higher-runtime)
where {functionname} is short function name (without namespace)
So for blobs we want the same format
https://{functionappname}.azurewebsites.net/runtime/webhooks/blobs?functionName={functionname}&code={systemkey}

So we need the registration to find the function to execute by short name

private readonly ILogger<BlobListener> _logger;

public BlobQueueTriggerExecutor(IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
: this(BlobCausalityReader.Instance, blobWrittenWatcher, logger)
public BlobQueueTriggerExecutor(BlobTriggerSource kind, IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public BlobQueueTriggerExecutor(BlobTriggerSource kind, IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)
public BlobQueueTriggerExecutor(BlobTriggerSource blobTriggerSource, IBlobWrittenWatcher blobWrittenWatcher, ILogger<BlobListener> logger)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@@ -32,18 +32,24 @@ internal class EventGridExtensionConfigProvider : IExtensionConfigProvider,
private ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly Func<EventGridAttribute, IAsyncCollector<EventGridEvent>> _converter;
private HttpRequestProcessor _httpRequestProcessor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this can be readonly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Comment on lines +145 to +150
await _semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_blobTriggerQueueWriter == null)
{
_blobTriggerQueueWriter = await _blobTriggerQueueWriterFactory.CreateAsync(cancellationToken).ConfigureAwait(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice.

Copy link
Contributor

@pakrym pakrym left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pending comments.

@pakrym pakrym merged commit cdd844a into Azure:master Jan 6, 2021
annelo-msft pushed a commit to annelo-msft/azure-sdk-for-net that referenced this pull request Feb 17, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Event Grid Storage Storage Service (Queues, Blobs, Files)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants