-
-
Notifications
You must be signed in to change notification settings - Fork 722
/
SocketOperation.cs
132 lines (118 loc) · 3.78 KB
/
SocketOperation.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using StrawberryShake.Transport.WebSockets.Messages;
namespace StrawberryShake.Transport.WebSockets;
/// <summary>
/// Represents an operation on a socket. Messages received for the operation are
/// buffered in an unbounded channel and exposed to the consumer as an asynchronous stream.
/// </summary>
public sealed class SocketOperation : ISocketOperation
{
    private readonly ISession _manager;
    private readonly Channel<OperationMessage> _channel;
    private bool _disposed;

    /// <summary>
    /// The id of the operation
    /// </summary>
    public string Id { get; }

    /// <summary>
    /// Initializes a new <see cref="SocketOperation"/> with a newly generated GUID as its id
    /// </summary>
    /// <param name="manager">
    /// The session that is asked to stop this operation when it is disposed
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="manager"/> is <c>null</c>
    /// </exception>
    public SocketOperation(ISession manager)
        : this(manager, Guid.NewGuid().ToString())
    {
    }

    /// <summary>
    /// Initializes a new <see cref="SocketOperation"/>
    /// </summary>
    /// <param name="manager">
    /// The session that is asked to stop this operation when it is disposed
    /// </param>
    /// <param name="id">
    /// The id of this operation
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="manager"/> or <paramref name="id"/> is <c>null</c>
    /// </exception>
    public SocketOperation(
        ISession manager,
        string id)
    {
        _manager = manager ?? throw new ArgumentNullException(nameof(manager));
        Id = id ?? throw new ArgumentNullException(nameof(id));
        _channel = Channel.CreateUnbounded<OperationMessage>();
    }

    /// <inheritdoc />
    public IAsyncEnumerable<OperationMessage> ReadAsync()
        => new MessageStream(this, _channel);

    /// <inheritdoc />
    public ValueTask CompleteAsync(CancellationToken cancellationToken)
        // Completing is just delivering the default complete message through the
        // same guarded write path that is used for every other message.
        => ReceiveMessageAsync(CompleteOperationMessage.Default, cancellationToken);

    /// <summary>
    /// Streams the messages that are buffered in the channel of a <see cref="SocketOperation"/>
    /// </summary>
    private sealed class MessageStream : IAsyncEnumerable<OperationMessage>
    {
        private readonly SocketOperation _operation;
        private readonly Channel<OperationMessage> _channel;

        public MessageStream(SocketOperation operation, Channel<OperationMessage> channel)
        {
            _operation = operation;
            _channel = channel;
        }

        /// <summary>
        /// Yields messages until the operation is disposed or the channel is
        /// completed and drained.
        /// </summary>
        /// <param name="cancellationToken">
        /// Cancels waiting for the next message
        /// </param>
        public async IAsyncEnumerator<OperationMessage> GetAsyncEnumerator(
            CancellationToken cancellationToken = default)
        {
            ChannelReader<OperationMessage> reader = _channel.Reader;

            // The loop condition also covers an operation that is already disposed
            // before enumeration starts.
            while (!_operation._disposed && !reader.Completion.IsCompleted)
            {
                if (!await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
                {
                    // false means the writer completed and nothing is left to read.
                    // Stop here instead of polling Completion again, which could
                    // iterate without awaiting until the completion task transitions.
                    yield break;
                }

                if (reader.TryRead(out OperationMessage? message))
                {
                    yield return message;
                }
            }
        }
    }

    /// <summary>
    /// Writes a message to the channel of this operation so that it can be
    /// picked up by the stream returned from <see cref="ReadAsync"/>.
    /// Nothing is written once the operation is disposed.
    /// </summary>
    /// <param name="message">
    /// The message to enqueue
    /// </param>
    /// <param name="cancellationToken">
    /// Cancels the write
    /// </param>
    internal async ValueTask ReceiveMessageAsync(
        OperationMessage message,
        CancellationToken cancellationToken)
    {
        if (_disposed)
        {
            return;
        }

        try
        {
            await _channel.Writer.WriteAsync(message, cancellationToken).ConfigureAwait(false);
        }
        catch (ChannelClosedException)
        {
            // if the channel is closed we will move on.
        }
    }

    /// <inheritdoc />
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        // Complete the writer first so that readers can finish and later writes
        // fail with ChannelClosedException, which the write path ignores.
        _channel.Writer.TryComplete();

        await _manager.StopOperationAsync(Id, CancellationToken.None).ConfigureAwait(false);

        // The flag is only set once the stop call has returned; if it throws,
        // the operation is not marked as disposed.
        _disposed = true;
    }
}