Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ public async Task Process(Message message, ExecutionContext executionContext, IL
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

var messageContext = CreateMessageContext(message);
var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger);

await InitializeEndpointIfNecessary(functionExecutionContext,
messageContext.ReceiveCancellationTokenSource.Token).ConfigureAwait(false);
await InitializeEndpointIfNecessary(executionContext, functionsLogger, CancellationToken.None)
.ConfigureAwait(false);

try
{
Expand Down Expand Up @@ -78,12 +76,7 @@ MessageContext CreateMessageContext(Message originalMessage)
}
}

/// <summary>
/// Allows to forcefully initialize the endpoint if it hasn't been initialized yet.
/// </summary>
/// <param name="executionContext">The execution context.</param>
/// <param name="token">The cancellation token or default cancellation token.</param>
async Task InitializeEndpointIfNecessary(FunctionExecutionContext executionContext, CancellationToken token = default)
async Task InitializeEndpointIfNecessary(ExecutionContext executionContext, ILogger logger, CancellationToken token = default)
{
if (pipeline == null)
{
Expand All @@ -92,7 +85,8 @@ async Task InitializeEndpointIfNecessary(FunctionExecutionContext executionConte
{
if (pipeline == null)
{
endpoint = await endpointFactory(executionContext).ConfigureAwait(false);
var functionExecutionContext = new FunctionExecutionContext(executionContext, logger);
endpoint = await endpointFactory(functionExecutionContext).ConfigureAwait(false);

pipeline = configuration.PipelineInvoker;
}
Expand All @@ -107,8 +101,9 @@ async Task InitializeEndpointIfNecessary(FunctionExecutionContext executionConte
/// <inheritdoc />
public async Task Send(object message, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

await InitializeEndpointIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
await endpoint.Send(message, options).ConfigureAwait(false);
}

Expand All @@ -121,8 +116,9 @@ public Task Send(object message, ExecutionContext executionContext, ILogger func
/// <inheritdoc />
public async Task Send<T>(Action<T> messageConstructor, SendOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

await InitializeEndpointIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
await endpoint.Send(messageConstructor, options).ConfigureAwait(false);
}

Expand All @@ -135,75 +131,54 @@ public Task Send<T>(Action<T> messageConstructor, ExecutionContext executionCont
/// <inheritdoc />
public async Task Publish(object message, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

await InitializeEndpointIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
await endpoint.Publish(message, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Publish<T>(Action<T> messageConstructor, PublishOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

await InitializeEndpointIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
await endpoint.Publish(messageConstructor, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Publish(object message, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Publish(message).ConfigureAwait(false);
}
public Task Publish(object message, ExecutionContext executionContext, ILogger functionsLogger = null) =>
Publish(message, new PublishOptions(), executionContext, functionsLogger);

/// <inheritdoc />
public async Task Publish<T>(Action<T> messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Publish(messageConstructor).ConfigureAwait(false);
}
public Task Publish<T>(Action<T> messageConstructor, ExecutionContext executionContext, ILogger functionsLogger = null) =>
Publish(messageConstructor, new PublishOptions(), executionContext, functionsLogger);

/// <inheritdoc />
public async Task Subscribe(Type eventType, SubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

await InitializeEndpointIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
await endpoint.Subscribe(eventType, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Subscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Subscribe(eventType).ConfigureAwait(false);
}
public Task Subscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null) =>
Subscribe(eventType, new SubscribeOptions(), executionContext, functionsLogger);

/// <inheritdoc />
public async Task Unsubscribe(Type eventType, UnsubscribeOptions options, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

await InitializeEndpointIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);
await endpoint.Unsubscribe(eventType, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null)
{
await InitializeEndpointUsedOutsideHandlerIfNecessary(executionContext, functionsLogger).ConfigureAwait(false);

await endpoint.Unsubscribe(eventType).ConfigureAwait(false);
}

async Task InitializeEndpointUsedOutsideHandlerIfNecessary(ExecutionContext executionContext, ILogger functionsLogger)
{
FunctionsLoggerFactory.Instance.SetCurrentLogger(functionsLogger);

var functionExecutionContext = new FunctionExecutionContext(executionContext, functionsLogger);

await InitializeEndpointIfNecessary(functionExecutionContext).ConfigureAwait(false);
}
public Task Unsubscribe(Type eventType, ExecutionContext executionContext, ILogger functionsLogger = null) =>
Unsubscribe(eventType, new UnsubscribeOptions(), executionContext, functionsLogger);

internal static void LoadAssemblies(string assemblyDirectory)
{
Expand Down