/
DataMessageObserver.cs
66 lines (55 loc) · 1.54 KB
/
DataMessageObserver.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using HotChocolate.Transport.Sockets.Client.Helpers;
namespace HotChocolate.Transport.Sockets.Client.Protocols;
/// <summary>
/// Observes operation messages, buffers the data messages whose id matches the
/// id this observer was created with, and lets a consumer read them one at a
/// time through <see cref="TryReadNextAsync"/>.
/// </summary>
internal sealed class DataMessageObserver : IObserver<IOperationMessage>, IDisposable
{
    // Counts pending signals: released once per queued message and once per
    // terminal notification (OnError / OnCompleted).
    private readonly SemaphoreSlim _semaphore = new(0);
    private readonly ConcurrentQueue<IDataMessage> _messages = new();
    private readonly string _id;
    private Exception? _error;
    private bool _disposed;

    /// <summary>
    /// Initializes a new observer that only accepts data messages carrying
    /// the given <paramref name="id"/>.
    /// </summary>
    /// <param name="id">The id that incoming data messages are matched against.</param>
    public DataMessageObserver(string id)
    {
        _id = id;
    }

    /// <summary>
    /// Waits for the next signal and returns the next buffered data message.
    /// </summary>
    /// <param name="ct">Token used to cancel the wait.</param>
    /// <returns>
    /// The next buffered message, or <c>null</c> when the signal that woke the
    /// reader was not accompanied by a message (as is the case for
    /// <see cref="OnCompleted"/>).
    /// </returns>
    /// <exception cref="ObjectDisposedException">The observer was already disposed.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was canceled while waiting.</exception>
    /// <exception cref="Exception">The error previously reported through <see cref="OnError"/>.</exception>
    public async ValueTask<IDataMessage?> TryReadNextAsync(CancellationToken ct)
    {
        if (_disposed)
        {
            // The single-string constructor takes the object name, not a message.
            throw new ObjectDisposedException(nameof(DataMessageObserver));
        }

        await _semaphore.WaitAsync(ct).ConfigureAwait(false);

        // A recorded error takes precedence over any messages still queued.
        if (_error is not null)
        {
            throw _error;
        }

        // An empty queue leaves message as null, which is returned as-is.
        _messages.TryDequeue(out var message);
        return message;
    }

    /// <summary>
    /// Buffers <paramref name="value"/> and signals a reader when it is a data
    /// message whose id matches this observer's id; all other messages are ignored.
    /// </summary>
    /// <param name="value">The incoming operation message.</param>
    public void OnNext(IOperationMessage value)
    {
        // NOTE(review): EqualsOrdinal is a project helper; presumably an ordinal string comparison.
        if (value is IDataMessage message && message.Id.EqualsOrdinal(_id))
        {
            // Enqueue before releasing so a woken reader always finds the message.
            _messages.Enqueue(message);
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Records <paramref name="error"/> and signals a reader so that it is
    /// rethrown from <see cref="TryReadNextAsync"/>.
    /// </summary>
    /// <param name="error">The error reported by the observed source.</param>
    public void OnError(Exception error)
    {
        _error = error;
        _semaphore.Release();
    }

    /// <summary>
    /// Signals a reader without queuing a message, which makes
    /// <see cref="TryReadNextAsync"/> return <c>null</c> once the queue is empty.
    /// </summary>
    public void OnCompleted()
        => _semaphore.Release();

    /// <summary>
    /// Disposes the underlying semaphore. Calling this more than once has no
    /// further effect.
    /// </summary>
    public void Dispose()
    {
        if (!_disposed)
        {
            _semaphore.Dispose();
            _disposed = true;
        }
    }
}