-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
TabularWriter`1.cs
155 lines (129 loc) · 6.01 KB
/
TabularWriter`1.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// (c) Oleksandr Kozlenko. Licensed under the MIT license.
using System.Runtime.CompilerServices;
namespace Addax.Formats.Tabular;
/// <summary>Provides forward-only, write-only access to tabular data records. This class cannot be inherited.</summary>
/// <typeparam name="T">The type of an object that represents a record.</typeparam>
/// <remarks>
/// Records are written through an inner <see cref="TabularWriter" /> using a <see cref="TabularHandler{T}" />.
/// When the inner writer is still positioned at the start of the stream and the handler exposes a header,
/// the header is written before the first record or, if no record is written, when the instance is disposed.
/// </remarks>
public sealed class TabularWriter<T> : IDisposable, IAsyncDisposable
    where T : notnull
{
    private readonly TabularWriter _fieldWriter;
    private readonly TabularHandler<T> _recordHandler;

    /// <summary>Initializes a new instance of the <see cref="TabularWriter{T}" /> class for the specified stream using the provided dialect, options, and record handler.</summary>
    /// <param name="stream">The stream to write to.</param>
    /// <param name="dialect">The dialect to use for writing.</param>
    /// <param name="options">The options to control the behavior during writing.</param>
    /// <param name="handler">The handler to write a <typeparamref name="T" /> instance to a record.</param>
    /// <exception cref="ArgumentException"><paramref name="stream" /> does not support writing.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="stream" /> or <paramref name="dialect" /> is <see langword="null" />.</exception>
    /// <exception cref="InvalidOperationException">The record handler is not specified and cannot be found in the registry.</exception>
    public TabularWriter(Stream stream, TabularDialect dialect, TabularOptions? options = null, TabularHandler<T>? handler = null)
    {
        ArgumentNullException.ThrowIfNull(stream);
        ArgumentNullException.ThrowIfNull(dialect);

        _fieldWriter = new(stream, dialect, options);

        // Fall back to the registry when the caller does not supply a handler explicitly.
        _recordHandler = handler ?? TabularRegistry.SelectHandler<T>();
    }

    /// <summary>Releases the resources used by the current instance of the <see cref="TabularWriter{T}" /> class.</summary>
    /// <remarks>
    /// If the writer is still at the start of the stream and the record handler defines a header, the header is written first.
    /// The inner field writer is disposed even when writing the header fails.
    /// </remarks>
    public void Dispose()
    {
        try
        {
            if (_fieldWriter.CurrentPositionType == TabularPositionType.StartOfStream)
            {
                WriteHeader();
            }
        }
        finally
        {
            // Always release the inner writer so a failed header write does not leak it.
            _fieldWriter.Dispose();
        }
    }

    /// <summary>Asynchronously releases the resources used by the current instance of the <see cref="TabularWriter{T}" /> class.</summary>
    /// <returns>A task object.</returns>
    /// <remarks>
    /// If the writer is still at the start of the stream and the record handler defines a header, the header is written first.
    /// The inner field writer is disposed even when writing the header fails.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        try
        {
            if (_fieldWriter.CurrentPositionType == TabularPositionType.StartOfStream)
            {
                // Disposal is not cancelable, so no cancellation token is flowed here.
                await WriteHeaderAsync(CancellationToken.None).ConfigureAwait(false);
            }
        }
        finally
        {
            // Always release the inner writer so a failed header write does not leak it.
            await _fieldWriter.DisposeAsync().ConfigureAwait(false);
        }
    }

    /// <summary>Writes the next record represented as <typeparamref name="T" />.</summary>
    /// <param name="record">The record to write.</param>
    /// <exception cref="ArgumentNullException"><paramref name="record" /> is <see langword="null" />.</exception>
    /// <remarks>If the writer is still at the start of the stream and the record handler defines a header, the header is written before the record.</remarks>
    public void WriteRecord(in T record)
    {
        ArgumentNullException.ThrowIfNull(record);

        if (_fieldWriter.CurrentPositionType == TabularPositionType.StartOfStream)
        {
            WriteHeader();
        }

        _recordHandler.Write(_fieldWriter, record);
        _fieldWriter.FinishRecord();
    }

    /// <summary>Asynchronously writes the next record represented as <typeparamref name="T" />.</summary>
    /// <param name="record">The record to write.</param>
    /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
    /// <returns>A task object.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="record" /> is <see langword="null" />.</exception>
    /// <exception cref="OperationCanceledException">The cancellation token was canceled. This exception is stored into the returned task.</exception>
    /// <remarks>If the writer is still at the start of the stream and the record handler defines a header, the header is written before the record.</remarks>
    [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
    public async ValueTask WriteRecordAsync(T record, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(record);

        if (_fieldWriter.CurrentPositionType == TabularPositionType.StartOfStream)
        {
            await WriteHeaderAsync(cancellationToken).ConfigureAwait(false);
        }

        await _recordHandler.WriteAsync(_fieldWriter, record, cancellationToken).ConfigureAwait(false);
        _fieldWriter.FinishRecord();
    }

    /// <inheritdoc cref="TabularWriter.Flush()" />
    public void Flush()
    {
        _fieldWriter.Flush();
    }

    /// <inheritdoc cref="TabularWriter.FlushAsync(CancellationToken)" />
    public ValueTask FlushAsync(CancellationToken cancellationToken = default)
    {
        return _fieldWriter.FlushAsync(cancellationToken);
    }

    // Writes every header name exposed by the record handler as a string field and finishes
    // the record. Does nothing when the handler has no header.
    private void WriteHeader()
    {
        var header = _recordHandler.Header;

        if (header is null)
        {
            return;
        }

        foreach (var name in header)
        {
            _fieldWriter.WriteString(name);
        }

        _fieldWriter.FinishRecord();
    }

    // Asynchronous counterpart of WriteHeader: writes every header name exposed by the record
    // handler as a string field and finishes the record. Does nothing when the handler has no header.
    private async ValueTask WriteHeaderAsync(CancellationToken cancellationToken)
    {
        var header = _recordHandler.Header;

        if (header is null)
        {
            return;
        }

        foreach (var name in header)
        {
            await _fieldWriter.WriteStringAsync(name, cancellationToken).ConfigureAwait(false);
        }

        _fieldWriter.FinishRecord();
    }

    /// <inheritdoc cref="TabularWriter.BytesCommitted" />
    public long BytesCommitted
    {
        get
        {
            return _fieldWriter.BytesCommitted;
        }
    }

    /// <inheritdoc cref="TabularWriter.RecordsWritten" />
    public long RecordsWritten
    {
        get
        {
            return _fieldWriter.RecordsWritten;
        }
    }
}