Skip to content

Commit

Permalink
ADO.NET Streaming Provider (#8974)
Browse files Browse the repository at this point in the history
* Added Orleans.Streaming.AdoNet project skeleton

* Added Streaming Artefacts

* Added Streaming Artefacts

* Added Orleans.Streaming.AdoNet project skeleton

* Added Streaming Artefacts

* Added Streaming Artefacts

* Added tests

* Added tests

* Added tests

* Added AdoNetQueueDataAdapter

* Added adonet streaming artefacts

* Added tests

* Added tests

* Added AdoNetQueueAdapter

* Added stream configurators

* Refactored

* Added AdoNetQueueAdapterFactory

* Added AdoNetStreamFailureHandler

* Added tests

* Added tests

* Added tests

* Refactored

* Added tests

* Added tests

* Added tests

* Added tests

* Refactored

* Added tests

* Added tests

* Added tests

* Added tests

* Added tests

* Added tests

* Refactored

* Refactored

* Refactored

* Refactored

* Configured defaults

* WIP Sweeping

* Refactored

* Reverted

* Refactored

* Refactored

* Added error handling

* Refactored

* Wired up options validator

* Fixed ArgumentOutOfRangeException thrown from PeriodicTimer

* Refactored

* Refactored

* Refactored

* Refactored

* Added tests

* Cleanup

* WIP Scripts

* Added MariaDB/MySQL scripts

* Added tests

* Upgraded Npgsql package with vulnerability warning.
Upgraded MySql.Data to fix connection pool deadlock: https://bugs.mysql.com/bug.php?id=114272
Upgraded Microsoft.NET.Test.Sdk to stable release version.

* Added tests

* Added tests

* Added tests

* Added tests

* Added tests

* Added tests

* Added tests

* Added tests

* Added tests

* Added tests

* Added safety timeout

* Fixed test

* Fixed "DROP DATABASE cannot be executed within a pipeline" issue for latest  PostgreSQL

* Added PostgreSQL scripts

* Added tests and refactored

* Refactored

* Refactored

* Fixed argument types

* Added check to flaky test

* Flaky test

* Fixed flaky tests

* Clearing pools in MySQL tests

* Removed READPAST to address flakiness

* Removed SKIP LOCKED to address flakiness

* Added benchmark

* Added benchmark

* Updated benchmark

* Updated benchmarks
  • Loading branch information
JorgeCandeias committed May 23, 2024
1 parent 71ea69e commit fd89211
Show file tree
Hide file tree
Showing 64 changed files with 6,499 additions and 46 deletions.
14 changes: 14 additions & 0 deletions Orleans.sln
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tester.Redis", "test\Extens
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.Protobuf", "src\Serializers\Orleans.Serialization.Protobuf\Orleans.Serialization.Protobuf.csproj", "{A073C0EE-8732-42F9-A22E-D47034E25076}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Streaming.AdoNet", "src\AdoNet\Orleans.Streaming.AdoNet\Orleans.Streaming.AdoNet.csproj", "{2B994F33-16CF-4679-936A-5AEABC529D2C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmarks.AdoNet", "test\Benchmarks.AdoNet\Benchmarks.AdoNet.csproj", "{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -581,6 +585,14 @@ Global
{A073C0EE-8732-42F9-A22E-D47034E25076}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.Build.0 = Release|Any CPU
{2B994F33-16CF-4679-936A-5AEABC529D2C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2B994F33-16CF-4679-936A-5AEABC529D2C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2B994F33-16CF-4679-936A-5AEABC529D2C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2B994F33-16CF-4679-936A-5AEABC529D2C}.Release|Any CPU.Build.0 = Release|Any CPU
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -688,6 +700,8 @@ Global
{8FC6457C-6273-4338-AD2A-ECAA8FE2C5D7} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{F13247A0-70C9-4200-9CB1-2002CB8105E0} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{A073C0EE-8732-42F9-A22E-D47034E25076} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
{2B994F33-16CF-4679-936A-5AEABC529D2C} = {EB2EDE59-5021-42EE-A97A-D59939B39C66}
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA} = {2CAB7894-777C-42B1-8B1E-322868CE92C7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
105 changes: 105 additions & 0 deletions src/AdoNet/Orleans.Streaming.AdoNet/AdoNetBatchContainer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
namespace Orleans.Streaming.AdoNet;

/// <summary>
/// The <see cref="IBatchContainer"/> implementation for the ADONET provider.
/// </summary>
/// <remarks>
/// 1. This class only supports binary serialization as performance and data size is the priority for database storage.
/// 2. Though the <see cref="SequenceToken"/> is supported here, it is not yet used, as the ADO.NET provider is not rewindable.
/// </remarks>
[GenerateSerializer]
[Alias("Orleans.Streaming.AdoNet.AdoNetBatchContainer")]
internal class AdoNetBatchContainer : IBatchContainer
{
public AdoNetBatchContainer(StreamId streamId, List<object> events, Dictionary<string, object> requestContext)
{
ArgumentNullException.ThrowIfNull(events);

StreamId = streamId;
Events = events;
RequestContext = requestContext;
}

#region Serialized State

[Id(0)]
public StreamId StreamId { get; }

[Id(1)]
public List<object> Events { get; }

[Id(2)]
public Dictionary<string, object> RequestContext { get; }

[Id(3)]
public EventSequenceTokenV2 SequenceToken { get; internal set; } = null!;

/// <summary>
/// Holds the receipt for message confirmation.
/// </summary>
[Id(4)]
public int Dequeued { get; internal set; }

#endregion Serialized State

#region Interface

StreamSequenceToken IBatchContainer.SequenceToken => SequenceToken;

public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
{
return SequenceToken is null
? throw new InvalidOperationException($"Cannot get events from a half-baked {nameof(AdoNetBatchContainer)}")
: Events
.OfType<T>()
.Select((e, i) => Tuple.Create<T, StreamSequenceToken>(e, SequenceToken.CreateSequenceTokenForEvent(i)));
}

public bool ImportRequestContext()
{
if (RequestContext is not null)
{
RequestContextExtensions.Import(RequestContext);
return true;
}

return false;
}

#endregion Interface

#region Conversion

/// <summary>
/// Creates a new <see cref="AdoNetBatchContainer"/> from the specified <see cref="AdoNetStreamMessage"/>.
/// </summary>
public static AdoNetBatchContainer FromMessage(Serializer<AdoNetBatchContainer> serializer, AdoNetStreamMessage message)
{
ArgumentNullException.ThrowIfNull(serializer);
ArgumentNullException.ThrowIfNull(message);

var container = serializer.Deserialize(message.Payload);
container.SequenceToken = new(message.MessageId);
container.Dequeued = message.Dequeued;

return container;
}

/// <summary>
/// Converts the specified <see cref="AdoNetBatchContainer"/> to a message payload.
/// </summary>
public static byte[] ToMessagePayload(Serializer<AdoNetBatchContainer> serializer, StreamId streamId, List<object> events, Dictionary<string, object> requestContext)
{
ArgumentNullException.ThrowIfNull(serializer);
ArgumentNullException.ThrowIfNull(events);

var container = new AdoNetBatchContainer(streamId, events, requestContext);
var payload = serializer.SerializeToArray(container);

return payload;
}

#endregion Conversion

public override string ToString() => $"[{nameof(AdoNetBatchContainer)}:Stream={StreamId},#Items={Events.Count}]";
}
71 changes: 71 additions & 0 deletions src/AdoNet/Orleans.Streaming.AdoNet/AdoNetQueueAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
namespace Orleans.Streaming.AdoNet;

/// <summary>
/// Stream queue storage adapter for ADO.NET providers.
/// </summary>
internal partial class AdoNetQueueAdapter(string name, AdoNetStreamOptions streamOptions, ClusterOptions clusterOptions, SimpleQueueCacheOptions cacheOptions, AdoNetStreamQueueMapper mapper, RelationalOrleansQueries queries, Serializer<AdoNetBatchContainer> serializer, ILogger<AdoNetQueueAdapter> logger, IServiceProvider serviceProvider) : IQueueAdapter
{
private readonly ILogger<AdoNetQueueAdapter> _logger = logger;

/// <summary>
/// Maps to the ProviderId in the database.
/// </summary>
public string Name { get; } = name;

/// <summary>
/// The ADO.NET provider is not yet rewindable.
/// </summary>
public bool IsRewindable => false;

/// <summary>
/// The ADO.NET provider works both ways.
/// </summary>
public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite;

public IQueueAdapterReceiver CreateReceiver(QueueId queueId)
{
// map the queue id
var adoNetQueueId = mapper.GetAdoNetQueueId(queueId);

// create the receiver
return ReceiverFactory(serviceProvider, [Name, adoNetQueueId, streamOptions, clusterOptions, cacheOptions, queries]);
}

public async Task QueueMessageBatchAsync<T>(StreamId streamId, IEnumerable<T> events, StreamSequenceToken token, Dictionary<string, object> requestContext)
{
// the ADO.NET provider is not rewindable so we do not support user supplied tokens
if (token is not null)
{
throw new ArgumentException($"{nameof(AdoNetQueueAdapter)} does not support a user supplied {nameof(StreamSequenceToken)}.");
}

// map the Orleans stream id to the corresponding queue id
var queueId = mapper.GetAdoNetQueueId(streamId);

// create the payload from the events
var payload = AdoNetBatchContainer.ToMessagePayload(serializer, streamId, events.Cast<object>().ToList(), requestContext);

// we can enqueue the message now
try
{
await queries.QueueStreamMessageAsync(clusterOptions.ServiceId, Name, queueId, payload, streamOptions.ExpiryTimeout.TotalSecondsCeiling());
}
catch (Exception ex)
{
LogFailedToQueueStreamMessage(ex, clusterOptions.ServiceId, Name, queueId);
throw;
}
}

/// <summary>
/// The receiver factory.
/// </summary>
private static readonly ObjectFactory<AdoNetQueueAdapterReceiver> ReceiverFactory = ActivatorUtilities.CreateFactory<AdoNetQueueAdapterReceiver>([typeof(string), typeof(string), typeof(AdoNetStreamOptions), typeof(ClusterOptions), typeof(SimpleQueueCacheOptions), typeof(RelationalOrleansQueries)]);

#region Logging

[LoggerMessage(1, LogLevel.Error, "Failed to queue stream message with ({ServiceId}, {ProviderId}, {QueueId})")]
private partial void LogFailedToQueueStreamMessage(Exception ex, string serviceId, string providerId, string queueId);

#endregion Logging
}
121 changes: 121 additions & 0 deletions src/AdoNet/Orleans.Streaming.AdoNet/AdoNetQueueAdapterFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System.Threading;
using Microsoft.Extensions.Hosting;

namespace Orleans.Streaming.AdoNet;

internal class AdoNetQueueAdapterFactory : IQueueAdapterFactory
{
public AdoNetQueueAdapterFactory(string name, AdoNetStreamOptions streamOptions, ClusterOptions clusterOptions, SimpleQueueCacheOptions cacheOptions, HashRingStreamQueueMapperOptions hashOptions, ILoggerFactory loggerFactory, IHostApplicationLifetime lifetime, IServiceProvider serviceProvider)
{
_name = name;
_streamOptions = streamOptions;
_clusterOptions = clusterOptions;
_cacheOptions = cacheOptions;
_lifetime = lifetime;
_serviceProvider = serviceProvider;

_streamQueueMapper = new HashRingBasedStreamQueueMapper(hashOptions, name);
_cache = new SimpleQueueAdapterCache(cacheOptions, name, loggerFactory);
_adoNetQueueMapper = new AdoNetStreamQueueMapper(_streamQueueMapper);
}

private readonly string _name;
private readonly AdoNetStreamOptions _streamOptions;
private readonly ClusterOptions _clusterOptions;
private readonly SimpleQueueCacheOptions _cacheOptions;
private readonly IHostApplicationLifetime _lifetime;
private readonly IServiceProvider _serviceProvider;

private readonly HashRingBasedStreamQueueMapper _streamQueueMapper;
private readonly SimpleQueueAdapterCache _cache;
private readonly AdoNetStreamQueueMapper _adoNetQueueMapper;

private RelationalOrleansQueries _queries;

/// <summary>
/// Unfortunate implementation detail to account for lack of async lifetime.
/// Ideally this concern will be moved upstream so this won't be needed.
/// </summary>
private readonly SemaphoreSlim _semaphore = new(1);

/// <summary>
/// Ensures queries are loaded only once while allowing for recovery if the load fails.
/// </summary>
private ValueTask<RelationalOrleansQueries> GetQueriesAsync()
{
// attempt fast path
return _queries is not null ? new(_queries) : new(CoreAsync());

// slow path
async Task<RelationalOrleansQueries> CoreAsync()
{
await _semaphore.WaitAsync(_streamOptions.InitializationTimeout, _lifetime.ApplicationStopping);
try
{
// attempt fast path again
if (_queries is not null)
{
return _queries;
}

// slow path - the member variable will only be set if the call succeeds
return _queries = await RelationalOrleansQueries
.CreateInstance(_streamOptions.Invariant, _streamOptions.ConnectionString)
.WaitAsync(_streamOptions.InitializationTimeout);
}
finally
{
_semaphore.Release();
}
}
}

public async Task<IQueueAdapter> CreateAdapter()
{
var queries = await GetQueriesAsync();

return AdapterFactory(_serviceProvider, [_name, _streamOptions, _clusterOptions, _cacheOptions, _adoNetQueueMapper, queries]);
}

public async Task<IStreamFailureHandler> GetDeliveryFailureHandler(QueueId queueId)
{
var queries = await GetQueriesAsync();

return HandlerFactory(_serviceProvider, [false, _streamOptions, _clusterOptions, _adoNetQueueMapper, queries]);
}

public IQueueAdapterCache GetQueueAdapterCache() => _cache;

public IStreamQueueMapper GetStreamQueueMapper() => _streamQueueMapper;

/// <summary>
/// Used by the silo and client configurators as an entry point to set up a stream.
/// </summary>
public static IQueueAdapterFactory Create(IServiceProvider serviceProvider, string name)
{
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(name);

var streamOptions = serviceProvider.GetOptionsByName<AdoNetStreamOptions>(name);
var clusterOptions = serviceProvider.GetProviderClusterOptions(name).Value;
var cacheOptions = serviceProvider.GetOptionsByName<SimpleQueueCacheOptions>(name);
var hashOptions = serviceProvider.GetOptionsByName<HashRingStreamQueueMapperOptions>(name);

return QueueAdapterFactoryFactory(serviceProvider, [name, streamOptions, clusterOptions, cacheOptions, hashOptions]);
}

/// <summary>
/// Factory of <see cref="AdoNetQueueAdapterFactory"/> instances.
/// </summary>
private static readonly ObjectFactory<AdoNetQueueAdapterFactory> QueueAdapterFactoryFactory = ActivatorUtilities.CreateFactory<AdoNetQueueAdapterFactory>([typeof(string), typeof(AdoNetStreamOptions), typeof(ClusterOptions), typeof(SimpleQueueCacheOptions), typeof(HashRingStreamQueueMapperOptions)]);

/// <summary>
/// Factory of <see cref="AdoNetQueueAdapter"/> instances.
/// </summary>
private static readonly ObjectFactory<AdoNetQueueAdapter> AdapterFactory = ActivatorUtilities.CreateFactory<AdoNetQueueAdapter>([typeof(string), typeof(AdoNetStreamOptions), typeof(ClusterOptions), typeof(SimpleQueueCacheOptions), typeof(AdoNetStreamQueueMapper), typeof(RelationalOrleansQueries)]);

/// <summary>
/// Factory of <see cref="AdoNetStreamFailureHandler"/> instances.
/// </summary>
private static readonly ObjectFactory<AdoNetStreamFailureHandler> HandlerFactory = ActivatorUtilities.CreateFactory<AdoNetStreamFailureHandler>([typeof(bool), typeof(AdoNetStreamOptions), typeof(ClusterOptions), typeof(AdoNetStreamQueueMapper), typeof(RelationalOrleansQueries)]);
}
Loading

0 comments on commit fd89211

Please sign in to comment.