Skip to content

Commit

Permalink
fix bug: write column metadata after the column chunk data
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun4515 committed Jun 25, 2022
1 parent 9f7b600 commit b104d64
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
23 changes: 23 additions & 0 deletions parquet/src/file/metadata.rs
Expand Up @@ -611,6 +611,29 @@ impl ColumnChunkMetaData {
encrypted_column_metadata: None,
}
}

/// Builds the Thrift `ColumnMetaData` representation of this column chunk.
///
/// Every field is taken from `self` (converted with `into()` or the
/// `statistics` / `page_encoding_stats` helpers where a Thrift type is
/// required), except `key_value_metadata`, which is always emitted as `None`.
pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
    // Convert the collection-valued and optional fields up front so the
    // struct literal below stays a flat list of field assignments.
    let encodings: Vec<_> = self
        .encodings()
        .iter()
        .map(|&encoding| encoding.into())
        .collect();
    let path_in_schema = Vec::from(self.column_path.as_ref());
    let thrift_statistics = statistics::to_thrift(self.statistics.as_ref());
    let encoding_stats = self.encoding_stats.as_ref().map(|stats| {
        stats
            .iter()
            .map(page_encoding_stats::to_thrift)
            .collect()
    });

    ColumnMetaData {
        type_: self.column_type.into(),
        encodings,
        path_in_schema,
        codec: self.compression.into(),
        num_values: self.num_values,
        total_uncompressed_size: self.total_uncompressed_size,
        total_compressed_size: self.total_compressed_size,
        // No key/value metadata is carried over by this conversion.
        key_value_metadata: None,
        data_page_offset: self.data_page_offset,
        index_page_offset: self.index_page_offset,
        dictionary_page_offset: self.dictionary_page_offset,
        statistics: thrift_statistics,
        encoding_stats,
        bloom_filter_offset: self.bloom_filter_offset,
    }
}
}

/// Builder for column chunk metadata.
Expand Down
11 changes: 7 additions & 4 deletions parquet/src/file/writer.rs
Expand Up @@ -435,12 +435,15 @@ impl<'a, W: Write> SerializedPageWriter<'a, W> {
Ok(self.sink.bytes_written() - start_pos)
}

/// Serializes column chunk into Thrift.
/// Serializes column metadata into Thrift.
/// Returns Ok() if there are no errors serializing and writing data into the sink.
#[inline]
fn serialize_column_chunk(&mut self, chunk: parquet::ColumnChunk) -> Result<()> {
fn serialize_column_chunk(
&mut self,
column_metadata: parquet::ColumnMetaData,
) -> Result<()> {
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
chunk.write_to_out_protocol(&mut protocol)?;
column_metadata.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
Ok(())
}
Expand Down Expand Up @@ -533,7 +536,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
}

fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
self.serialize_column_chunk(metadata.to_thrift())
self.serialize_column_chunk(metadata.to_column_metadata_thrift())
}

fn close(&mut self) -> Result<()> {
Expand Down

0 comments on commit b104d64

Please sign in to comment.