-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
file_reader.h
115 lines (85 loc) · 4.12 KB
/
file_reader.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.
#pragma once
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "column/chunk.h"
#include "common/status.h"
#include "formats/parquet/group_reader.h"
#include "gen_cpp/parquet_types.h"
#include "runtime/runtime_state.h"
#include "util/buffered_stream.h"
#include "util/runtime_profile.h"
namespace starrocks {
class RandomAccessFile;
namespace vectorized {
class HdfsScannerContext;
} // namespace vectorized
} // namespace starrocks
namespace starrocks::parquet {
class FileMetaData;
// Reads a Parquet file: parses the footer, prunes row groups via min/max
// statistics and scan-range checks, then streams chunks through per-row-group
// GroupReaders.
class FileReader {
public:
    FileReader(int chunk_size, RandomAccessFile* file, uint64_t file_size);
    ~FileReader();

    // Parse the footer and set up row group readers for this scan context.
    // Must be called (and succeed) before get_next().
    Status init(vectorized::HdfsScannerContext* scanner_ctx);

    // Fill *chunk with the next batch of rows.
    Status get_next(vectorized::ChunkPtr* chunk);

private:
    int _chunk_size;

    // parse footer of parquet file
    Status _parse_footer();

    void _prepare_read_columns();

    // init row group readers.
    Status _init_group_readers();

    // filter row group by min/max conjuncts
    StatusOr<bool> _filter_group(const tparquet::RowGroup& row_group);

    // get row group to read
    // if scan range contains the first byte in the row group, it will be read
    // TODO: later modify so the larger overlapping block is read
    bool _select_row_group(const tparquet::RowGroup& row_group);

    // make min/max chunk from stats of row group meta
    // exist=true: group meta contains statistics info
    Status _read_min_max_chunk(const tparquet::RowGroup& row_group, vectorized::ChunkPtr* min_chunk,
                               vectorized::ChunkPtr* max_chunk, bool* exist) const;

    Status _get_next_internal(vectorized::ChunkPtr* chunk);

    // only scan partition column + not exist column
    Status _exec_only_partition_scan(vectorized::ChunkPtr* chunk);

    // get partition column idx in param.partition_columns; returns -1 when absent
    // NOTE(review): return-on-miss value not visible here — confirm against the .cpp
    int _get_partition_column_idx(const std::string& col_name) const;

    // check magic number of parquet file
    // currently only "PAR1" is supported
    static Status _check_magic(const uint8_t* file_magic);

    // decode min/max value from row group stats
    static Status _decode_min_max_column(const ParquetField& field, const std::string& timezone,
                                         const TypeDescriptor& type, const tparquet::ColumnMetaData& column_meta,
                                         const tparquet::ColumnOrder* column_order, vectorized::ColumnPtr* min_column,
                                         vectorized::ColumnPtr* max_column, bool* decode_ok);

    static bool _can_use_min_max_stats(const tparquet::ColumnMetaData& column_meta,
                                       const tparquet::ColumnOrder* column_order);

    // statistics.min_value max_value
    static bool _can_use_stats(const tparquet::Type::type& type, const tparquet::ColumnOrder* column_order);

    // statistics.min max
    static bool _can_use_deprecated_stats(const tparquet::Type::type& type, const tparquet::ColumnOrder* column_order);

    static bool _is_integer_type(const tparquet::Type::type& type);

    // find column meta according to column name
    static const tparquet::ColumnMetaData* _get_column_meta(const tparquet::RowGroup& row_group,
                                                            const std::string& col_name);

    // get the data page start offset in parquet file
    static int64_t _get_row_group_start_offset(const tparquet::RowGroup& row_group);

    RandomAccessFile* _file;  // not owned
    uint64_t _file_size;

    std::shared_ptr<FileMetaData> _file_metadata;
    // one reader per selected (non-filtered) row group
    std::vector<std::shared_ptr<GroupReader>> _row_group_readers;
    size_t _cur_row_group_idx = 0;
    size_t _row_group_size = 0;

    vectorized::Schema _schema;
    std::vector<GroupReaderParam::Column> _read_cols;
    size_t _total_row_count = 0;
    size_t _scan_row_count = 0;
    bool _is_only_partition_scan = false;
    // not exist column conjuncts eval false, file can be skipped
    bool _is_file_filtered = false;
    vectorized::HdfsScannerContext* _scanner_ctx = nullptr;  // not owned; set in init()
    std::shared_ptr<SharedBufferedInputStream> _sb_stream = nullptr;
    GroupReaderParam _group_reader_param;
};
} // namespace starrocks::parquet