Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: merge tree data parts #3346

Merged
merged 5 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,12 @@ pub enum Error {
error: parquet::errors::ParquetError,
location: Location,
},

#[snafu(display("Failed to iter data part"))]
ReadDataPart {
#[snafu(source)]
error: parquet::errors::ParquetError,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -669,7 +675,7 @@ impl ErrorExt for Error {
FilterRecordBatch { source, .. } => source.status_code(),
Upload { .. } => StatusCode::StorageUnavailable,
BiError { .. } => StatusCode::Internal,
EncodeMemtable { .. } => StatusCode::Internal,
EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal,
}
}

Expand Down
241 changes: 214 additions & 27 deletions src/mito2/src/memtable/merge_tree/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! Data part of a shard.

use std::cmp::{Ordering, Reverse};
use std::fmt::{Debug, Formatter};
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -31,6 +32,7 @@ use datatypes::vectors::{
TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder,
UInt8VectorBuilder,
};
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
Expand Down Expand Up @@ -140,13 +142,13 @@ impl DataBuffer {
/// `freeze` clears the buffers of builders.
pub fn freeze(&mut self, pk_weights: &[u16]) -> Result<DataPart> {
let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None);
let encoded = encoder.write(self)?;
Ok(DataPart::Parquet(encoded))
let parts = encoder.write(self)?;
Ok(parts)
}

/// Reads batches from data buffer without resetting builder's buffers.
pub fn iter(&mut self, pk_weights: &[u16]) -> Result<DataBufferIter> {
// todo(hl): control whether to dedup while invoking `iter`.
pub fn read(&mut self, pk_weights: &[u16]) -> Result<DataBufferReader> {
// todo(hl): control whether to dedup while invoking `read`.
let batch = data_buffer_to_record_batches(
self.data_part_schema.clone(),
self,
Expand All @@ -155,7 +157,7 @@ impl DataBuffer {
true,
true,
)?;
DataBufferIter::new(batch)
DataBufferReader::new(batch)
}

/// Returns num of rows in data buffer.
Expand Down Expand Up @@ -287,29 +289,29 @@ fn data_buffer_to_record_batches(
}

#[derive(Debug)]
pub(crate) struct DataBufferIter {
pub(crate) struct DataBufferReader {
batch: RecordBatch,
offset: usize,
current_batch: Option<(PkIndex, Range<usize>)>,
}

impl DataBufferIter {
impl DataBufferReader {
pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
let mut iter = Self {
let mut reader = Self {
batch,
offset: 0,
current_batch: None,
};
iter.next()?; // fill data batch for comparison and merge.
Ok(iter)
reader.next()?; // fill data batch for comparison and merge.
Ok(reader)
}

pub(crate) fn is_valid(&self) -> bool {
self.current_batch.is_some()
}

/// # Panics
/// If Current iterator is not exhausted.
/// If Current reader is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
let (pk_index, range) = self.current_batch.as_ref().unwrap();
DataBatch {
Expand All @@ -320,13 +322,13 @@ impl DataBufferIter {
}

/// # Panics
/// If Current iterator is exhausted.
/// If Current reader is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
let (pk_index, _) = self.current_batch.as_ref().unwrap();
*pk_index
}

/// Advances iterator to next data batch.
/// Advances reader to next data batch.
pub(crate) fn next(&mut self) -> Result<()> {
if self.offset >= self.batch.num_rows() {
self.current_batch = None;
Expand Down Expand Up @@ -506,7 +508,7 @@ impl<'a> DataPartEncoder<'a> {
.build()
})
}
pub fn write(&self, source: &mut DataBuffer) -> Result<Bytes> {
pub fn write(&self, source: &mut DataBuffer) -> Result<DataPart> {
let mut bytes = Vec::with_capacity(1024);
let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props())
.context(error::EncodeMemtableSnafu)?;
Expand All @@ -519,26 +521,138 @@ impl<'a> DataPartEncoder<'a> {
true,
)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
let _file_meta = writer.close().context(error::EncodeMemtableSnafu)?;
Ok(Bytes::from(bytes))
let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
Ok(DataPart::Parquet(ParquetPart {
data: Bytes::from(bytes),
}))
}
}

/// Data parts under a shard.
pub struct DataParts {
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
/// The active writing buffer.
pub(crate) active: DataBuffer,
/// immutable (encoded) parts.
pub(crate) frozen: Vec<DataPart>,
}

/// Format of immutable data part.
pub enum DataPart {
Parquet(Bytes),
Parquet(ParquetPart),
}

impl DataPart {
fn is_empty(&self) -> bool {
match self {
DataPart::Parquet(data) => data.is_empty(),
DataPart::Parquet(p) => p.data.is_empty(),
}
}

/// Reads frozen data part and yields [DataBatch]es.
pub fn read(&self) -> Result<DataPartReader> {
match self {
DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None),
}
}
}

/// Data parts under a shard.
pub struct DataParts {}
pub struct DataPartReader {
inner: ParquetRecordBatchReader,
current_range: Range<usize>,
current_pk_index: Option<PkIndex>,
current_batch: Option<RecordBatch>,
}

impl Debug for DataPartReader {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DataPartReader")
.field("current_range", &self.current_range)
.field("current_pk_index", &self.current_pk_index)
.finish()
}
}

impl DataPartReader {
pub fn new(data: Bytes, batch_size: Option<usize>) -> Result<Self> {
let mut builder =
ParquetRecordBatchReaderBuilder::try_new(data).context(error::ReadDataPartSnafu)?;
if let Some(batch_size) = batch_size {
builder = builder.with_batch_size(batch_size);
}
let parquet_reader = builder.build().context(error::ReadDataPartSnafu)?;
let mut reader = Self {
inner: parquet_reader,
current_pk_index: None,
current_range: 0..0,
current_batch: None,
};
reader.next()?;
Ok(reader)
}

/// Returns false if current reader is exhausted.
pub(crate) fn is_valid(&self) -> bool {
self.current_pk_index.is_some()
}

/// Returns current pk index.
///
/// # Panics
/// If reader is exhausted.
pub(crate) fn current_pk_index(&self) -> PkIndex {
self.current_pk_index.expect("DataPartReader is exhausted")
}

/// Returns current data batch of reader.
/// # Panics
/// If reader is exhausted.
pub(crate) fn current_data_batch(&self) -> DataBatch {
let rb = self.current_batch.as_ref().unwrap();
let pk_index = self.current_pk_index.unwrap();
let range = self.current_range.clone();
DataBatch {
pk_index,
rb,
range,
}
}

pub(crate) fn next(&mut self) -> Result<()> {
if let Some((next_pk, range)) = self.search_next_pk_range() {
// first try to search next pk in current record batch.
self.current_pk_index = Some(next_pk);
self.current_range = range;
} else {
// current record batch reaches eof, fetch next record batch from parquet reader.
if let Some(rb) = self.inner.next() {
let rb = rb.context(error::ComputeArrowSnafu)?;
self.current_range = 0..0;
self.current_batch = Some(rb);
return self.next();
} else {
// parquet is also exhausted
self.current_pk_index = None;
self.current_batch = None;
}
}

Ok(())
}

/// Searches next primary key along with it's offset range inside record batch.
fn search_next_pk_range(&self) -> Option<(PkIndex, Range<usize>)> {
self.current_batch.as_ref().and_then(|b| {
// safety: PK_INDEX_COLUMN_NAME must present in record batch yielded by data part.
let pk_array = pk_index_array(b);
search_next_pk_range(pk_array, self.current_range.end)
})
}
}

/// Parquet-encoded `DataPart`.
pub struct ParquetPart {
data: Bytes,
}

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -778,7 +892,10 @@ mod tests {
assert_eq!(4, buffer.num_rows());

let encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None);
let encoded = encoder.write(&mut buffer).unwrap();
let encoded = match encoder.write(&mut buffer).unwrap() {
DataPart::Parquet(data) => data.data,
};

let s = String::from_utf8_lossy(encoded.as_bytes());
assert!(s.starts_with("PAR1"));
assert!(s.ends_with("PAR1"));
Expand All @@ -789,10 +906,10 @@ mod tests {
assert_eq!(3, batch.num_rows());
}

fn check_buffer_values_equal(iter: &mut DataBufferIter, expected_values: &[Vec<f64>]) {
fn check_buffer_values_equal(reader: &mut DataBufferReader, expected_values: &[Vec<f64>]) {
let mut output = Vec::with_capacity(expected_values.len());
while iter.is_valid() {
let batch = iter.current_data_batch().slice_record_batch();
while reader.is_valid() {
let batch = reader.current_data_batch().slice_record_batch();
let values = batch
.column_by_name("v1")
.unwrap()
Expand All @@ -803,7 +920,7 @@ mod tests {
.map(|v| v.unwrap())
.collect::<Vec<_>>();
output.push(values);
iter.next().unwrap();
reader.next().unwrap();
}
assert_eq!(expected_values, output);
}
Expand Down Expand Up @@ -842,15 +959,85 @@ mod tests {
2,
);

let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap();
let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap();
check_buffer_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]);
}

#[test]
fn test_iter_empty_data_buffer() {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap();
let mut iter = buffer.read(&[0, 1, 3, 2]).unwrap();
check_buffer_values_equal(&mut iter, &[]);
}

fn check_part_values_equal(iter: &mut DataPartReader, expected_values: &[Vec<f64>]) {
let mut output = Vec::with_capacity(expected_values.len());
while iter.is_valid() {
let batch = iter.current_data_batch().slice_record_batch();
let values = batch
.column_by_name("v1")
.unwrap()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.iter()
.map(|v| v.unwrap())
.collect::<Vec<_>>();
output.push(values);
iter.next().unwrap();
}
assert_eq!(expected_values, output);
}

fn check_iter_data_part(weights: &[u16], expected_values: &[Vec<f64>]) {
let meta = metadata_for_test();
let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);

write_rows_to_buffer(
&mut buffer,
&meta,
2,
vec![0, 1, 2],
vec![Some(1.0), Some(2.0), Some(3.0)],
2,
);

write_rows_to_buffer(
&mut buffer,
&meta,
3,
vec![1, 2, 3],
vec![Some(1.1), Some(2.1), Some(3.1)],
3,
);

write_rows_to_buffer(
&mut buffer,
&meta,
2,
vec![2, 3],
vec![Some(2.2), Some(2.3)],
4,
);

let encoder = DataPartEncoder::new(&meta, weights, Some(4));
let encoded = encoder.write(&mut buffer).unwrap();

let mut iter = encoded.read().unwrap();
check_part_values_equal(&mut iter, expected_values);
}

#[test]
fn test_iter_data_part() {
check_iter_data_part(
&[0, 1, 2, 3],
&[vec![1.0, 2.0, 3.0, 2.3], vec![1.1, 2.1, 3.1]],
);

check_iter_data_part(
&[3, 2, 1, 0],
&[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0, 2.3]],
);
}
}