// ParquetRowGroupWriter.cs
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Parquet.Data;
using Parquet.File;
using Parquet.File.Values;
namespace Parquet
{
/// <summary>
/// Writes a single row group into a Parquet stream. Columns must be written in
/// schema order via <see cref="WriteColumn(DataColumn)"/>; disposing the writer
/// finalizes the row group's Thrift metadata (row count and total byte size).
/// </summary>
#pragma warning disable CA1063 // Implement IDisposable Correctly
public class ParquetRowGroupWriter : IDisposable
#pragma warning restore CA1063 // Implement IDisposable Correctly
{
   private readonly Schema _schema;
   private readonly Stream _stream;
   private readonly ThriftStream _thriftStream;
   private readonly ThriftFooter _footer;
   private readonly CompressionMethod _compressionMethod;
   private readonly int _compressionLevel;
   private readonly ParquetOptions _formatOptions;
   private readonly Thrift.RowGroup _thriftRowGroup;
   private readonly long _rgStartPos;            // stream position where this row group begins
   private readonly Thrift.SchemaElement[] _thschema;  // flattened writeable schema, one element per data column
   private int _colIdx;                          // index of the next column expected by WriteColumn

   /// <summary>
   /// Creates a writer for a new row group and registers the group in the footer.
   /// </summary>
   /// <param name="schema">File schema; must not be null.</param>
   /// <param name="stream">Destination stream positioned at the row group start.</param>
   /// <param name="thriftStream">Thrift serialization wrapper over <paramref name="stream"/>.</param>
   /// <param name="footer">File footer builder receiving this row group's metadata.</param>
   /// <param name="compressionMethod">Compression applied to column pages.</param>
   /// <param name="compressionLevel">Compression level passed to the codec.</param>
   /// <param name="formatOptions">Format options used when matching data type handlers.</param>
   internal ParquetRowGroupWriter(Schema schema,
      Stream stream,
      ThriftStream thriftStream,
      ThriftFooter footer,
      CompressionMethod compressionMethod,
      int compressionLevel,
      ParquetOptions formatOptions)
   {
      _schema = schema ?? throw new ArgumentNullException(nameof(schema));
      _stream = stream ?? throw new ArgumentNullException(nameof(stream));
      _thriftStream = thriftStream ?? throw new ArgumentNullException(nameof(thriftStream));
      _footer = footer ?? throw new ArgumentNullException(nameof(footer));
      _compressionMethod = compressionMethod;
      _compressionLevel = compressionLevel;
      _formatOptions = formatOptions;

      _thriftRowGroup = _footer.AddRowGroup();
      _rgStartPos = _stream.Position;
      _thriftRowGroup.Columns = new List<Thrift.ColumnChunk>();
      _thschema = _footer.GetWriteableSchema();
   }

   /// <summary>
   /// Number of rows in this row group; null until the first row-defining column is written.
   /// </summary>
   internal long? RowCount { get; private set; }

   /// <summary>
   /// Writes next data column to parquet stream. Note that columns must be written in the order they are declared in the
   /// file schema.
   /// </summary>
   /// <param name="column">Column data to write; must match the next schema element.</param>
   /// <exception cref="ArgumentNullException">When <paramref name="column"/> is null.</exception>
   /// <exception cref="ArgumentException">When the column does not match the expected schema element.</exception>
   /// <exception cref="InvalidOperationException">When all schema columns have already been written.</exception>
   public void WriteColumn(DataColumn column)
   {
      if (column == null) throw new ArgumentNullException(nameof(column));

      // fail with a clear message instead of an IndexOutOfRangeException below
      if (_colIdx >= _thschema.Length)
      {
         throw new InvalidOperationException($"this row group has {_thschema.Length} column(s) and they have all been written");
      }

      if (RowCount == null)
      {
         // row count can only be derived from a column that defines rows;
         // an empty repeated (array) column does not
         if (column.Data.Length > 0 || column.Field.MaxRepetitionLevel == 0)
            RowCount = column.CalculateRowCount();
      }

      Thrift.SchemaElement tse = _thschema[_colIdx];
      if(!column.Field.Equals(tse))
      {
         throw new ArgumentException($"cannot write this column, expected '{tse.Name}', passed: '{column.Field.Name}'", nameof(column));
      }
      IDataTypeHandler dataTypeHandler = DataTypeFactory.Match(tse, _formatOptions);
      _colIdx += 1;

      List<string> path = _footer.GetPath(tse);

      var writer = new DataColumnWriter(_stream, _thriftStream, _footer, tse,
         _compressionMethod, _compressionLevel,
         (int)(RowCount ?? 0));

      Thrift.ColumnChunk chunk = writer.Write(path, column, dataTypeHandler);
      _thriftRowGroup.Columns.Add(chunk);
   }

   /// <summary>
   /// Finalizes the row group's metadata in the footer. Does not write the footer itself.
   /// </summary>
#pragma warning disable CA1063 // Implement IDisposable Correctly
   public void Dispose()
#pragma warning restore CA1063 // Implement IDisposable Correctly
   {
      //todo: check if all columns are present

      //row count is known only after at least one column is written
      _thriftRowGroup.Num_rows = RowCount ?? 0;

      //per the Parquet format spec, RowGroup.total_byte_size is the sum of the
      //*uncompressed* sizes of all column data in the group, including headers;
      //luckily ColumnChunk already contains page+header sizes in its meta.
      //The original code summed Total_compressed_size, contradicting both the
      //spec and the comment above.
      _thriftRowGroup.Total_byte_size = _thriftRowGroup.Columns.Sum(c => c.Meta_data.Total_uncompressed_size);
   }
}
}