Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ parquet-variant-compute = { workspace = true, optional = true }
object_store = { workspace = true, optional = true, features = ["tokio"] }

bytes = { version = "1.1", default-features = false, features = ["std"] }
thrift = { version = "0.17", default-features = false }
snap = { version = "1.0", default-features = false, optional = true }
brotli = { version = "8.0", default-features = false, features = ["std"], optional = true }
# To use `flate2` you must enable either the `flate2-zlib-rs` or `flate2-rust_backened` backends
Expand Down
143 changes: 19 additions & 124 deletions parquet/src/bin/parquet-layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

//! Binary that prints the physical layout of a parquet file
//!
//! NOTE: due to this binary's use of the deprecated [`parquet::format`] module, it
//! will no longer be maintained, and will likely be removed in the future.
//! Alternatives to this include [`parquet-cli`] and [`parquet-viewer`].
//! Alternatives to this binary include [`parquet-cli`] and [`parquet-viewer`].
//!
//! # Install
//!
Expand All @@ -41,19 +39,14 @@
//! [`parquet-viewer`]: https://github.com/xiangpenghao/parquet-viewer

use std::fs::File;
use std::io::Read;

use clap::Parser;
use parquet::file::metadata::ParquetMetaDataReader;
use serde::Serialize;
use thrift::protocol::TCompactInputProtocol;
use serde::{Serialize, Serializer};

use parquet::basic::CompressionCodec;
use parquet::basic::{CompressionCodec, Encoding};
use parquet::errors::Result;
use parquet::file::reader::ChunkReader;
#[allow(deprecated)]
use parquet::format::PageHeader;
use parquet::thrift::TSerializable;

#[derive(Serialize, Debug)]
struct Index {
Expand Down Expand Up @@ -87,22 +80,22 @@ struct ColumnChunk {
offset_index: Option<Index>,
column_index: Option<Index>,
bloom_filter: Option<Index>,
pages: Vec<Page>,
compression: DebugSerialize<CompressionCodec>,
encodings: Vec<DebugSerialize<Encoding>>,
}

#[derive(Serialize, Debug)]
struct Page {
compression: Option<&'static str>,
encoding: &'static str,
page_type: &'static str,
offset: u64,
compressed_bytes: i32,
uncompressed_bytes: i32,
header_bytes: i32,
num_values: i32,
/// Wrapper that serializes the inner value using its `Debug`
/// representation, so enum values like `CompressionCodec` render as
/// readable strings in the JSON output.
#[derive(Debug)]
struct DebugSerialize<T: std::fmt::Debug>(T);

impl<T: std::fmt::Debug> Serialize for DebugSerialize<T> {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // `collect_str` streams the `Debug` formatting directly into the
        // serializer, avoiding the intermediate `String` allocation that
        // `serialize_str(&format!(..))` would incur for every value.
        serializer.collect_str(&format_args!("{:?}", &self.0))
    }
}

#[allow(deprecated)]
fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
let mut metadata_reader = ParquetMetaDataReader::new();
metadata_reader.try_parse(reader)?;
Expand All @@ -118,55 +111,8 @@ fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
.iter()
.zip(schema.columns())
.map(|(column, column_schema)| {
let compression = compression(column.compression_codec());
let mut pages = vec![];

let mut start = column
.dictionary_page_offset()
.unwrap_or_else(|| column.data_page_offset())
as u64;

let end = start + column.compressed_size() as u64;
while start != end {
let (header_len, header) = read_page_header(reader, start)?;
if let Some(dictionary) = header.dictionary_page_header {
pages.push(Page {
compression,
encoding: encoding(dictionary.encoding.0),
page_type: "dictionary",
offset: start,
compressed_bytes: header.compressed_page_size,
uncompressed_bytes: header.uncompressed_page_size,
header_bytes: header_len as _,
num_values: dictionary.num_values,
})
} else if let Some(data_page) = header.data_page_header {
pages.push(Page {
compression,
encoding: encoding(data_page.encoding.0),
page_type: "data_page_v1",
offset: start,
compressed_bytes: header.compressed_page_size,
uncompressed_bytes: header.uncompressed_page_size,
header_bytes: header_len as _,
num_values: data_page.num_values,
})
} else if let Some(data_page) = header.data_page_header_v2 {
let is_compressed = data_page.is_compressed.unwrap_or(true);

pages.push(Page {
compression: compression.filter(|_| is_compressed),
encoding: encoding(data_page.encoding.0),
page_type: "data_page_v2",
offset: start,
compressed_bytes: header.compressed_page_size,
uncompressed_bytes: header.uncompressed_page_size,
header_bytes: header_len as _,
num_values: data_page.num_values,
})
}
start += header.compressed_page_size as u64 + header_len as u64;
}
let compression = DebugSerialize(column.compression_codec());
let encodings = column.encodings().map(DebugSerialize).collect();

Ok(ColumnChunk {
path: column_schema.path().parts().join("."),
Expand All @@ -185,7 +131,8 @@ fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
offset,
length: column.bloom_filter_length(),
}),
pages,
compression,
encodings,
})
})
.collect::<Result<Vec<_>>>()?;
Expand All @@ -203,58 +150,6 @@ fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
})
}

/// Reads the page header at `offset` from `reader`, returning
/// both its length in bytes and the decoded `PageHeader`.
#[allow(deprecated)]
fn read_page_header<C: ChunkReader>(reader: &C, offset: u64) -> Result<(usize, PageHeader)> {
    /// Adapter that counts every byte pulled through the inner reader,
    /// so we can learn the header's encoded size as a side effect of
    /// decoding it.
    struct CountingReader<R> {
        inner: R,
        bytes_read: usize,
    }

    impl<R: Read> Read for CountingReader<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let n = self.inner.read(buf)?;
            self.bytes_read += n;
            Ok(n)
        }
    }

    let mut counted = CountingReader {
        inner: reader.get_read(offset)?,
        bytes_read: 0,
    };
    let mut protocol = TCompactInputProtocol::new(&mut counted);
    let header = PageHeader::read_from_in_protocol(&mut protocol)?;
    Ok((counted.bytes_read, header))
}

/// Returns a string representation for a given compression codec,
/// or `None` when the data is uncompressed.
fn compression(compression: CompressionCodec) -> Option<&'static str> {
    // Uncompressed chunks have no codec name to report; bail out early
    // so the remaining arms can map straight to their labels.
    let name = match compression {
        CompressionCodec::UNCOMPRESSED => return None,
        CompressionCodec::SNAPPY => "snappy",
        CompressionCodec::GZIP => "gzip",
        CompressionCodec::LZO => "lzo",
        CompressionCodec::BROTLI => "brotli",
        CompressionCodec::LZ4 => "lz4",
        CompressionCodec::ZSTD => "zstd",
        CompressionCodec::LZ4_RAW => "lz4_raw",
    };
    Some(name)
}

/// Returns a string representation for a given encoding id.
///
/// Any id outside the table (including negative values and id 1, which
/// the original mapping never assigned a name) reports as `"unknown"`.
fn encoding(encoding: i32) -> &'static str {
    // Names indexed by the raw encoding id.
    #[allow(deprecated)]
    const NAMES: [&str; 10] = [
        "plain",                   // 0
        "unknown",                 // 1 — not mapped to a name
        "plain_dictionary",        // 2
        "rle",                     // 3
        "bit_packed",              // 4 (deprecated encoding)
        "delta_binary_packed",     // 5
        "delta_length_byte_array", // 6
        "delta_byte_array",        // 7
        "rle_dictionary",          // 8
        "byte_stream_split",       // 9
    ];
    usize::try_from(encoding)
        .ok()
        .and_then(|id| NAMES.get(id))
        .copied()
        .unwrap_or("unknown")
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Prints the physical layout of a parquet file"), long_about = None)]
struct Args {
Expand Down
6 changes: 0 additions & 6 deletions parquet/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,6 @@ impl From<snap::Error> for ParquetError {
}
}

// Converts errors raised by the `thrift` crate into the crate-wide
// `ParquetError`, boxing the original error into the `External` variant
// so `?` works on thrift decode calls.
impl From<thrift::Error> for ParquetError {
fn from(e: thrift::Error) -> ParquetError {
ParquetError::External(Box::new(e))
}
}

impl From<cell::BorrowMutError> for ParquetError {
fn from(e: cell::BorrowMutError) -> ParquetError {
ParquetError::External(Box::new(e))
Expand Down
Loading
Loading