Skip to content

Commit

Permalink
Implementing selective loading for stein (#8900)
Browse files Browse the repository at this point in the history
  • Loading branch information
soninaren committed Nov 24, 2022
1 parent 02b161d commit 172c57f
Show file tree
Hide file tree
Showing 27 changed files with 522 additions and 592 deletions.
3 changes: 3 additions & 0 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Expand Up @@ -676,6 +676,9 @@ public Task<List<RawFunctionMetadata>> GetFunctionMetadata()

internal Task<List<RawFunctionMetadata>> SendFunctionMetadataRequest()
{
// reset indexing task when in case we need to send another request
_functionsIndexingTask = new TaskCompletionSource<List<RawFunctionMetadata>>(TaskCreationOptions.RunContinuationsAsynchronously);

RegisterCallbackForNextGrpcMessage(MsgType.FunctionMetadataResponse, _functionLoadTimeout, 1,
msg => ProcessFunctionMetadataResponses(msg.Message.FunctionMetadataResponse), HandleWorkerMetadataRequestError);

Expand Down
Expand Up @@ -126,7 +126,6 @@ public static void AddWebJobsScriptHost(this IServiceCollection services, IConfi
// Management services
services.AddSingleton<IFunctionsSyncManager, FunctionsSyncManager>();
services.AddSingleton<IFunctionMetadataManager, FunctionMetadataManager>();
services.AddSingleton<IFunctionMetadataProvider, HostFunctionMetadataProvider>();
services.AddSingleton<IWebFunctionsManager, WebFunctionsManager>();
services.AddSingleton<IInstanceManager, InstanceManager>();
services.AddHttpClient();
Expand Down Expand Up @@ -160,6 +159,14 @@ public static void AddWebJobsScriptHost(this IServiceCollection services, IConfi
// Language Worker Hosted Services need to be intialized before WebJobsScriptHostService
ScriptHostBuilderExtensions.AddCommonServices(services);

services.AddSingleton<IFunctionMetadataProvider>(sp =>
{
return new FunctionMetadataProvider(
sp.GetRequiredService<ILogger<FunctionMetadataProvider>>(),
ActivatorUtilities.CreateInstance<WorkerFunctionMetadataProvider>(sp),
ActivatorUtilities.CreateInstance<HostFunctionMetadataProvider>(sp));
});

// Core script host services
services.AddSingleton<WebJobsScriptHostService>();
services.AddSingleton<IHostedService>(s => s.GetRequiredService<WebJobsScriptHostService>());
Expand Down
Expand Up @@ -82,11 +82,9 @@ public async Task<IEnumerable<Type>> GetExtensionsStartupTypesAsync()
bool isLegacyExtensionBundle = _extensionBundleManager.IsLegacyExtensionBundle();
bool isPrecompiledFunctionApp = false;

// if workerIndexing
// Function.json (httpTrigger, blobTrigger, blobTrigger) -> httpTrigger, blobTrigger
// dotnet app precompiled -> Do not use bundles
var workerConfigs = _languageWorkerOptions.CurrentValue.WorkerConfigs;
if (bundleConfigured && !Utility.CanWorkerIndex(workerConfigs, SystemEnvironment.Instance))
if (bundleConfigured)
{
ExtensionBundleDetails bundleDetails = await _extensionBundleManager.GetExtensionBundleDetails();
ValidateBundleRequirements(bundleDetails);
Expand Down Expand Up @@ -180,8 +178,7 @@ public async Task<IEnumerable<Type>> GetExtensionsStartupTypesAsync()
continue;
}

if (Utility.CanWorkerIndex(workerConfigs, SystemEnvironment.Instance)
|| !bundleConfigured
if (!bundleConfigured
|| extensionItem.Bindings.Count == 0
|| extensionItem.Bindings.Intersect(bindingsSet, StringComparer.OrdinalIgnoreCase).Any())
{
Expand Down
9 changes: 4 additions & 5 deletions src/WebJobs.Script/Host/FunctionMetadataManager.cs
Expand Up @@ -14,6 +14,7 @@
using Microsoft.Azure.WebJobs.Script.Workers.Http;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

Expand Down Expand Up @@ -81,11 +82,11 @@ public bool TryGetFunctionMetadata(string functionName, out FunctionMetadata fun
/// <param name="applyAllowList">Apply functions allow list filter.</param>
/// <param name="includeCustomProviders">Include any metadata provided by IFunctionProvider when loading the metadata</param>
/// <returns> An Immmutable array of FunctionMetadata.</returns>
public ImmutableArray<FunctionMetadata> GetFunctionMetadata(bool forceRefresh, bool applyAllowList = true, bool includeCustomProviders = true, IFunctionInvocationDispatcher dispatcher = null)
public ImmutableArray<FunctionMetadata> GetFunctionMetadata(bool forceRefresh, bool applyAllowList = true, bool includeCustomProviders = true)
{
if (forceRefresh || _servicesReset || _functionMetadataArray.IsDefaultOrEmpty)
{
_functionMetadataArray = LoadFunctionMetadata(forceRefresh, includeCustomProviders, dispatcher);
_functionMetadataArray = LoadFunctionMetadata(forceRefresh, includeCustomProviders);
_logger.FunctionMetadataManagerFunctionsLoaded(ApplyAllowList(_functionMetadataArray).Count());
_servicesReset = false;
}
Expand Down Expand Up @@ -132,9 +133,7 @@ internal ImmutableArray<FunctionMetadata> LoadFunctionMetadata(bool forceRefresh
ImmutableArray<FunctionMetadata> immutableFunctionMetadata;
var workerConfigs = _languageWorkerOptions.CurrentValue.WorkerConfigs;

IFunctionMetadataProvider metadataProvider = new AggregateFunctionMetadataProvider(_loggerFactory.CreateLogger<AggregateFunctionMetadataProvider>(), dispatcher, _functionMetadataProvider, _scriptOptions);

immutableFunctionMetadata = metadataProvider.GetFunctionMetadataAsync(workerConfigs, SystemEnvironment.Instance, forceRefresh).GetAwaiter().GetResult();
immutableFunctionMetadata = _functionMetadataProvider.GetFunctionMetadataAsync(workerConfigs, SystemEnvironment.Instance, forceRefresh).GetAwaiter().GetResult();

var functionMetadataList = new List<FunctionMetadata>();
_functionErrors = new Dictionary<string, ICollection<string>>();
Expand Down
61 changes: 61 additions & 0 deletions src/WebJobs.Script/Host/FunctionMetadataProvider.cs
@@ -0,0 +1,61 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Microsoft.Azure.WebJobs.Script.WebHost
{
internal class FunctionMetadataProvider : IFunctionMetadataProvider
{
private readonly IEnvironment _environment;
private readonly ILogger<FunctionMetadataProvider> _logger;
private IWorkerFunctionMetadataProvider _workerFunctionMetadataProvider;
private IHostFunctionMetadataProvider _hostFunctionMetadataProvider;

public FunctionMetadataProvider(ILogger<FunctionMetadataProvider> logger, IWorkerFunctionMetadataProvider workerFunctionMetadataProvider, IHostFunctionMetadataProvider hostFunctionMetadataProvider)
{
_logger = logger;
_workerFunctionMetadataProvider = workerFunctionMetadataProvider;
_hostFunctionMetadataProvider = hostFunctionMetadataProvider;
_environment = SystemEnvironment.Instance;
}

public ImmutableDictionary<string, ImmutableArray<string>> FunctionErrors { get; private set; }

public async Task<ImmutableArray<FunctionMetadata>> GetFunctionMetadataAsync(IEnumerable<RpcWorkerConfig> workerConfigs, IEnvironment environment, bool forceRefresh = false)
{
bool workerIndexing = Utility.CanWorkerIndex(workerConfigs, _environment);
if (!workerIndexing)
{
return await GetMetadataFromHostProvider(workerConfigs, environment, forceRefresh);
}

_logger.LogInformation("Worker indexing is enabled");

FunctionMetadataResult functionMetadataResult = await _workerFunctionMetadataProvider?.GetFunctionMetadataAsync(workerConfigs, SystemEnvironment.Instance, forceRefresh);
FunctionErrors = _workerFunctionMetadataProvider.FunctionErrors;

if (functionMetadataResult.UseDefaultMetadataIndexing)
{
_logger.LogDebug("Fallback to host indexing as worker denied indexing");
return await GetMetadataFromHostProvider(workerConfigs, environment, forceRefresh);
}

return functionMetadataResult.Functions;
}

private async Task<ImmutableArray<FunctionMetadata>> GetMetadataFromHostProvider(IEnumerable<RpcWorkerConfig> workerConfigs, IEnvironment environment, bool forceRefresh = false)
{
var functions = await _hostFunctionMetadataProvider?.GetFunctionMetadataAsync(workerConfigs, environment, forceRefresh);
FunctionErrors = _hostFunctionMetadataProvider.FunctionErrors;
return functions;
}
}
}
21 changes: 21 additions & 0 deletions src/WebJobs.Script/Host/FunctionMetadataResult.cs
@@ -0,0 +1,21 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Immutable;
using Microsoft.Azure.WebJobs.Script.Description;

namespace Microsoft.Azure.WebJobs.Script
{
internal class FunctionMetadataResult
{
public FunctionMetadataResult(bool useDefaultMetadataIndexing, ImmutableArray<FunctionMetadata> functions)
{
this.UseDefaultMetadataIndexing = useDefaultMetadataIndexing;
this.Functions = functions;
}

public bool UseDefaultMetadataIndexing { get; private set; }

public ImmutableArray<FunctionMetadata> Functions { get; private set; }
}
}
2 changes: 1 addition & 1 deletion src/WebJobs.Script/Host/HostFunctionMetadataProvider.cs
Expand Up @@ -20,7 +20,7 @@

namespace Microsoft.Azure.WebJobs.Script
{
public class HostFunctionMetadataProvider : IFunctionMetadataProvider
internal class HostFunctionMetadataProvider : IHostFunctionMetadataProvider
{
private readonly IOptionsMonitor<ScriptApplicationHostOptions> _applicationHostOptions;
private readonly IMetricsLogger _metricsLogger;
Expand Down
2 changes: 1 addition & 1 deletion src/WebJobs.Script/Host/IFunctionMetadataManager.cs
Expand Up @@ -11,7 +11,7 @@ public interface IFunctionMetadataManager
{
ImmutableDictionary<string, ImmutableArray<string>> Errors { get; }

ImmutableArray<FunctionMetadata> GetFunctionMetadata(bool forceRefresh = false, bool applyAllowlist = true, bool includeCustomProviders = true, IFunctionInvocationDispatcher dispatcher = null);
ImmutableArray<FunctionMetadata> GetFunctionMetadata(bool forceRefresh = false, bool applyAllowlist = true, bool includeCustomProviders = true);

bool TryGetFunctionMetadata(string functionName, out FunctionMetadata functionMetadata, bool forceRefresh = false);
}
Expand Down
24 changes: 24 additions & 0 deletions src/WebJobs.Script/Host/IHostFunctionMetadataProvider.cs
@@ -0,0 +1,24 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Description;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;

namespace Microsoft.Azure.WebJobs.Script
{
/// <summary>
/// Defines an interface for fetching function metadata from function.json files
/// </summary>
internal interface IHostFunctionMetadataProvider
{
ImmutableDictionary<string, ImmutableArray<string>> FunctionErrors { get; }

/// <summary>
/// Reads function metadata from function.json files present along with each function
/// </summary>
Task<ImmutableArray<FunctionMetadata>> GetFunctionMetadataAsync(IEnumerable<RpcWorkerConfig> workerConfigs, IEnvironment environment, bool forceRefresh = false);
}
}
24 changes: 24 additions & 0 deletions src/WebJobs.Script/Host/IWorkerFunctionMetadataProvider.cs
@@ -0,0 +1,24 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;

namespace Microsoft.Azure.WebJobs.Script
{
/// <summary>
/// Defines an interface for fetching function metadata from Out-of-Proc language workers
/// </summary>
internal interface IWorkerFunctionMetadataProvider
{
ImmutableDictionary<string, ImmutableArray<string>> FunctionErrors { get; }

/// <summary>
/// Attempts to get function metadata from Out-of-Proc language workers
/// </summary>
/// <returns>FunctionMetadataResult that either contains the function metadata or indicates that a fall back option for fetching metadata should be used</returns>
Task<FunctionMetadataResult> GetFunctionMetadataAsync(IEnumerable<RpcWorkerConfig> workerConfigs, IEnvironment environment, bool forceRefresh = false);
}
}
29 changes: 7 additions & 22 deletions src/WebJobs.Script/Host/ScriptHost.cs
Expand Up @@ -282,10 +282,8 @@ public async Task InitializeAsync(CancellationToken cancellationToken = default)
// get worker config information and check to see if worker should index or not
var workerConfigs = _languageWorkerOptions.CurrentValue.WorkerConfigs;

bool workerIndexing = Utility.CanWorkerIndex(workerConfigs, _environment);

// Generate Functions
IEnumerable<FunctionMetadata> functionMetadataList = GetFunctionsMetadata(workerIndexing);
IEnumerable<FunctionMetadata> functionMetadataList = GetFunctionsMetadata();

if (!_environment.IsPlaceholderModeEnabled())
{
Expand Down Expand Up @@ -313,14 +311,8 @@ public async Task InitializeAsync(CancellationToken cancellationToken = default)

await InitializeFunctionDescriptorsAsync(functionMetadataList, cancellationToken);

if (!workerIndexing)
{
// Initialize worker function invocation dispatcher only for valid functions after creating function descriptors
// Dispatcher not needed for codeless function.
// Dispatcher needed for non-dotnet codeless functions
var filteredFunctionMetadata = functionMetadataList.Where(m => m.IsProxy() || !Utility.IsCodelessDotNetLanguageFunction(m));
await _functionDispatcher.InitializeAsync(Utility.GetValidFunctions(filteredFunctionMetadata, Functions), cancellationToken);
}
var filteredFunctionMetadata = functionMetadataList.Where(m => m.IsProxy() || !Utility.IsCodelessDotNetLanguageFunction(m));
await _functionDispatcher.InitializeAsync(Utility.GetValidFunctions(filteredFunctionMetadata, Functions), cancellationToken);

GenerateFunctions();
ScheduleFileSystemCleanup();
Expand Down Expand Up @@ -371,19 +363,12 @@ private void LogHostFunctionErrors()
/// Gets metadata collection of functions and proxies configured.
/// </summary>
/// <returns>A metadata collection of functions and proxies configured.</returns>
private IEnumerable<FunctionMetadata> GetFunctionsMetadata(bool workerIndexing)
private IEnumerable<FunctionMetadata> GetFunctionsMetadata()
{
IEnumerable<FunctionMetadata> functionMetadata;
if (workerIndexing)
{
_logger.LogInformation("Worker indexing is enabled");
functionMetadata = _functionMetadataManager.GetFunctionMetadata(forceRefresh: false, dispatcher: _functionDispatcher);
}
else
{
functionMetadata = _functionMetadataManager.GetFunctionMetadata(false);
_workerRuntime = _workerRuntime ?? Utility.GetWorkerRuntime(functionMetadata);
}

functionMetadata = _functionMetadataManager.GetFunctionMetadata(forceRefresh: false);
_workerRuntime ??= Utility.GetWorkerRuntime(functionMetadata);
foreach (var error in _functionMetadataManager.Errors)
{
FunctionErrors.Add(error.Key, error.Value.ToArray());
Expand Down

0 comments on commit 172c57f

Please sign in to comment.