Skip to content

Commit

Permalink
Add ParquetMetadata::memory_size size estimation (#5965)
Browse files Browse the repository at this point in the history
* Add ParquetMetadata::memory_size size estimation

* Require HeapSize for ParquetValueType
  • Loading branch information
alamb committed Jul 2, 2024
1 parent ebc1cb1 commit e61fb62
Show file tree
Hide file tree
Showing 4 changed files with 377 additions and 3 deletions.
26 changes: 24 additions & 2 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,9 +586,9 @@ pub(crate) mod private {
use crate::encodings::decoding::PlainDecoderDetails;
use crate::util::bit_util::{read_num_bytes, BitReader, BitWriter};

use crate::basic::Type;

use super::{ParquetError, Result, SliceAsBytes};
use crate::basic::Type;
use crate::file::metadata::HeapSize;

/// Sealed trait to start to remove specialisation from implementations
///
Expand All @@ -606,6 +606,7 @@ pub(crate) mod private {
+ SliceAsBytes
+ PartialOrd
+ Send
+ HeapSize
+ crate::encodings::decoding::private::GetDecoder
+ crate::file::statistics::private::MakeStatistics
{
Expand Down Expand Up @@ -886,6 +887,12 @@ pub(crate) mod private {
}
}

impl HeapSize for super::Int96 {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl ParquetValueType for super::ByteArray {
const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;

Expand Down Expand Up @@ -970,6 +977,15 @@ pub(crate) mod private {
}
}

impl HeapSize for super::ByteArray {
fn heap_size(&self) -> usize {
// note: this is an estimate, not exact, so just return the size
// of the actual data used, don't try to handle the fact that it may
// be shared.
self.data.as_ref().map(|data| data.len()).unwrap_or(0)
}
}

impl ParquetValueType for super::FixedLenByteArray {
const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;

Expand Down Expand Up @@ -1055,6 +1071,12 @@ pub(crate) mod private {
self
}
}

impl HeapSize for super::FixedLenByteArray {
fn heap_size(&self) -> usize {
self.0.heap_size()
}
}
}

/// Contains the Parquet physical type information as well as the Rust primitive type
Expand Down
228 changes: 228 additions & 0 deletions parquet/src/file/metadata/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Memory calculations for [`ParquetMetadata::memory_size`]
//!
//! [`ParquetMetadata::memory_size`]: crate::file::metadata::ParquetMetaData::memory_size
use crate::basic::{ColumnOrder, Compression, Encoding, PageType};
use crate::data_type::private::ParquetValueType;
use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, KeyValue, RowGroupMetaData};
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::page_index::index::{Index, NativeIndex, PageIndex};
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::format::{BoundaryOrder, PageLocation, SortingColumn};
use std::sync::Arc;

/// Trait for calculating the size of various containers
pub trait HeapSize {
/// Return the size of any bytes allocated on the heap by this object,
/// including heap memory in those structures
///
/// Note that the size of the type itself is not included in the result --
/// instead, that size is added by the caller (e.g. container).
fn heap_size(&self) -> usize;
}

impl<T: HeapSize> HeapSize for Vec<T> {
fn heap_size(&self) -> usize {
let item_size = std::mem::size_of::<T>();
// account for the contents of the Vec
(self.capacity() * item_size) +
// add any heap allocations by contents
self.iter().map(|t| t.heap_size()).sum::<usize>()
}
}

impl<T: HeapSize> HeapSize for Arc<T> {
fn heap_size(&self) -> usize {
self.as_ref().heap_size()
}
}

impl<T: HeapSize> HeapSize for Option<T> {
fn heap_size(&self) -> usize {
self.as_ref().map(|inner| inner.heap_size()).unwrap_or(0)
}
}

impl HeapSize for String {
fn heap_size(&self) -> usize {
self.capacity()
}
}

impl HeapSize for FileMetaData {
fn heap_size(&self) -> usize {
self.created_by.heap_size()
+ self.key_value_metadata.heap_size()
+ self.schema_descr.heap_size()
+ self.column_orders.heap_size()
}
}

impl HeapSize for KeyValue {
fn heap_size(&self) -> usize {
self.key.heap_size() + self.value.heap_size()
}
}

impl HeapSize for RowGroupMetaData {
fn heap_size(&self) -> usize {
// don't count schema_descr here because it is already
// counted in FileMetaData
self.columns.heap_size() + self.sorting_columns.heap_size()
}
}

impl HeapSize for ColumnChunkMetaData {
fn heap_size(&self) -> usize {
// don't count column_descr here because it is already counted in
// FileMetaData
self.encodings.heap_size()
+ self.file_path.heap_size()
+ self.compression.heap_size()
+ self.statistics.heap_size()
+ self.encoding_stats.heap_size()
}
}

impl HeapSize for Encoding {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for PageEncodingStats {
fn heap_size(&self) -> usize {
self.page_type.heap_size() + self.encoding.heap_size()
}
}

impl HeapSize for SortingColumn {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}
impl HeapSize for Compression {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for PageType {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}
impl HeapSize for Statistics {
fn heap_size(&self) -> usize {
match self {
Statistics::Boolean(value_statistics) => value_statistics.heap_size(),
Statistics::Int32(value_statistics) => value_statistics.heap_size(),
Statistics::Int64(value_statistics) => value_statistics.heap_size(),
Statistics::Int96(value_statistics) => value_statistics.heap_size(),
Statistics::Float(value_statistics) => value_statistics.heap_size(),
Statistics::Double(value_statistics) => value_statistics.heap_size(),
Statistics::ByteArray(value_statistics) => value_statistics.heap_size(),
Statistics::FixedLenByteArray(value_statistics) => value_statistics.heap_size(),
}
}
}

impl HeapSize for Index {
fn heap_size(&self) -> usize {
match self {
Index::NONE => 0,
Index::BOOLEAN(native_index) => native_index.heap_size(),
Index::INT32(native_index) => native_index.heap_size(),
Index::INT64(native_index) => native_index.heap_size(),
Index::INT96(native_index) => native_index.heap_size(),
Index::FLOAT(native_index) => native_index.heap_size(),
Index::DOUBLE(native_index) => native_index.heap_size(),
Index::BYTE_ARRAY(native_index) => native_index.heap_size(),
Index::FIXED_LEN_BYTE_ARRAY(native_index) => native_index.heap_size(),
}
}
}

impl<T: ParquetValueType> HeapSize for NativeIndex<T> {
fn heap_size(&self) -> usize {
self.indexes.heap_size() + self.boundary_order.heap_size()
}
}

impl<T: ParquetValueType> HeapSize for PageIndex<T> {
fn heap_size(&self) -> usize {
self.min.heap_size() + self.max.heap_size() + self.null_count.heap_size()
}
}

impl<T: ParquetValueType> HeapSize for ValueStatistics<T> {
fn heap_size(&self) -> usize {
self.min().heap_size() + self.max().heap_size()
}
}
impl HeapSize for bool {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}
impl HeapSize for i32 {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}
impl HeapSize for i64 {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for f32 {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}
impl HeapSize for f64 {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for usize {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for BoundaryOrder {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for PageLocation {
fn heap_size(&self) -> usize {
0 // no heap allocations
}
}

impl HeapSize for ColumnOrder {
fn heap_size(&self) -> usize {
0 // no heap allocations in ColumnOrder
}
}
Loading

0 comments on commit e61fb62

Please sign in to comment.