Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADO.NET Streaming Provider #8974

Merged
merged 91 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
a9d4099
Added Orleans.Streaming.AdoNet project skeleton
JorgeCandeias Apr 9, 2024
898cdeb
Added Streaming Artefacts
JorgeCandeias Apr 12, 2024
279afef
Added Streaming Artefacts
JorgeCandeias Apr 17, 2024
6c7eec9
Added Orleans.Streaming.AdoNet project skeleton
JorgeCandeias Apr 9, 2024
3655189
Added Streaming Artefacts
JorgeCandeias Apr 12, 2024
f22393d
Added Streaming Artefacts
JorgeCandeias Apr 17, 2024
dc4c58d
Merge branch 'adonet-streaming' of https://github.com/JorgeCandeias/o…
JorgeCandeias Apr 17, 2024
6d6db56
Added tests
JorgeCandeias Apr 22, 2024
5a79b2d
Added tests
JorgeCandeias Apr 22, 2024
0d4993d
Added tests
JorgeCandeias Apr 22, 2024
7dc1991
Added AdoNetQueueDataAdapter
JorgeCandeias Apr 22, 2024
c47d991
Added adonet streaming artefacts
JorgeCandeias Apr 23, 2024
2831159
Added tests
JorgeCandeias Apr 23, 2024
87de6cb
Added tests
JorgeCandeias Apr 24, 2024
1036f37
Added AdoNetQueueAdapter
JorgeCandeias Apr 24, 2024
fc5e7a9
Added stream configurators
JorgeCandeias Apr 24, 2024
54cfaf0
Refactored
JorgeCandeias Apr 24, 2024
5d9e7c7
Added AdoNetQueueAdapterFactory
JorgeCandeias Apr 24, 2024
33a5c0c
Added AdoNetStreamFailureHandler
JorgeCandeias Apr 26, 2024
2278a90
Added tests
JorgeCandeias Apr 26, 2024
0d82568
Added tests
JorgeCandeias Apr 27, 2024
7dc6ac8
Added tests
JorgeCandeias Apr 27, 2024
8a55d37
Refactored
JorgeCandeias Apr 27, 2024
15e4d5d
Added tests
JorgeCandeias Apr 27, 2024
6f81c15
Added tests
JorgeCandeias Apr 27, 2024
9f82675
Added tests
JorgeCandeias Apr 27, 2024
98fa7a0
Added tests
JorgeCandeias Apr 27, 2024
8c7df7f
Merge branch 'adonet-streaming' of https://github.com/JorgeCandeias/o…
JorgeCandeias Apr 27, 2024
0c74a6c
Refactored
JorgeCandeias Apr 27, 2024
7e4be58
Added tests
JorgeCandeias Apr 27, 2024
9283674
Added tests
JorgeCandeias Apr 27, 2024
905d1b7
Added tests
JorgeCandeias Apr 27, 2024
fe272bc
Added tests
JorgeCandeias Apr 28, 2024
a6da73c
Added tests
JorgeCandeias Apr 28, 2024
4c6cc94
Added tests
JorgeCandeias Apr 28, 2024
19c0837
Refactored
JorgeCandeias Apr 28, 2024
884d780
Refactored
JorgeCandeias Apr 28, 2024
4df4583
Refactored
JorgeCandeias Apr 29, 2024
91b625e
Refactored
JorgeCandeias Apr 29, 2024
69ce629
Configured defaults
JorgeCandeias Apr 29, 2024
9f81641
WIP Sweeping
JorgeCandeias May 1, 2024
5a39c1e
Refactored
JorgeCandeias May 4, 2024
f1d2d2b
Merge remote-tracking branch 'origin/main' into adonet-streaming
JorgeCandeias May 4, 2024
3eda841
Reverted
JorgeCandeias May 4, 2024
62a8566
Refactored
JorgeCandeias May 4, 2024
977f840
Refactored
JorgeCandeias May 4, 2024
195dce4
Added error handling
JorgeCandeias May 4, 2024
05f3edb
Refactored
JorgeCandeias May 4, 2024
fc58672
Wired up options validator
JorgeCandeias May 4, 2024
863294f
Fixed ArgumentOutOfRangeException thrown from PeriodicTimer
JorgeCandeias May 4, 2024
82d5bab
Refactored
JorgeCandeias May 5, 2024
0b0ab5e
Refactored
JorgeCandeias May 5, 2024
f90e492
Refactored
JorgeCandeias May 5, 2024
826a63a
Refactored
JorgeCandeias May 5, 2024
ad47460
Added tests
JorgeCandeias May 5, 2024
055a80a
Cleanup
JorgeCandeias May 5, 2024
cffbe78
WIP Scripts
JorgeCandeias May 7, 2024
db26cde
Added MariaDB/MySQL scripts
JorgeCandeias May 9, 2024
374800f
Added tests
JorgeCandeias May 9, 2024
d5a2682
Upgraded Npgsql package with vulnerability warning.
JorgeCandeias May 10, 2024
5be82be
Added tests
JorgeCandeias May 11, 2024
44ee55a
Added tests
JorgeCandeias May 11, 2024
458943e
Added tests
JorgeCandeias May 11, 2024
e8ec712
Added tests
JorgeCandeias May 11, 2024
295f35e
Added tests
JorgeCandeias May 11, 2024
345affc
Added tests
JorgeCandeias May 11, 2024
2fe0a4b
Added tests
JorgeCandeias May 11, 2024
17e9772
Added tests
JorgeCandeias May 11, 2024
fb5e43e
Added tests
JorgeCandeias May 11, 2024
69b5e70
Added tests
JorgeCandeias May 11, 2024
080510b
Added safety timeout
JorgeCandeias May 11, 2024
317e83d
Fixed test
JorgeCandeias May 11, 2024
ce0f634
Merged
JorgeCandeias May 11, 2024
d8c0c68
Fixed "DROP DATABASE cannot be executed within a pipeline" issue for …
JorgeCandeias May 11, 2024
b08b380
Added PostgreSQL scripts
JorgeCandeias May 12, 2024
002d968
Added tests and refactored
JorgeCandeias May 12, 2024
0fb01d5
Refactored
JorgeCandeias May 12, 2024
57dfbb1
Refactored
JorgeCandeias May 12, 2024
d112e17
Fixed argument types
JorgeCandeias May 13, 2024
3b0a282
Added check to flaky test
JorgeCandeias May 16, 2024
8ffa5ad
Flaky test
JorgeCandeias May 16, 2024
81c0b7f
Fixed flaky tests
JorgeCandeias May 16, 2024
c917e3b
Merged
JorgeCandeias May 17, 2024
6b5c661
Clearing pools in MySQL tests
JorgeCandeias May 17, 2024
ac8ade7
Removed READPAST to address flakiness
JorgeCandeias May 17, 2024
c09b433
Removed SKIP LOCKED to address flakiness
JorgeCandeias May 17, 2024
ce2a0e5
Added benchmark
JorgeCandeias May 19, 2024
9c6273f
Merge branch 'adonet-streaming' of https://github.com/JorgeCandeias/o…
JorgeCandeias May 19, 2024
62dc0a4
Added benchmark
JorgeCandeias May 19, 2024
689fee1
Updated benchmark
JorgeCandeias May 19, 2024
bfaa264
Updated benchmarks
JorgeCandeias May 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Orleans.sln
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tester.Redis", "test\Extens
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Serialization.Protobuf", "src\Serializers\Orleans.Serialization.Protobuf\Orleans.Serialization.Protobuf.csproj", "{A073C0EE-8732-42F9-A22E-D47034E25076}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Orleans.Streaming.AdoNet", "src\AdoNet\Orleans.Streaming.AdoNet\Orleans.Streaming.AdoNet.csproj", "{2B994F33-16CF-4679-936A-5AEABC529D2C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmarks.AdoNet", "test\Benchmarks.AdoNet\Benchmarks.AdoNet.csproj", "{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -581,6 +585,14 @@ Global
{A073C0EE-8732-42F9-A22E-D47034E25076}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A073C0EE-8732-42F9-A22E-D47034E25076}.Release|Any CPU.Build.0 = Release|Any CPU
{2B994F33-16CF-4679-936A-5AEABC529D2C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2B994F33-16CF-4679-936A-5AEABC529D2C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2B994F33-16CF-4679-936A-5AEABC529D2C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2B994F33-16CF-4679-936A-5AEABC529D2C}.Release|Any CPU.Build.0 = Release|Any CPU
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -688,6 +700,8 @@ Global
{8FC6457C-6273-4338-AD2A-ECAA8FE2C5D7} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{F13247A0-70C9-4200-9CB1-2002CB8105E0} = {082D25DB-70CA-48F4-93E0-EC3455F494B8}
{A073C0EE-8732-42F9-A22E-D47034E25076} = {4CD3AA9E-D937-48CA-BB6C-158E12257D23}
{2B994F33-16CF-4679-936A-5AEABC529D2C} = {EB2EDE59-5021-42EE-A97A-D59939B39C66}
{B8F43537-2D2E-42A0-BE67-5E07E4313AEA} = {2CAB7894-777C-42B1-8B1E-322868CE92C7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7BFB3429-B5BB-4DB1-95B4-67D77A864952}
Expand Down
105 changes: 105 additions & 0 deletions src/AdoNet/Orleans.Streaming.AdoNet/AdoNetBatchContainer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
namespace Orleans.Streaming.AdoNet;

/// <summary>
/// The <see cref="IBatchContainer"/> implementation for the ADONET provider.
/// </summary>
/// <remarks>
/// 1. This class only supports binary serialization as performance and data size is the priority for database storage.
/// 2. Though the <see cref="SequenceToken"/> is supported here, it is not yet used, as the ADO.NET provider is not rewindable.
/// </remarks>
[GenerateSerializer]
[Alias("Orleans.Streaming.AdoNet.AdoNetBatchContainer")]
internal class AdoNetBatchContainer : IBatchContainer
{
public AdoNetBatchContainer(StreamId streamId, List<object> events, Dictionary<string, object> requestContext)
{
ArgumentNullException.ThrowIfNull(events);

StreamId = streamId;
Events = events;
RequestContext = requestContext;
}

#region Serialized State

[Id(0)]
public StreamId StreamId { get; }

[Id(1)]
public List<object> Events { get; }

[Id(2)]
public Dictionary<string, object> RequestContext { get; }

[Id(3)]
public EventSequenceTokenV2 SequenceToken { get; internal set; } = null!;

/// <summary>
/// Holds the receipt for message confirmation.
/// </summary>
[Id(4)]
public int Dequeued { get; internal set; }

#endregion Serialized State

#region Interface

StreamSequenceToken IBatchContainer.SequenceToken => SequenceToken;

public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
{
return SequenceToken is null
? throw new InvalidOperationException($"Cannot get events from a half-baked {nameof(AdoNetBatchContainer)}")
: Events
.OfType<T>()
.Select((e, i) => Tuple.Create<T, StreamSequenceToken>(e, SequenceToken.CreateSequenceTokenForEvent(i)));
}

public bool ImportRequestContext()
{
if (RequestContext is not null)
{
RequestContextExtensions.Import(RequestContext);
return true;
}

return false;
}

#endregion Interface

#region Conversion

/// <summary>
/// Creates a new <see cref="AdoNetBatchContainer"/> from the specified <see cref="AdoNetStreamMessage"/>.
/// </summary>
public static AdoNetBatchContainer FromMessage(Serializer<AdoNetBatchContainer> serializer, AdoNetStreamMessage message)
{
ArgumentNullException.ThrowIfNull(serializer);
ArgumentNullException.ThrowIfNull(message);

var container = serializer.Deserialize(message.Payload);
container.SequenceToken = new(message.MessageId);
container.Dequeued = message.Dequeued;

return container;
}

/// <summary>
/// Converts the specified <see cref="AdoNetBatchContainer"/> to a message payload.
/// </summary>
public static byte[] ToMessagePayload(Serializer<AdoNetBatchContainer> serializer, StreamId streamId, List<object> events, Dictionary<string, object> requestContext)
{
ArgumentNullException.ThrowIfNull(serializer);
ArgumentNullException.ThrowIfNull(events);

var container = new AdoNetBatchContainer(streamId, events, requestContext);
var payload = serializer.SerializeToArray(container);

return payload;
}

#endregion Conversion

public override string ToString() => $"[{nameof(AdoNetBatchContainer)}:Stream={StreamId},#Items={Events.Count}]";
}
71 changes: 71 additions & 0 deletions src/AdoNet/Orleans.Streaming.AdoNet/AdoNetQueueAdapter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
namespace Orleans.Streaming.AdoNet;

/// <summary>
/// Stream queue storage adapter for ADO.NET providers.
/// </summary>
internal partial class AdoNetQueueAdapter(string name, AdoNetStreamOptions streamOptions, ClusterOptions clusterOptions, SimpleQueueCacheOptions cacheOptions, AdoNetStreamQueueMapper mapper, RelationalOrleansQueries queries, Serializer<AdoNetBatchContainer> serializer, ILogger<AdoNetQueueAdapter> logger, IServiceProvider serviceProvider) : IQueueAdapter
{
private readonly ILogger<AdoNetQueueAdapter> _logger = logger;

/// <summary>
/// Maps to the ProviderId in the database.
/// </summary>
public string Name { get; } = name;

/// <summary>
/// The ADO.NET provider is not yet rewindable.
/// </summary>
public bool IsRewindable => false;

/// <summary>
/// The ADO.NET provider works both ways.
/// </summary>
public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite;

public IQueueAdapterReceiver CreateReceiver(QueueId queueId)
{
// map the queue id
var adoNetQueueId = mapper.GetAdoNetQueueId(queueId);

// create the receiver
return ReceiverFactory(serviceProvider, [Name, adoNetQueueId, streamOptions, clusterOptions, cacheOptions, queries]);
}

public async Task QueueMessageBatchAsync<T>(StreamId streamId, IEnumerable<T> events, StreamSequenceToken token, Dictionary<string, object> requestContext)
{
// the ADO.NET provider is not rewindable so we do not support user supplied tokens
if (token is not null)
{
throw new ArgumentException($"{nameof(AdoNetQueueAdapter)} does not support a user supplied {nameof(StreamSequenceToken)}.");
}

// map the Orleans stream id to the corresponding queue id
var queueId = mapper.GetAdoNetQueueId(streamId);

// create the payload from the events
var payload = AdoNetBatchContainer.ToMessagePayload(serializer, streamId, events.Cast<object>().ToList(), requestContext);

// we can enqueue the message now
try
{
await queries.QueueStreamMessageAsync(clusterOptions.ServiceId, Name, queueId, payload, streamOptions.ExpiryTimeout.TotalSecondsCeiling());
}
catch (Exception ex)
{
LogFailedToQueueStreamMessage(ex, clusterOptions.ServiceId, Name, queueId);
throw;
}
}

/// <summary>
/// The receiver factory.
/// </summary>
private static readonly ObjectFactory<AdoNetQueueAdapterReceiver> ReceiverFactory = ActivatorUtilities.CreateFactory<AdoNetQueueAdapterReceiver>([typeof(string), typeof(string), typeof(AdoNetStreamOptions), typeof(ClusterOptions), typeof(SimpleQueueCacheOptions), typeof(RelationalOrleansQueries)]);

#region Logging

[LoggerMessage(1, LogLevel.Error, "Failed to queue stream message with ({ServiceId}, {ProviderId}, {QueueId})")]
private partial void LogFailedToQueueStreamMessage(Exception ex, string serviceId, string providerId, string queueId);

#endregion Logging
}
121 changes: 121 additions & 0 deletions src/AdoNet/Orleans.Streaming.AdoNet/AdoNetQueueAdapterFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System.Threading;
using Microsoft.Extensions.Hosting;

namespace Orleans.Streaming.AdoNet;

internal class AdoNetQueueAdapterFactory : IQueueAdapterFactory
{
public AdoNetQueueAdapterFactory(string name, AdoNetStreamOptions streamOptions, ClusterOptions clusterOptions, SimpleQueueCacheOptions cacheOptions, HashRingStreamQueueMapperOptions hashOptions, ILoggerFactory loggerFactory, IHostApplicationLifetime lifetime, IServiceProvider serviceProvider)
{
_name = name;
_streamOptions = streamOptions;
_clusterOptions = clusterOptions;
_cacheOptions = cacheOptions;
_lifetime = lifetime;
_serviceProvider = serviceProvider;

_streamQueueMapper = new HashRingBasedStreamQueueMapper(hashOptions, name);
_cache = new SimpleQueueAdapterCache(cacheOptions, name, loggerFactory);
_adoNetQueueMapper = new AdoNetStreamQueueMapper(_streamQueueMapper);
}

private readonly string _name;
private readonly AdoNetStreamOptions _streamOptions;
private readonly ClusterOptions _clusterOptions;
private readonly SimpleQueueCacheOptions _cacheOptions;
private readonly IHostApplicationLifetime _lifetime;
private readonly IServiceProvider _serviceProvider;

private readonly HashRingBasedStreamQueueMapper _streamQueueMapper;
private readonly SimpleQueueAdapterCache _cache;
private readonly AdoNetStreamQueueMapper _adoNetQueueMapper;

private RelationalOrleansQueries _queries;

/// <summary>
/// Unfortunate implementation detail to account for lack of async lifetime.
/// Ideally this concern will be moved upstream so this won't be needed.
/// </summary>
private readonly SemaphoreSlim _semaphore = new(1);

/// <summary>
/// Ensures queries are loaded only once while allowing for recovery if the load fails.
/// </summary>
private ValueTask<RelationalOrleansQueries> GetQueriesAsync()
{
// attempt fast path
return _queries is not null ? new(_queries) : new(CoreAsync());

// slow path
async Task<RelationalOrleansQueries> CoreAsync()
{
await _semaphore.WaitAsync(_streamOptions.InitializationTimeout, _lifetime.ApplicationStopping);
try
{
// attempt fast path again
if (_queries is not null)
{
return _queries;
}

// slow path - the member variable will only be set if the call succeeds
return _queries = await RelationalOrleansQueries
.CreateInstance(_streamOptions.Invariant, _streamOptions.ConnectionString)
.WaitAsync(_streamOptions.InitializationTimeout);
}
finally
{
_semaphore.Release();
}
}
}

public async Task<IQueueAdapter> CreateAdapter()
{
var queries = await GetQueriesAsync();

return AdapterFactory(_serviceProvider, [_name, _streamOptions, _clusterOptions, _cacheOptions, _adoNetQueueMapper, queries]);
}

public async Task<IStreamFailureHandler> GetDeliveryFailureHandler(QueueId queueId)
{
var queries = await GetQueriesAsync();

return HandlerFactory(_serviceProvider, [false, _streamOptions, _clusterOptions, _adoNetQueueMapper, queries]);
}

public IQueueAdapterCache GetQueueAdapterCache() => _cache;

public IStreamQueueMapper GetStreamQueueMapper() => _streamQueueMapper;

/// <summary>
/// Used by the silo and client configurators as an entry point to set up a stream.
/// </summary>
public static IQueueAdapterFactory Create(IServiceProvider serviceProvider, string name)
{
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(name);

var streamOptions = serviceProvider.GetOptionsByName<AdoNetStreamOptions>(name);
var clusterOptions = serviceProvider.GetProviderClusterOptions(name).Value;
var cacheOptions = serviceProvider.GetOptionsByName<SimpleQueueCacheOptions>(name);
var hashOptions = serviceProvider.GetOptionsByName<HashRingStreamQueueMapperOptions>(name);

return QueueAdapterFactoryFactory(serviceProvider, [name, streamOptions, clusterOptions, cacheOptions, hashOptions]);
}

/// <summary>
/// Factory of <see cref="AdoNetQueueAdapterFactory"/> instances.
/// </summary>
private static readonly ObjectFactory<AdoNetQueueAdapterFactory> QueueAdapterFactoryFactory = ActivatorUtilities.CreateFactory<AdoNetQueueAdapterFactory>([typeof(string), typeof(AdoNetStreamOptions), typeof(ClusterOptions), typeof(SimpleQueueCacheOptions), typeof(HashRingStreamQueueMapperOptions)]);

/// <summary>
/// Factory of <see cref="AdoNetQueueAdapter"/> instances.
/// </summary>
private static readonly ObjectFactory<AdoNetQueueAdapter> AdapterFactory = ActivatorUtilities.CreateFactory<AdoNetQueueAdapter>([typeof(string), typeof(AdoNetStreamOptions), typeof(ClusterOptions), typeof(SimpleQueueCacheOptions), typeof(AdoNetStreamQueueMapper), typeof(RelationalOrleansQueries)]);

/// <summary>
/// Factory of <see cref="AdoNetStreamFailureHandler"/> instances.
/// </summary>
private static readonly ObjectFactory<AdoNetStreamFailureHandler> HandlerFactory = ActivatorUtilities.CreateFactory<AdoNetStreamFailureHandler>([typeof(bool), typeof(AdoNetStreamOptions), typeof(ClusterOptions), typeof(AdoNetStreamQueueMapper), typeof(RelationalOrleansQueries)]);
}
Loading
Loading