/
IDataTransferObject.cs
257 lines (229 loc) · 12.1 KB
/
IDataTransferObject.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
using System.Diagnostics;
using System.IO.Pipelines;
namespace DotNext.IO;
using Buffers;
/// <summary>
/// Represents a structured data unit that can be transferred over the wire.
/// </summary>
/// <remarks>
/// Typically, this interface is used for variable-length data units while
/// <see cref="Buffers.Binary.IBinaryFormattable{TSelf}"/> can be used for simple fixed-length structures.
/// </remarks>
/// <seealso cref="IAsyncBinaryReader"/>
/// <seealso cref="IAsyncBinaryWriter"/>
public interface IDataTransferObject
{
private const int DefaultBufferSize = 256;
/// <summary>
/// Gets an empty data transfer object.
/// </summary>
/// <value>The object exposed by <c>EmptyDataTransferObject.Instance</c>.</value>
public static IDataTransferObject Empty => EmptyDataTransferObject.Instance;
/// <summary>
/// Represents a DTO transformation, i.e. a parser that converts the content of
/// a data transfer object into a value of type <typeparamref name="TResult"/>.
/// </summary>
/// <typeparam name="TResult">The result type.</typeparam>
public interface ITransformation<TResult>
{
/// <summary>
/// Parses DTO content asynchronously.
/// </summary>
/// <param name="reader">The reader of DTO content.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <typeparam name="TReader">The type of the binary reader.</typeparam>
/// <returns>The converted DTO content.</returns>
ValueTask<TResult> TransformAsync<TReader>(TReader reader, CancellationToken token)
where TReader : notnull, IAsyncBinaryReader;
}
/// <summary>
/// Indicates that the content of this object can be copied to the output stream or pipe multiple times.
/// </summary>
/// <value>
/// <see langword="true"/> if the content can be copied more than once; otherwise, <see langword="false"/>.
/// </value>
bool IsReusable { get; }
/// <summary>
/// Gets the length of the object payload, in bytes.
/// </summary>
/// <remarks>
/// If the value is <see langword="null"/> then the length of the payload cannot be determined.
/// The default implementation of <c>TransformAsync</c> in this interface consults this value
/// to choose how the serialized payload is buffered before parsing.
/// </remarks>
long? Length { get; }
/// <summary>
/// Transforms this object to serialized form.
/// </summary>
/// <param name="writer">The binary writer that receives the serialized content.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <typeparam name="TWriter">The type of the writer.</typeparam>
/// <returns>The task representing the state of asynchronous execution.</returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
ValueTask WriteToAsync<TWriter>(TWriter writer, CancellationToken token)
where TWriter : notnull, IAsyncBinaryWriter;
// Moves the stream back to its beginning, but only when the caller asked for it
// and the stream reports that it supports seeking; otherwise does nothing.
private static void ResetStream(Stream stream, bool resetStream)
{
    if (!resetStream || !stream.CanSeek)
    {
        return;
    }

    stream.Seek(0L, SeekOrigin.Begin);
}
/// <summary>
/// Decodes the content of the stream using the caller-supplied temporary buffer.
/// </summary>
/// <param name="input">The stream to decode.</param>
/// <param name="transformation">The decoder.</param>
/// <param name="resetStream">
/// <see langword="true"/> to move a seekable stream back to its beginning once decoding
/// has finished (successfully or not).
/// </param>
/// <param name="buffer">The temporary buffer handed to the stream reader.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <typeparam name="TResult">The type of the result.</typeparam>
/// <typeparam name="TTransformation">The type of the parser.</typeparam>
/// <returns>The decoded content of the stream.</returns>
/// <exception cref="ArgumentException"><paramref name="buffer"/> is empty.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
protected static async ValueTask<TResult> TransformAsync<TResult, TTransformation>(Stream input, TTransformation transformation, bool resetStream, Memory<byte> buffer, CancellationToken token)
    where TTransformation : notnull, ITransformation<TResult>
{
    if (buffer.Length is 0)
    {
        throw new ArgumentException(ExceptionMessages.BufferTooSmall, nameof(buffer));
    }

    TResult decoded;
    try
    {
        var reader = new AsyncStreamBinaryAccessor(input, buffer);
        decoded = await transformation.TransformAsync(reader, token).ConfigureAwait(false);
    }
    finally
    {
        // runs on both the success and the failure path
        ResetStream(input, resetStream);
    }

    return decoded;
}
/// <summary>
/// Decodes the content of the stream using a temporary buffer obtained from the allocator.
/// </summary>
/// <param name="input">The stream to decode.</param>
/// <param name="transformation">The decoder.</param>
/// <param name="resetStream">
/// <see langword="true"/> to move a seekable stream back to its beginning once decoding
/// has finished (successfully or not).
/// </param>
/// <param name="allocator">The allocator of the temporary buffer.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <typeparam name="TResult">The type of the result.</typeparam>
/// <typeparam name="TTransformation">The type of the parser.</typeparam>
/// <returns>The decoded content of the stream.</returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
protected static async ValueTask<TResult> TransformAsync<TResult, TTransformation>(Stream input, TTransformation transformation, bool resetStream, MemoryAllocator<byte>? allocator, CancellationToken token)
    where TTransformation : notnull, ITransformation<TResult>
{
    // temporary buffer of at least DefaultBufferSize bytes
    var rented = allocator.AllocateAtLeast(DefaultBufferSize);

    TResult decoded;
    try
    {
        var reader = new AsyncStreamBinaryAccessor(input, rented.Memory);
        decoded = await transformation.TransformAsync(reader, token).ConfigureAwait(false);
    }
    finally
    {
        // release the buffer first, then rewind the stream if requested
        rented.Dispose();
        ResetStream(input, resetStream);
    }

    return decoded;
}
/// <summary>
/// Decodes the content of the stream without a caller-supplied buffer or allocator.
/// </summary>
/// <param name="input">The stream to decode.</param>
/// <param name="transformation">The decoder.</param>
/// <param name="resetStream"><see langword="true"/> to reset the stream position after decoding.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <typeparam name="TResult">The type of the result.</typeparam>
/// <typeparam name="TTransformation">The type of the parser.</typeparam>
/// <returns>The decoded content of the stream.</returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
protected static ValueTask<TResult> TransformAsync<TResult, TTransformation>(Stream input, TTransformation transformation, bool resetStream, CancellationToken token)
    where TTransformation : notnull, ITransformation<TResult>
{
    // delegate to the allocator-based overload without supplying an allocator
    MemoryAllocator<byte>? noAllocator = null;
    return TransformAsync<TResult, TTransformation>(input, transformation, resetStream, noAllocator, token);
}
/// <summary>
/// Decodes the data supplied by the pipe reader.
/// </summary>
/// <param name="input">The pipe reader used for decoding.</param>
/// <param name="transformation">The decoder.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <typeparam name="TResult">The type of the result.</typeparam>
/// <typeparam name="TTransformation">The type of the parser.</typeparam>
/// <returns>The decoded content.</returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
protected static ValueTask<TResult> TransformAsync<TResult, TTransformation>(PipeReader input, TTransformation transformation, CancellationToken token)
    where TTransformation : notnull, ITransformation<TResult>
{
    // wrap the pipe so that the decoder can consume it as a binary reader
    var reader = new Pipelines.PipeBinaryReader(input);
    return transformation.TransformAsync(reader, token);
}
// Small payload of known length: serialize this object into a pooled in-memory
// buffer pre-sized to the declared length, then parse directly from that buffer.
private async ValueTask<TResult> GetSmallObjectDataAsync<TResult, TTransformation>(TTransformation parser, long length, CancellationToken token)
    where TTransformation : notnull, ITransformation<TResult>
{
    Debug.Assert(length <= Array.MaxLength);

    using (var payload = new PoolingArrayBufferWriter<byte> { Capacity = (int)length })
    {
        // serialize
        var sink = new AsyncBufferWriter(payload);
        await WriteToAsync(sink, token).ConfigureAwait(false);

        // deserialize; the buffer stays alive until parsing has completed
        var source = new SequenceReader(payload.WrittenMemory);
        return await parser.TransformAsync(source, token).ConfigureAwait(false);
    }
}
// Payload of unknown length: serialize into a FileBufferingWriter to keep the balance
// between I/O performance and memory consumption, then parse either from the written
// memory block (when the writer can expose one) or from a stream over the written content.
private async ValueTask<TResult> GetUnknownObjectDataAsync<TResult, TTransformation>(TTransformation parser, CancellationToken token)
    where TTransformation : notnull, ITransformation<TResult>
{
    var spool = new FileBufferingWriter(asyncIO: true);
    await using (spool.ConfigureAwait(false))
    {
        using (var scratch = Memory.AllocateAtLeast<byte>(DefaultBufferSize))
        {
            // serialize
            await WriteToAsync(new AsyncStreamBinaryAccessor(spool, scratch.Memory), token).ConfigureAwait(false);

            // deserialize
            if (!spool.TryGetWrittenContent(out var written))
            {
                // content is not available as a single memory block: read it back as a stream,
                // reusing the same scratch buffer now that serialization is finished
                var readBack = await spool.GetWrittenContentAsStreamAsync(token).ConfigureAwait(false);
                await using (readBack.ConfigureAwait(false))
                {
                    return await parser.TransformAsync(new AsyncStreamBinaryAccessor(readBack, scratch.Memory), token).ConfigureAwait(false);
                }
            }

            return await parser.TransformAsync(new SequenceReader(written), token).ConfigureAwait(false);
        }
    }
}
// Large payload of known length: serialize this object into a temporary file
// (opened with DeleteOnClose and preallocated to the declared length), rewind, and parse from it.
private async ValueTask<TResult> GetLargeObjectDataAsync<TResult, TTransformation>(TTransformation parser, long length, CancellationToken token)
    where TTransformation : notnull, ITransformation<TResult>
{
    var path = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
    var fileOptions = new FileStreamOptions
    {
        Mode = FileMode.CreateNew,
        Access = FileAccess.ReadWrite,
        Share = FileShare.None,
        BufferSize = DefaultBufferSize,
        Options = FileOptions.Asynchronous | FileOptions.DeleteOnClose | FileOptions.SequentialScan,
        PreallocationSize = length,
    };

    var tempFile = new FileStream(path, fileOptions);
    await using (tempFile.ConfigureAwait(false))
    {
        using (var scratch = Memory.AllocateAtLeast<byte>(DefaultBufferSize))
        {
            // serialize
            await WriteToAsync(new AsyncStreamBinaryAccessor(tempFile, scratch.Memory), token).ConfigureAwait(false);
            await tempFile.FlushAsync(token).ConfigureAwait(false);

            // deserialize from the beginning of the file, reusing the scratch buffer
            tempFile.Position = 0L;
            return await parser.TransformAsync(new AsyncStreamBinaryAccessor(tempFile, scratch.Memory), token).ConfigureAwait(false);
        }
    }
}
/// <summary>
/// Converts this data transfer object to another type.
/// </summary>
/// <remarks>
/// The default implementation parses the memory block returned by <c>TryGetMemory</c> when one
/// is available. Otherwise, it serializes this object into a temporary storage before parsing:
/// a pooled in-memory buffer when <c>Length</c> is known and below
/// <c>FileBufferingWriter.Options.DefaultMemoryThreshold</c>, a temporary file when
/// <c>Length</c> is known and at or above that threshold, or a <c>FileBufferingWriter</c>
/// when <c>Length</c> is <see langword="null"/>.
/// </remarks>
/// <param name="transformation">The parser instance.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <typeparam name="TResult">The type of the result.</typeparam>
/// <typeparam name="TTransformation">The type of the parser.</typeparam>
/// <returns>The converted DTO content.</returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
ValueTask<TResult> TransformAsync<TResult, TTransformation>(TTransformation transformation, CancellationToken token = default)
    where TTransformation : notnull, ITransformation<TResult>
{
    // fast path: the content is already representable as a memory block
    if (TryGetMemory(out var memory))
    {
        return transformation.TransformAsync(IAsyncBinaryReader.Create(memory), token);
    }

    // length is unknown
    if (!Length.TryGetValue(out var length))
    {
        return GetUnknownObjectDataAsync<TResult, TTransformation>(transformation, token);
    }

    // length is known: pick in-memory or on-disk buffering
    if (length < FileBufferingWriter.Options.DefaultMemoryThreshold)
    {
        return GetSmallObjectDataAsync<TResult, TTransformation>(transformation, length, token);
    }

    return GetLargeObjectDataAsync<TResult, TTransformation>(transformation, length, token);
}
/// <summary>
/// Attempts to retrieve the contents of this object as a memory block synchronously.
/// </summary>
/// <remarks>
/// The default implementation reports that no memory block is available.
/// </remarks>
/// <param name="memory">
/// The memory block containing the contents of this object; an empty block when the method
/// returns <see langword="false"/> from the default implementation.
/// </param>
/// <returns><see langword="true"/> if this object is representable as a memory block; otherwise, <see langword="false"/>.</returns>
bool TryGetMemory(out ReadOnlyMemory<byte> memory)
{
    memory = ReadOnlyMemory<byte>.Empty;
    return false;
}
}