Skip to content

Commit

Permalink
ServiceBusSessionMessageActions management APIs: worker changes (#2404)
Browse files Browse the repository at this point in the history
* initial changes

* removing extra code

* more changes

* addressing comments

* adding unit tests

* updated package

* adding proj ref

* fixing unit test

* addressing comments

* changes

* changing error message
  • Loading branch information
aishwaryabh committed Jun 4, 2024
1 parent dd8398e commit be0b8b3
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.15.1")]
[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.16.0")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.Worker.Extensions.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";
Expand All @@ -19,6 +20,21 @@ service Settlement {

// Defers a message
rpc Defer (DeferRequest) returns (google.protobuf.Empty) {}

// Renew message lock
rpc RenewMessageLock (RenewMessageLockRequest) returns (google.protobuf.Empty) {}

// Get session state
rpc GetSessionState (GetSessionStateRequest) returns (GetSessionStateResponse) {}

// Set session state
rpc SetSessionState (SetSessionStateRequest) returns (google.protobuf.Empty) {}

// Release session
rpc ReleaseSession (ReleaseSessionRequest) returns (google.protobuf.Empty) {}

// Renew session lock
rpc RenewSessionLock (RenewSessionLockRequest) returns (RenewSessionLockResponse) {}
}

// The complete message request containing the locktoken.
Expand All @@ -44,4 +60,40 @@ message DeadletterRequest {
message DeferRequest {
string locktoken = 1;
bytes propertiesToModify = 2;
}
}

// The renew message lock request containing the locktoken.
message RenewMessageLockRequest {
string locktoken = 1;
}

// The get message request.
message GetSessionStateRequest {
string sessionId = 1;
}

// The set message request.
message SetSessionStateRequest {
string sessionId = 1;
bytes sessionState = 2;
}

// Get response containing the session state.
message GetSessionStateResponse {
bytes sessionState = 1;
}

// Release session.
message ReleaseSessionRequest {
string sessionId = 1;
}

// Renew session lock.
message RenewSessionLockRequest {
string sessionId = 1;
}

// Renew session lock.
message RenewSessionLockResponse {
google.protobuf.Timestamp lockedUntil = 1;
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections;
using System.Collections.Generic;
Expand Down Expand Up @@ -148,6 +151,24 @@ public virtual async Task DeferMessageAsync(
await _settlement.DeferAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.RenewMessageLockAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task RenewMessageLockAsync(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

var request = new RenewMessageLockRequest()
{
Locktoken = message.LockToken,
};

await _settlement.RenewMessageLockAsync(request, cancellationToken: cancellationToken);
}

internal static ByteString ConvertToByteString(IDictionary<string, object> propertiesToModify)
{
var map = new AmqpMap();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Google.Protobuf;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.ServiceBus.Grpc;

namespace Microsoft.Azure.Functions.Worker
{
/// <summary>
/// Converter to bind to <see cref="ServiceBusSessionMessageActions" /> type parameters.
/// </summary>
[InputConverter(typeof(ServiceBusSessionMessageActionsConverter))]
public class ServiceBusSessionMessageActions
{
private readonly Settlement.SettlementClient _settlement;
private readonly string _sessionId;

internal ServiceBusSessionMessageActions(Settlement.SettlementClient settlement, string sessionId, DateTimeOffset sessionLockedUntil)
{
_settlement = settlement ?? throw new ArgumentNullException(nameof(settlement));
_sessionId = sessionId ?? throw new ArgumentNullException(nameof(sessionId));
SessionLockedUntil = sessionLockedUntil;
}

/// <summary>
/// Initializes a new instance of the <see cref="ServiceBusMessageActions"/> class for mocking use in testing.
/// </summary>
/// <remarks>
/// This constructor exists only to support mocking. When used, class state is not fully initialized, and
/// will not function correctly; virtual members are meant to be mocked.
///</remarks>
protected ServiceBusSessionMessageActions()
{
_settlement = null!; // not expected to be used during mocking.
_sessionId = null!; // not expected to be used during mocking.
}

public virtual DateTimeOffset SessionLockedUntil { get; protected set; }

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task<BinaryData> GetSessionStateAsync(
CancellationToken cancellationToken = default)
{
var request = new GetSessionStateRequest()
{
SessionId = _sessionId,
};

GetSessionStateResponse result = await _settlement.GetSessionStateAsync(request, cancellationToken: cancellationToken);
BinaryData binaryData = new BinaryData(result.SessionState.Memory);
return binaryData;
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task SetSessionStateAsync(
BinaryData sessionState,
CancellationToken cancellationToken = default)
{
var request = new SetSessionStateRequest()
{
SessionId = _sessionId,
SessionState = ByteString.CopyFrom(sessionState.ToMemory().Span),
};

await _settlement.SetSessionStateAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task ReleaseSession(
CancellationToken cancellationToken = default)
{
var request = new ReleaseSessionRequest()
{
SessionId = _sessionId,
};

await _settlement.ReleaseSessionAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task RenewSessionLockAsync(
CancellationToken cancellationToken = default)
{
var request = new RenewSessionLockRequest()
{
SessionId = _sessionId,
};

var result = await _settlement.RenewSessionLockAsync(request, cancellationToken: cancellationToken);
SessionLockedUntil = result.LockedUntil.ToDateTimeOffset();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using Microsoft.Azure.ServiceBus.Grpc;
using System.Text.Json;

namespace Microsoft.Azure.Functions.Worker
{
/// <summary>
/// Converter to bind to <see cref="ServiceBusSessionMessageActions" /> or <see cref="ServiceBusSessionMessageActions{}" /> type parameters.
/// </summary>
[SupportsDeferredBinding]
[SupportedTargetType(typeof(ServiceBusSessionMessageActions))]
[SupportedTargetType(typeof(ServiceBusSessionMessageActions[]))]
internal class ServiceBusSessionMessageActionsConverter : IInputConverter
{
private readonly Settlement.SettlementClient _settlement;

public ServiceBusSessionMessageActionsConverter(Settlement.SettlementClient settlement)
{
_settlement = settlement;
}

public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
{
try
{
var foundSessionId = context.FunctionContext.BindingContext.BindingData.TryGetValue("SessionId", out object? sessionId);
if (!foundSessionId)
{
throw new InvalidOperationException($"Expecting SessionId within binding data and value was not present. Sessions must be enabled when binding to {nameof(ServiceBusSessionMessageActions)}.");
}

// Get the sessionLockedUntil property from the SessionActions binding data
var foundSessionActions = context.FunctionContext.BindingContext.BindingData.TryGetValue("SessionActions", out object? sessionActions);
if (!foundSessionActions)
{
throw new InvalidOperationException("Expecting SessionActions within binding data and value was not present.");
}

JsonDocument jsonDocument = JsonDocument.Parse(sessionActions!.ToString());
var foundSessionLockedUntil = jsonDocument.RootElement.TryGetProperty("SessionLockedUntil", out JsonElement sessionLockedUntil);
if (!foundSessionLockedUntil)
{
throw new InvalidOperationException("Expecting SessionLockedUntil within binding data of session actions and value was not present.");
}

var sessionActionResult = new ServiceBusSessionMessageActions(_settlement, sessionId!.ToString(), sessionLockedUntil.GetDateTimeOffset());
var result = ConversionResult.Success(sessionActionResult);
return new ValueTask<ConversionResult>(result);
}
catch (Exception exception)
{
return new ValueTask<ConversionResult>(ConversionResult.Failed(exception));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ public void ServiceBus_SDKTypeBindings()

AssertDictionary(extensions, new Dictionary<string, string>
{
{ "Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.15.1" },
{ "Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.16.0" },
});

var serviceBusTriggerFunction = functions.Single(p => p.Name == nameof(SDKTypeBindings_ServiceBus.ServiceBusTriggerFunction));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ public async Task CanDeferMessage()
await messageActions.DeferMessageAsync(message, properties);
}

[Fact]
public async Task CanRenewMessageLock()
{
var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid());
var properties = new Dictionary<string, object>()
{
{ "int", 1 },
{ "string", "foo"},
{ "timespan", TimeSpan.FromSeconds(1) },
{ "datetime", DateTime.UtcNow },
{ "datetimeoffset", DateTimeOffset.UtcNow },
{ "guid", Guid.NewGuid() }
};
var messageActions = new ServiceBusMessageActions(new MockSettlementClient(message.LockToken, properties));
await messageActions.RenewMessageLockAsync(message);
}

[Fact]
public async Task PassingNullMessageThrows()
{
Expand All @@ -83,6 +100,7 @@ public async Task PassingNullMessageThrows()
await Assert.ThrowsAsync<ArgumentNullException>(async () => await messageActions.AbandonMessageAsync(null));
await Assert.ThrowsAsync<ArgumentNullException>(async () => await messageActions.DeadLetterMessageAsync(null));
await Assert.ThrowsAsync<ArgumentNullException>(async () => await messageActions.DeferMessageAsync(null));
await Assert.ThrowsAsync<ArgumentNullException>(async () => await messageActions.RenewMessageLockAsync(null));
}

private class MockSettlementClient : Settlement.SettlementClient
Expand Down Expand Up @@ -128,6 +146,12 @@ public override AsyncUnaryCall<Empty> DeferAsync(DeferRequest request, Metadata
Assert.Equal(_propertiesToModify, request.PropertiesToModify);
return new AsyncUnaryCall<Empty>(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
}

public override AsyncUnaryCall<Empty> RenewMessageLockAsync(RenewMessageLockRequest request, CallOptions options)
{
Assert.Equal(_lockToken, request.Locktoken);
return new AsyncUnaryCall<Empty>(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
}
}
}
}
}
Loading

0 comments on commit be0b8b3

Please sign in to comment.