
Commit 8aa3595

fix: performance improvements on parquet IO
1 parent 78e4c3a commit 8aa3595

6 files changed

Lines changed: 450 additions & 18 deletions


examples/advanced/KedroSpaceflights.Custom/KedroSpaceflights.Custom.csproj

Lines changed: 0 additions & 1 deletion
@@ -28,7 +28,6 @@
     <PackageReference Include="MathNet.Numerics" Version="5.0.0" />
     <PackageReference Include="NUnit" Version="4.2.2" />
     <PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
-    <PackageReference Include="Parquet.Net" Version="5.2.0" />
     <PackageReference Include="Plotly.NET" Version="5.1.0" />
     <PackageReference Include="Plotly.NET.CSharp" Version="0.13.0" />
     <PackageReference Include="Plotly.NET.ImageExport" Version="6.1.0" />

src/extensions/Flowthru.Extensions.Parquet/Data/Storage/Format/ParquetFormatSerializer.cs

Lines changed: 98 additions & 14 deletions
@@ -33,22 +33,33 @@ namespace Flowthru.Core.Data.Storage.Format;
 /// <item>Null Safety - Enforces non-nullable contracts during deserialization</item>
 /// <item>Value Type Nullability - DTOs use nullable value types to match Parquet schema conventions</item>
 /// <item>Enum Support - Automatically converts between Parquet's integer storage and enum types</item>
+/// <item>Row group streaming - Writes in bounded batches (default 1M rows/group); peak write memory
+/// is bounded to one row group regardless of total dataset size.</item>
 /// </list>
 /// <para>
 /// <strong>Current Limitations:</strong>
 /// </para>
 /// <list type="bullet">
 /// <item>SerializedEnum attributes are not used - enums stored/retrieved by underlying integer value</item>
+/// <item>Per-column encoding hints require Parquet.Net v6 (not yet on NuGet); use
+/// <see cref="ParquetItemOptions{TRow}.UseDictionaryEncoding"/> as a global flag in the meantime.</item>
 /// </list>
 /// </remarks>
 public sealed class ParquetFormatSerializer<TRow> : IFormatSerializer<TRow>
     where TRow : notnull, IFlatSchema, IBinarySerializable
 {
+    private readonly ParquetItemOptions<TRow>? _options;
+
     /// <summary>
-    /// Initializes a new instance of the <see cref="ParquetFormatSerializer{TRow}"/> class.
+    /// Initializes a new instance with default production-ready options.
     /// </summary>
     public ParquetFormatSerializer() { }
 
+    /// <summary>
+    /// Initializes a new instance with caller-supplied tuning options.
+    /// </summary>
+    public ParquetFormatSerializer(ParquetItemOptions<TRow>? options) => _options = options;
+
     /// <inheritdoc/>
     /// <remarks>
     /// Parquet is a columnar format that supports row group streaming for efficient
@@ -60,9 +71,13 @@ public ParquetFormatSerializer() { }
     /// <remarks>
     /// Streams rows one row group at a time. Early-exit consumers (e.g. shallow inspection)
     /// will break after reading fewer than all row groups, avoiding full-file materialisation.
+    /// Any <see cref="ParquetItemOptions{TRow}"/> supplied at construction time are threaded
+    /// into the deserialiser (date type mapping, big-decimal, encoding settings).
     /// </remarks>
     public async IAsyncEnumerable<TRow> DeserializeRows(Stream stream)
     {
+        var readOptions = _options?.ToReadOptions();
+
         // Read schema and row-group count from the file footer (cheap seek-based metadata read)
         using var reader = await ParquetReader.CreateAsync(stream, leaveStreamOpen: true);
         var schema = reader.Schema;
@@ -73,7 +88,7 @@ public async IAsyncEnumerable<TRow> DeserializeRows(Stream stream)
         for (int rgi = 0; rgi < rowGroupCount; rgi++)
         {
             stream.Position = 0;
-            var dtos = await adapter.DeserializeRowGroup(stream, rgi);
+            var dtos = await adapter.DeserializeRowGroup(stream, rgi, readOptions);
             foreach (var dto in dtos)
             {
                 yield return adapter.FromDto(dto);
@@ -86,7 +101,12 @@ public async Task SerializeRows(Stream stream, IAsyncEnumerable<TRow> rows)
     {
         // For serialization, create adapter based on TRow schema (no file to read)
         var adapter = new ParquetAdapter<TRow>(parquetSchema: null);
-        await adapter.SerializeToParquetAsync(stream, rows);
+        await adapter.SerializeToParquetAsync(
+            stream,
+            rows,
+            writeOptions: _options?.ToWriteOptions(),
+            rowGroupSize: _options?.RowGroupSize ?? 1_000_000
+        );
     }
 
     /// <inheritdoc/>
@@ -167,23 +187,82 @@ public ParquetAdapter(ParquetSchema? parquetSchema)
     public TRow FromDto(object dto) => _fromDto(dto);
 
     /// <summary>
-    /// Serializes rows to Parquet format with proper type safety.
-    /// Converts TRow instances to DTO instances and maintains type through serialization.
+    /// Serializes rows to Parquet format, flushing one row group per <paramref name="rowGroupSize"/>
+    /// batch. Peak write-side memory is bounded to one row group regardless of total dataset size.
     /// </summary>
-    public async Task SerializeToParquetAsync(Stream stream, IAsyncEnumerable<TRow> rows)
+    /// <remarks>
+    /// Each flush calls <see cref="ParquetSerializer.SerializeAsync"/> with <c>Append = true</c> after
+    /// the first batch, producing one Parquet row group per batch. For 1–10 GB datasets this avoids
+    /// materialising the entire dataset in memory and produces multi-row-group files that enable
+    /// predicate pushdown and read parallelism in downstream query engines.
+    /// </remarks>
+    public async Task SerializeToParquetAsync(
+        Stream stream,
+        IAsyncEnumerable<TRow> rows,
+        ParquetSerializerOptions? writeOptions,
+        int rowGroupSize
+    )
     {
-        // Convert to strongly-typed list using reflection to create List<TDto>
         var listType = typeof(List<>).MakeGenericType(_dtoType);
-        var dtosList = (System.Collections.IList)Activator.CreateInstance(listType)!;
+        var batch = (System.Collections.IList)Activator.CreateInstance(listType)!;
+        bool firstBatch = true;
 
         await foreach (var row in rows)
        {
-            dtosList.Add(_toDto(row));
+            batch.Add(_toDto(row));
+
+            if (batch.Count >= rowGroupSize)
+            {
+                await SerializeBatch(batch, stream, writeOptions, firstBatch);
+                firstBatch = false;
+                batch.Clear();
+            }
+        }
+
+        // Write the final (possibly partial) batch — handles the common single-batch case too.
+        if (batch.Count > 0)
+        {
+            await SerializeBatch(batch, stream, writeOptions, firstBatch);
+        }
+    }
+
+    /// <summary>
+    /// Writes one batch as a single Parquet row group. Stamps <c>Append = true</c> on
+    /// subsequent batches so that each call appends a new row group rather than overwriting.
+    /// </summary>
+    private async Task SerializeBatch(
+        System.Collections.IList batch,
+        Stream stream,
+        ParquetSerializerOptions? writeOptions,
+        bool isFirstBatch
+    )
+    {
+        ParquetSerializerOptions? opts;
+        if (!isFirstBatch)
+        {
+            // Clone the caller's options (or create minimal ones) with Append = true.
+            // ParquetSerializerOptions is a plain class with no copy constructor, so we
+            // construct a fresh instance and copy every relevant property.
+            opts = writeOptions != null
+                ? new ParquetSerializerOptions
+                {
+                    Append = true,
+                    CompressionMethod = writeOptions.CompressionMethod,
+                    CompressionLevel = writeOptions.CompressionLevel,
+                    RowGroupSize = writeOptions.RowGroupSize,
+                    PropertyNameCaseInsensitive = writeOptions.PropertyNameCaseInsensitive,
+                    ParquetOptions = writeOptions.ParquetOptions,
+                }
+                : new ParquetSerializerOptions { Append = true };
+        }
+        else
+        {
+            opts = writeOptions;
         }
 
-        // Invoke: Task ParquetSerializer.SerializeAsync<TDto>(IEnumerable<TDto>, Stream, ...)
+        // Invoke: Task ParquetSerializer.SerializeAsync<TDto>(IEnumerable<TDto>, Stream, options, ct)
         var task = (Task)
-            _serializeMethod.Invoke(null, [dtosList, stream, null, CancellationToken.None])!;
+            _serializeMethod.Invoke(null, [batch, stream, opts, CancellationToken.None])!;
         await task;
     }
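The write-side pattern above is easier to see without the reflection plumbing. The sketch below shows the same append-per-row-group technique written directly against Parquet.Net's ParquetSerializer; the SensorReading record and the helper names are illustrative, not part of this commit. As a rough sanity check on the default: at roughly 100 bytes per row, a 1,000,000-row group buffers about 100 MB before each flush, matching the ≈100 MB figure quoted in the options docs further down.

using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Parquet;                 // CompressionMethod
using Parquet.Serialization;   // ParquetSerializer, ParquetSerializerOptions

// Illustrative row type; any POCO with Parquet-supported property types works.
public record SensorReading
{
    public long Timestamp { get; set; }
    public double Value { get; set; }
}

public static class RowGroupStreamingSketch
{
    public static async Task WriteInRowGroupsAsync(
        Stream destination,
        IAsyncEnumerable<SensorReading> rows,
        int rowGroupSize = 1_000_000)
    {
        var batch = new List<SensorReading>(rowGroupSize);
        bool first = true;

        await foreach (var row in rows)
        {
            batch.Add(row);
            if (batch.Count >= rowGroupSize)
            {
                await FlushAsync(batch, destination, append: !first);
                first = false;
                batch.Clear();
            }
        }

        // Final (possibly partial) batch; also covers the single-batch case.
        if (batch.Count > 0)
            await FlushAsync(batch, destination, append: !first);
    }

    private static Task FlushAsync(List<SensorReading> batch, Stream destination, bool append) =>
        // Append = false creates the file; Append = true adds one more row group
        // to the existing file instead of rewriting it.
        ParquetSerializer.SerializeAsync(batch, destination, new ParquetSerializerOptions
        {
            Append = append,
            CompressionMethod = CompressionMethod.Snappy,
        });
}

Appending per batch keeps exactly one batch resident while still producing the multi-row-group layout that downstream engines can scan in parallel.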

@@ -199,15 +278,20 @@ public async Task SerializeToParquetAsync(Stream stream, IAsyncEnumerable<TRow>
     }
 
     /// <summary>
-    /// Deserializes a single row group identified by <paramref name="rowGroupIndex"/>.
+    /// Deserializes a single row group identified by <paramref name="rowGroupIndex"/>,
+    /// threading any caller-supplied <paramref name="readOptions"/> into Parquet.NET.
     /// This keeps I/O bounded to one row group when consumers break early.
     /// </summary>
-    public async Task<System.Collections.IList> DeserializeRowGroup(Stream stream, int rowGroupIndex)
+    public async Task<System.Collections.IList> DeserializeRowGroup(
+        Stream stream,
+        int rowGroupIndex,
+        ParquetSerializerOptions? readOptions
+    )
     {
         // Invoke: Task<IList<TDto>> ParquetSerializer.DeserializeAsync<TDto>(Stream, int, options, ct)
         var task = (Task)_deserializeRowGroupMethod.Invoke(
             null,
-            [stream, rowGroupIndex, null, CancellationToken.None]
+            [stream, rowGroupIndex, readOptions, CancellationToken.None]
         )!;
         await task;
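On the read side, the early-exit behaviour documented above boils down to two Parquet.Net calls, shown here without the DTO adapter. This is a sketch under the assumptions stated in the diff's own comment (a Task<IList<TDto>> DeserializeAsync overload taking a row-group index); SensorReading is the illustrative record from the previous sketch.

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Parquet;
using Parquet.Serialization;

public static class EarlyExitReadSketch
{
    // Reads only the first row group of a potentially huge file, mirroring the
    // "shallow inspection" consumers mentioned in the serializer docs above.
    public static async Task<IList<SensorReading>> PeekFirstRowGroupAsync(Stream source)
    {
        // Cheap, seek-based footer read: schema and row-group count only.
        using ParquetReader reader = await ParquetReader.CreateAsync(source, leaveStreamOpen: true);
        if (reader.RowGroupCount == 0)
            return Array.Empty<SensorReading>();

        source.Position = 0;
        // Deserializes exactly one row group (index 0); the remaining groups are
        // never read, which is what keeps early-exit consumers I/O-bounded.
        return await ParquetSerializer.DeserializeAsync<SensorReading>(source, 0);
    }
}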

src/extensions/Flowthru.Extensions.Parquet/Flowthru.Extensions.Parquet.csproj

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@
   </ItemGroup>
 
   <ItemGroup>
-    <PackageReference Include="Parquet.Net" Version="5.2.0" />
+    <PackageReference Include="Parquet.Net" Version="5.6.0" />
   </ItemGroup>
 
   <ItemGroup>

src/extensions/Flowthru.Extensions.Parquet/ParquetItemExtensions.cs

Lines changed: 8 additions & 2 deletions
@@ -18,6 +18,10 @@ public static class ParquetItemExtensions
     /// <param name="_">The enumerable catalog entries factory (from <see cref="ItemFactory.Enumerable"/>)</param>
     /// <param name="label">Unique catalog label for DAG resolution</param>
     /// <param name="filePath">Path or URI to Parquet file</param>
+    /// <param name="options">
+    /// Optional performance and behavior tuning. When <c>null</c>, production-ready defaults are
+    /// used: Snappy compression, 1 000 000-row groups (≈100 MB), dictionary encoding enabled.
+    /// </param>
     /// <param name="resolver">
     /// Optional resolver for remote URIs (e.g., <c>https://</c>, <c>sftp://</c>).
     /// Falls back to <see cref="Flowthru.Core.Data.Storage.Medium.FileStorageMedium"/> when <c>null</c>.
@@ -36,20 +40,22 @@ public static class ParquetItemExtensions
     /// <item>TRow must implement IBinarySerializable</item>
     /// </list>
     /// <para>
-    /// <strong>Performance:</strong> Optimized for large datasets with columnar storage.
+    /// <strong>Performance:</strong> Write path streams in bounded row-group batches —
+    /// peak memory scales with row-group size, not total dataset size. Suitable for 1–10 GB datasets.
     /// </para>
     /// </remarks>
     public static Item<IEnumerable<TRow>> Parquet<TRow>(
         this EnumerableItemFactory _,
         string label,
         string filePath,
+        ParquetItemOptions<TRow>? options = null,
         IStorageMediumResolver? resolver = null,
         IStorageMedium? medium = null
     )
         where TRow : notnull, IFlatSchema, IBinarySerializable
     {
         var resolvedMedium = medium ?? resolver?.Resolve(filePath) ?? new FileStorageMedium(filePath);
-        var format = new ParquetFormatSerializer<TRow>();
+        var format = new ParquetFormatSerializer<TRow>(options);
         var container = new EnumerableContainerAdapter<TRow>();
         var storage = new ComposedStorageAdapter<IEnumerable<TRow>, TRow>(
             resolvedMedium,
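Putting the pieces together at the catalog level, a hypothetical registration using the new options parameter might look as follows. The label and path are invented, ParquetItemOptions<TRow> is assumed to expose settable RowGroupSize and UseDictionaryEncoding properties (both are named elsewhere in this commit), and the row type must satisfy the IFlatSchema and IBinarySerializable constraints.

// Hypothetical registration; adjust names to your catalog. Assumes SensorReading
// implements IFlatSchema and IBinarySerializable as the constraint requires.
var readings = ItemFactory.Enumerable.Parquet<SensorReading>(
    label: "sensor_readings",
    filePath: "data/sensor_readings.parquet",
    options: new ParquetItemOptions<SensorReading>
    {
        // Smaller groups trade a little file-size overhead for lower peak write
        // memory and more read parallelism downstream.
        RowGroupSize = 250_000,
        // Global flag until Parquet.Net v6 ships per-column encoding hints.
        UseDictionaryEncoding = true,
    }
);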
