Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
1f41f1b
Subscription<T>
nvborisenko Apr 15, 2026
bdbc7e2
Routing in Broker
nvborisenko Apr 15, 2026
2ffe2f5
Overloads
nvborisenko Apr 15, 2026
a74af07
Unified Subscribe for modules
nvborisenko Apr 15, 2026
40aba90
Test to consume async enumerable
nvborisenko Apr 15, 2026
9f83b42
Merge remote-tracking branch 'upstream/trunk' into bidi-event-stream
nvborisenko Apr 16, 2026
4590311
Before refactoring
nvborisenko Apr 16, 2026
6985b80
Unsubscribe and await consumers
nvborisenko Apr 16, 2026
58ea004
Graceful awaiting
nvborisenko Apr 16, 2026
e32b134
*Event for subscribing
nvborisenko Apr 16, 2026
e344fe2
Linq test
nvborisenko Apr 16, 2026
316cb9e
Last chance to get in-flight
nvborisenko Apr 16, 2026
dcb056d
ReadAll
nvborisenko Apr 17, 2026
3418c3b
OnEvent
nvborisenko Apr 17, 2026
47f26ba
Awaiting active handlers
nvborisenko Apr 17, 2026
5ebd141
Ignore
nvborisenko Apr 17, 2026
7918c98
EventDescriptor
nvborisenko Apr 18, 2026
e1eb5bc
Batched events
nvborisenko Apr 18, 2026
c59113e
Thread dispose
nvborisenko Apr 18, 2026
ef673d3
Don't track if handler is completed
nvborisenko Apr 18, 2026
5211bdd
Unsubscribe and Dispose
nvborisenko Apr 18, 2026
a7607e4
Scoped reader
nvborisenko Apr 18, 2026
2f7a127
Batched scoped action
nvborisenko Apr 18, 2026
ff0585f
ContextEventSource
nvborisenko Apr 18, 2026
7b20c79
Resolve ibidi dependency
nvborisenko Apr 18, 2026
767db37
Format
nvborisenko Apr 18, 2026
fd44aac
Static EventDescriptor self contained
nvborisenko Apr 18, 2026
16bf553
Simplify event descriptor
nvborisenko Apr 18, 2026
d41b92d
Test
nvborisenko Apr 18, 2026
dc4c4cb
Custom event
nvborisenko Apr 18, 2026
d5c8b60
CanConsumeScopedAsyncEventStream test
nvborisenko Apr 18, 2026
17158dd
Remove public descriptor property
nvborisenko Apr 20, 2026
2a50747
EventReader and EventSubscription parity
nvborisenko Apr 21, 2026
0b86f43
Sequential handlers
nvborisenko Apr 21, 2026
aed066c
Throw on bad handler
nvborisenko Apr 21, 2026
d1daaf8
Don't wrap
nvborisenko Apr 21, 2026
21a8d52
Force our handlers to be safe
nvborisenko Apr 21, 2026
99fbf56
Merge remote-tracking branch 'upstream/trunk' into bidi-event-stream
nvborisenko Apr 25, 2026
bf67c94
Merge remote-tracking branch 'upstream/trunk' into bidi-event-stream
nvborisenko Apr 27, 2026
010bd2f
Merge remote-tracking branch 'upstream/trunk' into bidi-event-stream
nvborisenko Apr 29, 2026
def2e48
Merge remote-tracking branch 'upstream/trunk' into bidi-event-stream
nvborisenko May 1, 2026
80ec28d
Rename to EventListener
nvborisenko May 1, 2026
761c8c9
Stabilize tests
nvborisenko May 1, 2026
63eedb0
Split subscription options
nvborisenko May 1, 2026
e4991cd
EventArgs
nvborisenko May 1, 2026
e940f5f
Extract IEventSource
nvborisenko May 1, 2026
f8ed9d9
Remove scoped reader
nvborisenko May 1, 2026
a339b64
And test
nvborisenko May 1, 2026
06cd867
Simplify words
nvborisenko May 1, 2026
2efd7a5
Throw bad handlers
nvborisenko May 1, 2026
069710d
Try _unsubscribe
nvborisenko May 1, 2026
1ec223e
_handlerError throw priority
nvborisenko May 1, 2026
0ea4bef
Hard cast guard
nvborisenko May 1, 2026
5aede1c
Logging errors
nvborisenko May 1, 2026
42e8ef7
Address feedback
nvborisenko May 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 46 additions & 4 deletions dotnet/src/webdriver/BiDi/BiDi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace OpenQA.Selenium.BiDi;
public sealed class BiDi : IBiDi
{
private readonly ConcurrentDictionary<Type, Module> _modules = new();
private bool _disposed;
private int _disposed;

private Broker Broker { get; set; } = null!;

Expand Down Expand Up @@ -80,15 +80,57 @@ public Task<EndResult> EndAsync(EndOptions? options = null, CancellationToken ca
return Session.EndAsync(options, cancellationToken);
}

public Task<ISubscription> SubscribeAsync<TEventArgs>(EventDescriptor<TEventArgs> descriptor, Action<TEventArgs> handler, SubscriptionOptions? options = null, CancellationToken cancellationToken = default) where TEventArgs : EventArgs
{
ArgumentNullException.ThrowIfNull(descriptor);

return SubscribeAsync([descriptor], handler, options, cancellationToken);
}

public Task<ISubscription> SubscribeAsync<TEventArgs>(EventDescriptor<TEventArgs> descriptor, Func<TEventArgs, Task> handler, SubscriptionOptions? options = null, CancellationToken cancellationToken = default) where TEventArgs : EventArgs
{
ArgumentNullException.ThrowIfNull(descriptor);

return SubscribeAsync([descriptor], handler, options, cancellationToken);
}

public async Task<ISubscription> SubscribeAsync<TEventArgs>(IEnumerable<EventDescriptor> descriptors, Action<TEventArgs> handler, SubscriptionOptions? options = null, CancellationToken cancellationToken = default) where TEventArgs : EventArgs
{
ArgumentNullException.ThrowIfNull(descriptors);
ArgumentNullException.ThrowIfNull(handler);

return await Broker.EventDispatcher.SubscribeAsync<TEventArgs>(descriptors, e => { handler(e); return default; }, options, cancellationToken).ConfigureAwait(false);
}

public async Task<ISubscription> SubscribeAsync<TEventArgs>(IEnumerable<EventDescriptor> descriptors, Func<TEventArgs, Task> handler, SubscriptionOptions? options = null, CancellationToken cancellationToken = default) where TEventArgs : EventArgs
{
ArgumentNullException.ThrowIfNull(descriptors);
ArgumentNullException.ThrowIfNull(handler);

return await Broker.EventDispatcher.SubscribeAsync<TEventArgs>(descriptors, e => new ValueTask(handler(e)), options, cancellationToken).ConfigureAwait(false);
}

public Task<IEventStream<TEventArgs>> ReadAllAsync<TEventArgs>(EventDescriptor<TEventArgs> descriptor, EventStreamOptions? options = null, CancellationToken cancellationToken = default) where TEventArgs : EventArgs
{
ArgumentNullException.ThrowIfNull(descriptor);

return ReadAllAsync<TEventArgs>([descriptor], options, cancellationToken);
}

public async Task<IEventStream<TEventArgs>> ReadAllAsync<TEventArgs>(IEnumerable<EventDescriptor> descriptors, EventStreamOptions? options = null, CancellationToken cancellationToken = default) where TEventArgs : EventArgs
{
ArgumentNullException.ThrowIfNull(descriptors);

return await Broker.EventDispatcher.SubscribeReaderAsync<TEventArgs>(descriptors, options, cancellationToken).ConfigureAwait(false);
}

public async ValueTask DisposeAsync()
{
if (_disposed)
if (Interlocked.CompareExchange(ref _disposed, 1, 0) != 0)
{
return;
}

_disposed = true;

await Broker.DisposeAsync().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
Expand Down
82 changes: 15 additions & 67 deletions dotnet/src/webdriver/BiDi/Broker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ internal sealed class Broker : IAsyncDisposable
private readonly ILogger _logger = Internal.Logging.Log.GetLogger<Broker>();

private readonly ITransport _transport;
private readonly EventDispatcher _eventDispatcher;
private readonly BiDi _bidi;

private readonly ConcurrentDictionary<string, EventMetadata> _eventMetadata = new();
internal readonly EventDispatcher EventDispatcher;

private readonly ConcurrentDictionary<long, CommandInfo> _pendingCommands = new();

Expand All @@ -62,58 +61,17 @@ public Broker(ITransport transport, BiDi bidi)
{
_transport = transport;
_bidi = bidi;
_eventDispatcher = new EventDispatcher();

EventDispatcher = new EventDispatcher(
(events, options, ct) => bidi.Session.SubscribeAsync(events, options, ct),
(subscriptions, options, ct) => bidi.Session.UnsubscribeAsync(subscriptions, options, ct),
bidi);

_receiveMessagesCancellationTokenSource = new CancellationTokenSource();
_receivingTask = Task.Run(() => ReceiveMessagesAsync(_receiveMessagesCancellationTokenSource.Token));
_processingTask = Task.Run(ProcessMessagesAsync);
}

public async Task<Subscription> SubscribeAsync<TEventArgs, TEventParams>(Event<TEventArgs, TEventParams> descriptor, Action<TEventArgs> action, SubscriptionOptions? options, CancellationToken cancellationToken)
where TEventArgs : EventArgs
{
ValueTask InvokeAction(EventArgs args) { action((TEventArgs)args); return default; }
return await SubscribeAsync(descriptor.Name, InvokeAction, (bidi, ep) => descriptor.Factory(bidi, (TEventParams)ep), descriptor.JsonTypeInfo, options, cancellationToken).ConfigureAwait(false);
}

public async Task<Subscription> SubscribeAsync<TEventArgs, TEventParams>(Event<TEventArgs, TEventParams> descriptor, Func<TEventArgs, Task> func, SubscriptionOptions? options, CancellationToken cancellationToken)
where TEventArgs : EventArgs
{
ValueTask InvokeFunc(EventArgs args) => new(func((TEventArgs)args));
return await SubscribeAsync(descriptor.Name, InvokeFunc, (bidi, ep) => descriptor.Factory(bidi, (TEventParams)ep), descriptor.JsonTypeInfo, options, cancellationToken).ConfigureAwait(false);
}

private async Task<Subscription> SubscribeAsync(string eventName, Func<EventArgs, ValueTask> handler, Func<IBiDi, object, EventArgs> argsFactory, JsonTypeInfo jsonTypeInfo, SubscriptionOptions? options, CancellationToken cancellationToken)
{
var metadata = _eventMetadata.GetOrAdd(eventName, new EventMetadata(jsonTypeInfo, argsFactory));

if (metadata.JsonTypeInfo != jsonTypeInfo)
{
throw new ArgumentException($"Event '{eventName}' is already registered with different metadata.", nameof(eventName));
}

_eventDispatcher.AddHandler(eventName, handler);

try
{
var subscribeResult = await _bidi.Session.SubscribeAsync([eventName], new() { Contexts = options?.Contexts, UserContexts = options?.UserContexts }, cancellationToken)
.ConfigureAwait(false);

return new Subscription(subscribeResult.Subscription, this, eventName) { Handler = handler };
}
catch
{
_eventDispatcher.RemoveHandler(eventName, handler);
throw;
}
}

public async ValueTask UnsubscribeAsync(Subscription subscription, CancellationToken cancellationToken)
{
await _bidi.Session.UnsubscribeAsync([subscription.SubscriptionId], null, cancellationToken).ConfigureAwait(false);
_eventDispatcher.RemoveHandler(subscription.EventName, subscription.Handler);
}

public async Task<TResult> ExecuteAsync<TParameters, TResult>(Command<TParameters, TResult> descriptor, TParameters @params, CommandOptions? options, CancellationToken cancellationToken)
where TParameters : Parameters
where TResult : EmptyResult
Expand Down Expand Up @@ -192,10 +150,14 @@ public async Task<TResult> ExecuteAsync<TParameters, TResult>(Command<TParameter

public async ValueTask DisposeAsync()
{
_receiveMessagesCancellationTokenSource.Cancel();

try
{
// Dispose subscriptions while transport and processing loop are still active,
// allowing wire unsubscribe commands to be sent and handler drain tasks to complete.
await EventDispatcher.CompleteAllAsync(_terminalReceiveException).ConfigureAwait(false);

_receiveMessagesCancellationTokenSource.Cancel();
Comment thread
nvborisenko marked this conversation as resolved.

try
{
await _receivingTask.ConfigureAwait(false);
Expand All @@ -205,11 +167,9 @@ public async ValueTask DisposeAsync()
// Expected when cancellation is requested, ignore.
}

await _transport.DisposeAsync().ConfigureAwait(false);

await _processingTask.ConfigureAwait(false);

await _eventDispatcher.DisposeAsync().ConfigureAwait(false);
await _transport.DisposeAsync().ConfigureAwait(false);
Comment thread
nvborisenko marked this conversation as resolved.
}
finally
{
Expand Down Expand Up @@ -322,22 +282,14 @@ private void ProcessReceivedMessage(ReadOnlySpan<byte> data)
case TypeEvent:
if (method is null) throw new BiDiException($"The remote end responded with 'event' message type, but missed required 'method' property. Message content: {System.Text.Encoding.UTF8.GetString(data.ToArray())}");

if (!_eventMetadata.TryGetValue(method, out var metadata))
if (!EventDispatcher.TryDeserializeAndDispatch(method, ref paramsReader))
{
if (_logger.IsEnabled(LogEventLevel.Warn))
{
_logger.Warn($"Received BiDi event with method '{method}', but no event type mapping was found. Event will be ignored. Message content: {System.Text.Encoding.UTF8.GetString(data.ToArray())}");
}

break;
}

var eventParams = JsonSerializer.Deserialize(ref paramsReader, metadata.JsonTypeInfo)
?? throw new BiDiException("Remote end returned null event args in the 'params' property.");

var eventArgs = metadata.CreateEventArgs(_bidi, eventParams);

_eventDispatcher.EnqueueEvent(method, eventArgs);
break;

case TypeError:
Expand Down Expand Up @@ -442,6 +394,7 @@ private async Task ProcessMessagesAsync()
// Channel is fully drained. Fail any commands that didn't get a response:
// either with the transport error or cancellation for clean shutdown.
var terminalException = _terminalReceiveException;

foreach (var id in _pendingCommands.Keys)
{
if (_pendingCommands.TryRemove(id, out var pendingCommand))
Expand Down Expand Up @@ -474,11 +427,6 @@ private void ReturnBuffer(PooledBufferWriter buffer)

private readonly record struct CommandInfo(TaskCompletionSource<EmptyResult> TaskCompletionSource, JsonTypeInfo JsonResultTypeInfo);

private readonly record struct EventMetadata(JsonTypeInfo JsonTypeInfo, Func<IBiDi, object, EventArgs> ArgsFactory)
{
public EventArgs CreateEventArgs(IBiDi bidi, object eventParams) => ArgsFactory(bidi, eventParams);
}

private sealed class PooledBufferWriter : IBufferWriter<byte>, IDisposable
{
private const int DefaultBufferSize = 1024 * 8;
Expand Down
Loading
Loading